Use pgwui_upload_core
authorKarl O. Pinc <kop@karlpinc.com>
Tue, 5 Jan 2021 21:19:02 +0000 (15:19 -0600)
committerKarl O. Pinc <kop@karlpinc.com>
Tue, 5 Jan 2021 21:19:02 +0000 (15:19 -0600)
setup.py
src/pgwui_upload/check_settings.py
src/pgwui_upload/exceptions.py
src/pgwui_upload/templates/upload.mak
src/pgwui_upload/views/upload.py
tests/test_check_settings.py
tests/views/test_upload.py

index 65fd1672ecfdaaf562c8c7b3ecac11680f48f217..3560c6e53d4daa542f42a1019c412cf1bc9f9dd7 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -74,7 +74,8 @@ setup(
     version=version,
 
     description=(
-        'A web interface for bulk PostgreSQL data validation and upload.'),
+        'Upload into a PostgreSQL table, assisting data cleanup, via the web.'
+    ),
     long_description=long_description,
     long_description_content_type='text/x-rst',
 
@@ -157,7 +158,7 @@ setup(
     # Run-time dependencies.
     install_requires=[
         'markupsafe',
-        'pgwui_common',
+        'pgwui_upload_core==' + version,
         'psycopg2',
         'pyramid',
     ],
index 1b725dd6ed89b275d96d0216046e6aac55944759..df90b693526c2cff4397296893e5a881c4681eff 100644 (file)
 
 # Karl O. Pinc <kop@karlpinc.com>
 
-from pgwui_common import checkset
-from . import exceptions as upload_ex
-
+import pgwui_upload_core.check_settings
 
 PGWUI_COMPONENT = 'pgwui_upload'
-UPLOAD_SETTINGS = ['menu_label',
-                   'literal_column_headings',
-                   ]
-REQUIRED_SETTINGS = []
-BOOLEAN_SETTINGS = []
-
-
-def validate_literal_column_headings(errors, settings):
-    '''Make sure the values are those allowed
-    '''
-    value = settings.get('literal_column_headings')
-    if value is None:
-        return
-    if value not in ('on', 'off', 'ask'):
-        errors.append(upload_ex.BadLiteralColumnHeadingsError(value))
 
 
 def check_settings(component_config):
@@ -50,12 +33,11 @@ def check_settings(component_config):
       checking that the values of other settings are valid
     '''
     errors = []
-    errors.extend(checkset.unknown_settings(
-        PGWUI_COMPONENT, UPLOAD_SETTINGS, component_config))
-    errors.extend(checkset.require_settings(
-        PGWUI_COMPONENT, REQUIRED_SETTINGS, component_config))
-    errors.extend(checkset.boolean_settings(
-        PGWUI_COMPONENT, BOOLEAN_SETTINGS, component_config))
-    validate_literal_column_headings(errors, component_config)
+    errors.extend(pgwui_upload_core.check_settings.check_settings(
+        PGWUI_COMPONENT,
+        pgwui_upload_core.check_settings.UPLOAD_SETTINGS,
+        pgwui_upload_core.check_settings.REQUIRED_SETTINGS,
+        pgwui_upload_core.check_settings.BOOLEAN_SETTINGS,
+        component_config))
 
     return errors
index ac017b4a4656b0e95153f5b28aafa6ec05686368..406cfc67e8012c35e61733edf70790472e340fff 100644 (file)
 
 # Karl O. Pinc <kop@karlpinc.com>
 
-from pgwui_common import exceptions as common_ex
 from pgwui_core import exceptions as core_ex
 
 
-# PGWUI setting related exceptions
-
-class UploadError(common_ex.Error):
-    pass
-
-
-class BadLiteralColumnHeadingsError(UploadError):
-    def __init__(self, value):
-        super().__init__(
-            'The "pgwui.pgwui_upload.literal_column_headings" PGWUI setting '
-            ' must be "on", "off", "ask", or not present')
-
-
 # Upload related exceptions
 
-class NoTableError(core_ex.PGWUIError):
+class NoTableError(core_ex.Error):
     '''No table uploaded'''
     def __init__(self, e, descr='', detail=''):
         super(NoTableError, self).__init__(e, descr, detail)
 
 
-class BadTableError(core_ex.PGWUIError):
+class BadTableError(core_ex.Error):
     '''Supplied name does not work for a table or view'''
     def __init__(self, e, descr='', detail=''):
         super(BadTableError, self).__init__(e, descr, detail)
index 425c41fc54cf04c54e2ec22fb5682560b36e9f55..f5ff25aea4d5792b50e8573dc8171103a94a62dc 100644 (file)
 <%!
     from pgwui_common.path import asset_abspath
 
-    auth_base_mak = asset_abspath('pgwui_common:templates/auth_base.mak')
+    upload_base_mak = asset_abspath('pgwui_upload_core:templates/upload.mak')
 %>
 
-<%inherit file="${auth_base_mak}" />
+<%inherit file="${upload_base_mak}" />
 
 <%block name="title">${pgwui['pgwui_upload']['menu_label']}</%block>
 <%block name="meta_keywords">
   <meta name="keywords"
-        content="PGWUI generic upload" />
+        content="PGWUI generic table upload" />
 </%block>
 
 <%block name="meta_description">
@@ -55,7 +55,7 @@
   </p>
 </%block>
 
-<h1>Upload File Into Database</h1>
+<h1>Upload File Into Table Or View</h1>
 
 <%def name="table_row(tab_index)">
       <tr>
       </tr>
 </%def>
 
-<%def name="trim_row(tab_index)">
-      <tr>
-        <td class="label">
-          <label for="trim_upload_id">Trim Leading/Trailing Spaces:</label>
-        </td>
-        <td>
-          <input name="trim_upload"
-                 tabindex="${tab_index}"
-                 id="trim_upload_id"
-                 type="checkbox"
-                 ${trim_upload | n}
-                 />
-        </td>
-      </tr>
-</%def>
-
-<% form_elements = [table_row, trim_row] %>
-
-% if ask_about_literal_cols:
-    <%def name="literal_row(tab_index)">
-          <tr>
-            <td class="label">
-              <label for="literal_col_headings_id">Literal
-                     Uploaded Column Headings:</label>
-            </td>
-            <td>
-              <input name="literal_col_headings"
-                     tabindex="${tab_index}"
-                     id="literal_col_headings_id"
-                     type="checkbox"
-                     ${literal_col_headings | n}
-                     />
-            </td>
-          </tr>
-    </%def>
-
-    <% form_elements.append(literal_row) %>
-% endif
+<%
+    form_elements = [table_row]
+    self.append_elements(form_elements) %>
 
-${parent.upload_form(form_elements)}
+${self.upload_form(form_elements)}
index 99408a51cfa8690b3ccd93091e0b2fe7d15e5ed3..bf2b900f56dad76655c9dc313239292e9a66c96c 100644 (file)
@@ -32,21 +32,19 @@ from __future__ import division
 
 from pyramid.view import view_config
 import logging
-import markupsafe
-import psycopg2.errorcodes
-from psycopg2 import ProgrammingError
 
 from pgwui_common.view import auth_base_view
 from pgwui_core.core import (
     UploadEngine,
     DataLineProcessor,
     UploadDoubleTableForm,
-    TabularFileUploadHandler,
     UploadData,
-    doublequote,
     escape_eol,
     is_checked,
 )
+from pgwui_upload_core.views.upload import (
+    BaseTableUploadHandler,
+)
 
 from pgwui_upload import exceptions as upload_ex
 
@@ -74,7 +72,7 @@ class SaveLine(DataLineProcessor):
         self.cur.execute(self.insert_stmt, udl.tuples)
 
 
-class TableUploadHandler(TabularFileUploadHandler):
+class TableUploadHandler(BaseTableUploadHandler):
     '''
     Attributes:
       request       A pyramid request instance
@@ -109,77 +107,15 @@ class TableUploadHandler(TabularFileUploadHandler):
           A list of PGWUIError instances
         '''
         uf = self.uf
-        errors = super(TableUploadHandler, self).val_input()
+        errors = super().val_input()
 
         qualified_table = uf['table']
         if qualified_table == '':
             errors.append(upload_ex.NoTableError(
                 'No table or view name supplied'))
 
-        self.double_validator(errors)
-
         return errors
 
-    def write(self, result, errors):
-        '''Add double upload key into form.'''
-        response = super(TableUploadHandler, self).write(result, errors)
-        self.write_double_key(response)
-        return response
-
-    def resolve_table(self, qualified_table):
-        '''Return (schema, table) tuple of table name, or raise exception
-        if not resolvable.
-        '''
-        try:
-            self.cur.execute(
-                ('SELECT nspname, relname'
-                 '  FROM pg_class'
-                 '       JOIN pg_namespace'
-                 '            ON (pg_namespace.oid = pg_class.relnamespace)'
-                 '  WHERE pg_class.oid = %s::REGCLASS::OID'),
-                (qualified_table,))
-        except ProgrammingError as err:
-            pgcode = err.pgcode
-            if pgcode == psycopg2.errorcodes.INVALID_SCHEMA_NAME:
-                raise upload_ex.MissingSchemaError(
-                    'No such schema',
-                    err.diag.message_primary,)
-            elif pgcode == psycopg2.errorcodes.UNDEFINED_TABLE:
-                raise upload_ex.MissingTableError(
-                    'No such table or view',
-                    err.diag.message_primary,
-                    ('<p>Hint: Check spelling or try qualifying the'
-                     ' table name with a schema name</p>'))
-            else:
-                raise
-        return self.cur.fetchone()
-
-    def good_table(self, schema, table):
-        '''Is the supplied table or view insertable?
-        '''
-        sql = ('SELECT 1 FROM information_schema.tables'
-               '  WHERE tables.table_name = %s'
-               '        AND tables.table_schema = %s'
-               "        AND (tables.is_insertable_into = 'YES'"
-               # Unfortunatly, as of 9.2, the information_schema
-               # tables.is_insertable_into does not reflect whether
-               # there's an insert trigger on the table.
-               "             OR tables.table_type = 'VIEW')")
-        self.cur.execute(sql, (table, schema))
-        return self.cur.fetchone() is not None
-
-    def quote_columns(self):
-        '''Return boolean -- whether to take column names literally
-        '''
-        settings = self.request.registry.settings
-        quoter_setting = settings['pgwui'].get('literal_column_headings')
-        if quoter_setting == 'on':
-            return True
-        elif quoter_setting == 'ask':
-            return self.uf['literal_col_headings']
-        else:
-            return False
-
     def factory(self, ue):
         '''Make a db loader function from an UploadEngine.
 
@@ -191,72 +127,13 @@ class TableUploadHandler(TabularFileUploadHandler):
 
         self.ue = ue
         self.cur = ue.cur
-        data = ue.data
         qualified_table = self.uf['table']
 
         quotecols = self.quote_columns()
-        if quotecols:
-            column_quoter = doublequote
-        else:
-            def column_quoter(x):
-                return x
-
-        schema, table = self.resolve_table(qualified_table)
-
-        if not self.good_table(schema, table):
-            raise upload_ex.CannotInsertError(
-                'Cannot insert into supplied table or view',
-                ('({0}) is either is a view'
-                 ' that cannot be inserted into'
-                 ' or you do not have the necessary'
-                 ' permissions to the table or view').format(
-                    markupsafe.escape(qualified_table)))
-
-        column_sql = ('SELECT 1 FROM information_schema.columns'
-                      '  WHERE columns.table_name = %s'
-                      '        AND columns.table_schema = %s')
-        if quotecols:
-            column_sql += '    AND columns.column_name = %s'
-        else:
-            column_sql += '    AND columns.column_name = lower(%s::name)'
-
-        insert_stmt = 'INSERT INTO {0} ('.format(doublequote(qualified_table))
-        value_string = ''
-        col_sep = ''
-        bad_cols = []
-        for col_name in data.headers.tuples:
-            # Check that colum name exists
-            self.cur.execute(column_sql, (table, schema, col_name))
-            if self.cur.fetchone() is None:
-                bad_cols.append(col_name)
-            else:
-                # Add column to sql statement
-                insert_stmt += col_sep + column_quoter(col_name)
-                value_string += col_sep + '%s'
-                col_sep = ', '
-
-        if bad_cols:
-            if quotecols:
-                detail = ('<p>The following columns are not in the ({0})'
-                          ' table, or the supplied column names do not match'
-                          " the character case of the table's columns,"
-                          ' or you do not have permission to access'
-                          ' the columns:</p><ul>')
-            else:
-                detail = ('<p>The following columns are not in the ({0})'
-                          ' table, or the table has column names containing'
-                          ' upper case characters, or you do not have'
-                          ' permission to access the columns:</p><ul>')
-            detail = detail.format(markupsafe.escape(qualified_table))
-
-            for bad_col in bad_cols:
-                detail += '<li>{0}</li>'.format(markupsafe.escape(bad_col))
-            detail += '</ul>'
-            raise upload_ex.BadHeadersError(
-                'Header line contains unknown column names',
-                detail=detail)
-
-        insert_stmt += ') VALUES({0})'.format(value_string)
+        column_quoter = self.get_column_quoter(quotecols)
+
+        insert_stmt = self.build_insert_stmt(
+            ue.data, qualified_table, quotecols, column_quoter)
 
         return SaveLine(ue, self, insert_stmt)
 
@@ -286,11 +163,13 @@ def log_success(response):
 @auth_base_view
 def upload_view(request):
 
-    response = UploadEngine(TableUploadHandler(request)).run()
+    tuh = TableUploadHandler(request).init()
+    response = UploadEngine(tuh).run()
 
     settings = request.registry.settings
     quoter_setting = settings['pgwui'].get('literal_column_headings')
     response['ask_about_literal_cols'] = quoter_setting == 'ask'
+    # Keep these next 2
     response.setdefault('pgwui', dict())
     response['pgwui']['pgwui_upload'] = settings['pgwui']['pgwui_upload']
 
index 3833fd0ff06641ef0711a79632624164efaca06d..bc2d7e69dc1c2f5fbc80d214a96245452a2d7383 100644 (file)
 
 import pytest
 
+import pgwui_upload_core.check_settings
 import pgwui_upload.check_settings as check_settings
 
-from pgwui_common import checkset
 from pgwui_testing import testing
-from pgwui_upload import exceptions as upload_ex
 
 # Activiate our pytest plugin
 pytest_plugins = ("pgwui",)
@@ -43,102 +42,24 @@ def test_check_setting_is_pgwui_check_settings(
 
 # Mocks
 
-mock_unknown_settings = testing.make_mock_fixture(
-    checkset, 'unknown_settings')
-
-mock_require_settings = testing.make_mock_fixture(
-    checkset, 'require_settings')
-
-mock_boolean_settings = testing.make_mock_fixture(
-    checkset, 'boolean_settings')
-
-
-# validate_literal_column_headings()
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_nosetting():
-    '''No error is delivered when there's no setting'''
-    errors = []
-    check_settings.validate_literal_column_headings(errors, {})
-
-    assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_on():
-    '''No error is delivered when the setting is "on"'''
-    errors = []
-    check_settings.validate_literal_column_headings(
-        errors, {'literal_column_headings': 'on'})
-
-    assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_off():
-    '''No error is delivered when the setting is "off"'''
-    errors = []
-    check_settings.validate_literal_column_headings(
-        errors, {'literal_column_headings': 'off'})
-
-    assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_ask():
-    '''No error is delivered when the setting is "ask"'''
-    errors = []
-    check_settings.validate_literal_column_headings(
-        errors, {'literal_column_headings': 'ask'})
-
-    assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_bad():
-    '''delivers an error when given a bad value'''
-    errors = []
-    check_settings.validate_literal_column_headings(
-        errors, {'literal_column_headings': 'bad'})
-
-    assert errors
-    assert isinstance(
-        errors[0], upload_ex.BadLiteralColumnHeadingsError)
-
-
-literal_err = 'literal column headings error'
-mock_validate_literal_column_headings = testing.make_mock_fixture(
-    check_settings, 'validate_literal_column_headings',
-    wraps=lambda errors, *args: errors.append(literal_err))
+mock_core_check_settings = testing.make_mock_fixture(
+    pgwui_upload_core.check_settings, 'check_settings')
 
 
 # check_settings()
 
 @pytest.mark.unittest
-def test_check_settings(mock_unknown_settings,
-                        mock_require_settings,
-                        mock_boolean_settings,
-                        mock_validate_literal_column_headings):
+def test_check_settings(mock_core_check_settings):
     '''The setting checking functions are called once, the check_settings()
     call returns all the errors from each mock.
     '''
 
-    unknown_retval = ['unk err']
-    require_retval = ['req err']
-    boolean_retval = ['bool err']
+    expected_errors = ['some error']
 
-    mock_unknown_settings.return_value = unknown_retval
-    mock_require_settings.return_value = require_retval
-    mock_boolean_settings.return_value = boolean_retval
+    mock_core_check_settings.return_value = expected_errors
 
     result = check_settings.check_settings({})
 
-    mock_unknown_settings.assert_called_once
-    mock_require_settings.assert_called_once
-    mock_boolean_settings.assert_called_once
-    mock_validate_literal_column_headings.assert_called_once
+    mock_core_check_settings.assert_called_once
 
-    assert result.sort() == ([literal_err]
-                             + unknown_retval
-                             + require_retval
-                             + boolean_retval).sort()
+    assert result == expected_errors
index 11c173d80ef77682b6da4bbb55216cf764271bc4..99607a806da9c35dddfb8ee918967854bc95631a 100644 (file)
@@ -59,6 +59,8 @@ DEFAULT_URLS = {'pgwui_upload': '/upload',
                 'pgwui_logout': '/logout',
                 'home_page': '/'}
 
+mock_init = testing.instance_method_mock_fixture('init')
+
 
 # Helper classes
 
@@ -70,11 +72,6 @@ class MockUploadEngine():
         return self.run_result
 
 
-class MockTableUploadHandler():
-    def __init__(self, *args):
-        pass
-
-
 # Fixtures
 
 @pytest.fixture
@@ -89,8 +86,6 @@ def isolate_upload_view(monkeypatch, pyramid_request_config):
             return MockUploadEngine(response)
 
         monkeypatch.setattr(upload, 'UploadEngine', upload_engine)
-        monkeypatch.setattr(
-            upload, 'TableUploadHandler', MockTableUploadHandler)
 
         settings = pyramid_request_config.get_settings()
         settings['pgwui'] = settings.get('pgwui', dict())
@@ -108,14 +103,14 @@ def isolate_upload_view(monkeypatch, pyramid_request_config):
 # TableUploadHandler()
 
 @pytest.fixture
-def neuter_tableuploadhandler(monkeypatch):
-    '''Make TableUploadHander have a mock parent and the given uploadform
+def neuter_tableuploadhandler(monkeypatch, mock_init):
+    '''Make TableUploadHander have no initialization and the given uploadform
     '''
     def run(uploadform, request):
-        monkeypatch.setattr(
-            upload, 'TabularFileUploadHandler', MockTableUploadHandler)
-
         uh = upload.TableUploadHandler(request)
+        mocked_init = mock_init(uh)
+        mocked_init.return_value = uh
+
         monkeypatch.setattr(uh, 'uf', uploadform)
 
         return uh
@@ -139,49 +134,8 @@ def get_quote_columns(neuter_tableuploadhandler):
     return run
 
 
-def test_tableuploadhandler_quote_columns_on(get_quote_columns):
-    '''When the settings ask for literal_column_headings = on return
-    True
-    '''
-    result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
-                               {'pgwui': {'literal_column_headings': 'on'}})
-    assert result is True
-
-
-def test_tableuploadhandler_quote_columns_off(get_quote_columns):
-    '''When the settings ask for literal_column_headings = off return
-    False
-    '''
-    result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
-                               {'pgwui': {'literal_column_headings': 'off'}})
-    assert result is False
-
-
-def test_tableuploadhandler_quote_columns_default(get_quote_columns):
-    '''When the settings literal_column_headings is not present return
-    False (as default)
-    '''
-    result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED, {'pgwui': {}})
-    assert result is False
-
-
-def test_tableuploadhandler_quote_columns_ask_on(get_quote_columns):
-    '''When the form asks for literal column headings return True
-    '''
-    result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
-                               {'pgwui': {'literal_column_headings': 'ask'}})
-    assert result is True
-
-
-def test_tableuploadhandler_quote_columns_ask_off(get_quote_columns):
-    '''When the form does not ask for literal column headings return False
-    '''
-    result = get_quote_columns({'literal_col_headings': False},
-                               {'pgwui': {'literal_column_headings': 'ask'}})
-    assert result is False
-
-
 # log_success()
+
 @pytest.mark.parametrize(
     ('checked',), [
         (constants.CHECKED,),
@@ -194,6 +148,7 @@ def test_log_success(caplog, checked):
     response = CHANGED_RESPONSE.copy()
     response['csv_checked'] = checked
     upload.log_success(response)
+
     logs = caplog.record_tuples
     assert len(logs) == 1
     assert logs[0][1] == logging.INFO