version=version,
description=(
- 'A web interface for bulk PostgreSQL data validation and upload.'),
+ 'Upload into a PostgreSQL table, assisting data cleanup, via the web.'
+ ),
long_description=long_description,
long_description_content_type='text/x-rst',
# Run-time dependencies.
install_requires=[
'markupsafe',
- 'pgwui_common',
+ 'pgwui_upload_core==' + version,
'psycopg2',
'pyramid',
],
# Karl O. Pinc <kop@karlpinc.com>
-from pgwui_common import checkset
-from . import exceptions as upload_ex
-
+import pgwui_upload_core.check_settings
PGWUI_COMPONENT = 'pgwui_upload'
-UPLOAD_SETTINGS = ['menu_label',
- 'literal_column_headings',
- ]
-REQUIRED_SETTINGS = []
-BOOLEAN_SETTINGS = []
-
-
-def validate_literal_column_headings(errors, settings):
- '''Make sure the values are those allowed
- '''
- value = settings.get('literal_column_headings')
- if value is None:
- return
- if value not in ('on', 'off', 'ask'):
- errors.append(upload_ex.BadLiteralColumnHeadingsError(value))
def check_settings(component_config):
checking that the values of other settings are valid
'''
errors = []
- errors.extend(checkset.unknown_settings(
- PGWUI_COMPONENT, UPLOAD_SETTINGS, component_config))
- errors.extend(checkset.require_settings(
- PGWUI_COMPONENT, REQUIRED_SETTINGS, component_config))
- errors.extend(checkset.boolean_settings(
- PGWUI_COMPONENT, BOOLEAN_SETTINGS, component_config))
- validate_literal_column_headings(errors, component_config)
+ errors.extend(pgwui_upload_core.check_settings.check_settings(
+ PGWUI_COMPONENT,
+ pgwui_upload_core.check_settings.UPLOAD_SETTINGS,
+ pgwui_upload_core.check_settings.REQUIRED_SETTINGS,
+ pgwui_upload_core.check_settings.BOOLEAN_SETTINGS,
+ component_config))
return errors
# Karl O. Pinc <kop@karlpinc.com>
-from pgwui_common import exceptions as common_ex
from pgwui_core import exceptions as core_ex
-# PGWUI setting related exceptions
-
-class UploadError(common_ex.Error):
- pass
-
-
-class BadLiteralColumnHeadingsError(UploadError):
- def __init__(self, value):
- super().__init__(
- 'The "pgwui.pgwui_upload.literal_column_headings" PGWUI setting '
- ' must be "on", "off", "ask", or not present')
-
-
# Upload related exceptions
-class NoTableError(core_ex.PGWUIError):
+class NoTableError(core_ex.Error):
'''No table uploaded'''
def __init__(self, e, descr='', detail=''):
super(NoTableError, self).__init__(e, descr, detail)
-class BadTableError(core_ex.PGWUIError):
+class BadTableError(core_ex.Error):
'''Supplied name does not work for a table or view'''
def __init__(self, e, descr='', detail=''):
super(BadTableError, self).__init__(e, descr, detail)
<%!
from pgwui_common.path import asset_abspath
- auth_base_mak = asset_abspath('pgwui_common:templates/auth_base.mak')
+ upload_base_mak = asset_abspath('pgwui_upload_core:templates/upload.mak')
%>
-<%inherit file="${auth_base_mak}" />
+<%inherit file="${upload_base_mak}" />
<%block name="title">${pgwui['pgwui_upload']['menu_label']}</%block>
<%block name="meta_keywords">
<meta name="keywords"
- content="PGWUI generic upload" />
+ content="PGWUI generic table upload" />
</%block>
<%block name="meta_description">
</p>
</%block>
-<h1>Upload File Into Database</h1>
+<h1>Upload File Into Table Or View</h1>
<%def name="table_row(tab_index)">
<tr>
</tr>
</%def>
-<%def name="trim_row(tab_index)">
- <tr>
- <td class="label">
- <label for="trim_upload_id">Trim Leading/Trailing Spaces:</label>
- </td>
- <td>
- <input name="trim_upload"
- tabindex="${tab_index}"
- id="trim_upload_id"
- type="checkbox"
- ${trim_upload | n}
- />
- </td>
- </tr>
-</%def>
-
-<% form_elements = [table_row, trim_row] %>
-
-% if ask_about_literal_cols:
- <%def name="literal_row(tab_index)">
- <tr>
- <td class="label">
- <label for="literal_col_headings_id">Literal
- Uploaded Column Headings:</label>
- </td>
- <td>
- <input name="literal_col_headings"
- tabindex="${tab_index}"
- id="literal_col_headings_id"
- type="checkbox"
- ${literal_col_headings | n}
- />
- </td>
- </tr>
- </%def>
-
- <% form_elements.append(literal_row) %>
-% endif
+<%
+ form_elements = [table_row]
+ self.append_elements(form_elements) %>
-${parent.upload_form(form_elements)}
+${self.upload_form(form_elements)}
from pyramid.view import view_config
import logging
-import markupsafe
-import psycopg2.errorcodes
-from psycopg2 import ProgrammingError
from pgwui_common.view import auth_base_view
from pgwui_core.core import (
UploadEngine,
DataLineProcessor,
UploadDoubleTableForm,
- TabularFileUploadHandler,
UploadData,
- doublequote,
escape_eol,
is_checked,
)
+from pgwui_upload_core.views.upload import (
+ BaseTableUploadHandler,
+)
from pgwui_upload import exceptions as upload_ex
self.cur.execute(self.insert_stmt, udl.tuples)
-class TableUploadHandler(TabularFileUploadHandler):
+class TableUploadHandler(BaseTableUploadHandler):
'''
Attributes:
request A pyramid request instance
A list of PGWUIError instances
'''
uf = self.uf
- errors = super(TableUploadHandler, self).val_input()
+ errors = super().val_input()
qualified_table = uf['table']
if qualified_table == '':
errors.append(upload_ex.NoTableError(
'No table or view name supplied'))
- self.double_validator(errors)
-
return errors
- def write(self, result, errors):
- '''Add double upload key into form.'''
- response = super(TableUploadHandler, self).write(result, errors)
- self.write_double_key(response)
- return response
-
- def resolve_table(self, qualified_table):
- '''Return (schema, table) tuple of table name, or raise exception
- if not resolvable.
- '''
- try:
- self.cur.execute(
- ('SELECT nspname, relname'
- ' FROM pg_class'
- ' JOIN pg_namespace'
- ' ON (pg_namespace.oid = pg_class.relnamespace)'
- ' WHERE pg_class.oid = %s::REGCLASS::OID'),
- (qualified_table,))
- except ProgrammingError as err:
- pgcode = err.pgcode
- if pgcode == psycopg2.errorcodes.INVALID_SCHEMA_NAME:
- raise upload_ex.MissingSchemaError(
- 'No such schema',
- err.diag.message_primary,)
- elif pgcode == psycopg2.errorcodes.UNDEFINED_TABLE:
- raise upload_ex.MissingTableError(
- 'No such table or view',
- err.diag.message_primary,
- ('<p>Hint: Check spelling or try qualifying the'
- ' table name with a schema name</p>'))
- else:
- raise
- return self.cur.fetchone()
-
- def good_table(self, schema, table):
- '''Is the supplied table or view insertable?
- '''
- sql = ('SELECT 1 FROM information_schema.tables'
- ' WHERE tables.table_name = %s'
- ' AND tables.table_schema = %s'
- " AND (tables.is_insertable_into = 'YES'"
- # Unfortunatly, as of 9.2, the information_schema
- # tables.is_insertable_into does not reflect whether
- # there's an insert trigger on the table.
- " OR tables.table_type = 'VIEW')")
- self.cur.execute(sql, (table, schema))
- return self.cur.fetchone() is not None
-
- def quote_columns(self):
- '''Return boolean -- whether to take column names literally
- '''
- settings = self.request.registry.settings
- quoter_setting = settings['pgwui'].get('literal_column_headings')
- if quoter_setting == 'on':
- return True
- elif quoter_setting == 'ask':
- return self.uf['literal_col_headings']
- else:
- return False
-
def factory(self, ue):
'''Make a db loader function from an UploadEngine.
self.ue = ue
self.cur = ue.cur
- data = ue.data
qualified_table = self.uf['table']
quotecols = self.quote_columns()
- if quotecols:
- column_quoter = doublequote
- else:
- def column_quoter(x):
- return x
-
- schema, table = self.resolve_table(qualified_table)
-
- if not self.good_table(schema, table):
- raise upload_ex.CannotInsertError(
- 'Cannot insert into supplied table or view',
- ('({0}) is either is a view'
- ' that cannot be inserted into'
- ' or you do not have the necessary'
- ' permissions to the table or view').format(
- markupsafe.escape(qualified_table)))
-
- column_sql = ('SELECT 1 FROM information_schema.columns'
- ' WHERE columns.table_name = %s'
- ' AND columns.table_schema = %s')
- if quotecols:
- column_sql += ' AND columns.column_name = %s'
- else:
- column_sql += ' AND columns.column_name = lower(%s::name)'
-
- insert_stmt = 'INSERT INTO {0} ('.format(doublequote(qualified_table))
- value_string = ''
- col_sep = ''
- bad_cols = []
- for col_name in data.headers.tuples:
- # Check that colum name exists
- self.cur.execute(column_sql, (table, schema, col_name))
- if self.cur.fetchone() is None:
- bad_cols.append(col_name)
- else:
- # Add column to sql statement
- insert_stmt += col_sep + column_quoter(col_name)
- value_string += col_sep + '%s'
- col_sep = ', '
-
- if bad_cols:
- if quotecols:
- detail = ('<p>The following columns are not in the ({0})'
- ' table, or the supplied column names do not match'
- " the character case of the table's columns,"
- ' or you do not have permission to access'
- ' the columns:</p><ul>')
- else:
- detail = ('<p>The following columns are not in the ({0})'
- ' table, or the table has column names containing'
- ' upper case characters, or you do not have'
- ' permission to access the columns:</p><ul>')
- detail = detail.format(markupsafe.escape(qualified_table))
-
- for bad_col in bad_cols:
- detail += '<li>{0}</li>'.format(markupsafe.escape(bad_col))
- detail += '</ul>'
- raise upload_ex.BadHeadersError(
- 'Header line contains unknown column names',
- detail=detail)
-
- insert_stmt += ') VALUES({0})'.format(value_string)
+ column_quoter = self.get_column_quoter(quotecols)
+
+ insert_stmt = self.build_insert_stmt(
+ ue.data, qualified_table, quotecols, column_quoter)
return SaveLine(ue, self, insert_stmt)
@auth_base_view
def upload_view(request):
- response = UploadEngine(TableUploadHandler(request)).run()
+ tuh = TableUploadHandler(request).init()
+ response = UploadEngine(tuh).run()
settings = request.registry.settings
quoter_setting = settings['pgwui'].get('literal_column_headings')
response['ask_about_literal_cols'] = quoter_setting == 'ask'
+ # Keep these next 2
response.setdefault('pgwui', dict())
response['pgwui']['pgwui_upload'] = settings['pgwui']['pgwui_upload']
import pytest
+import pgwui_upload_core.check_settings
import pgwui_upload.check_settings as check_settings
-from pgwui_common import checkset
from pgwui_testing import testing
-from pgwui_upload import exceptions as upload_ex
# Activiate our pytest plugin
pytest_plugins = ("pgwui",)
# Mocks
-mock_unknown_settings = testing.make_mock_fixture(
- checkset, 'unknown_settings')
-
-mock_require_settings = testing.make_mock_fixture(
- checkset, 'require_settings')
-
-mock_boolean_settings = testing.make_mock_fixture(
- checkset, 'boolean_settings')
-
-
-# validate_literal_column_headings()
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_nosetting():
- '''No error is delivered when there's no setting'''
- errors = []
- check_settings.validate_literal_column_headings(errors, {})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_on():
- '''No error is delivered when the setting is "on"'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'on'})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_off():
- '''No error is delivered when the setting is "off"'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'off'})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_ask():
- '''No error is delivered when the setting is "ask"'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'ask'})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_bad():
- '''delivers an error when given a bad value'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'bad'})
-
- assert errors
- assert isinstance(
- errors[0], upload_ex.BadLiteralColumnHeadingsError)
-
-
-literal_err = 'literal column headings error'
-mock_validate_literal_column_headings = testing.make_mock_fixture(
- check_settings, 'validate_literal_column_headings',
- wraps=lambda errors, *args: errors.append(literal_err))
+mock_core_check_settings = testing.make_mock_fixture(
+ pgwui_upload_core.check_settings, 'check_settings')
# check_settings()
@pytest.mark.unittest
-def test_check_settings(mock_unknown_settings,
- mock_require_settings,
- mock_boolean_settings,
- mock_validate_literal_column_headings):
+def test_check_settings(mock_core_check_settings):
'''The setting checking functions are called once, the check_settings()
call returns all the errors from each mock.
'''
- unknown_retval = ['unk err']
- require_retval = ['req err']
- boolean_retval = ['bool err']
+ expected_errors = ['some error']
- mock_unknown_settings.return_value = unknown_retval
- mock_require_settings.return_value = require_retval
- mock_boolean_settings.return_value = boolean_retval
+ mock_core_check_settings.return_value = expected_errors
result = check_settings.check_settings({})
- mock_unknown_settings.assert_called_once
- mock_require_settings.assert_called_once
- mock_boolean_settings.assert_called_once
- mock_validate_literal_column_headings.assert_called_once
+ mock_core_check_settings.assert_called_once
- assert result.sort() == ([literal_err]
- + unknown_retval
- + require_retval
- + boolean_retval).sort()
+ assert result == expected_errors
'pgwui_logout': '/logout',
'home_page': '/'}
+mock_init = testing.instance_method_mock_fixture('init')
+
# Helper classes
return self.run_result
-class MockTableUploadHandler():
- def __init__(self, *args):
- pass
-
-
# Fixtures
@pytest.fixture
return MockUploadEngine(response)
monkeypatch.setattr(upload, 'UploadEngine', upload_engine)
- monkeypatch.setattr(
- upload, 'TableUploadHandler', MockTableUploadHandler)
settings = pyramid_request_config.get_settings()
settings['pgwui'] = settings.get('pgwui', dict())
# TableUploadHandler()
@pytest.fixture
-def neuter_tableuploadhandler(monkeypatch):
- '''Make TableUploadHander have a mock parent and the given uploadform
+def neuter_tableuploadhandler(monkeypatch, mock_init):
+ '''Make TableUploadHander have no initialization and the given uploadform
'''
def run(uploadform, request):
- monkeypatch.setattr(
- upload, 'TabularFileUploadHandler', MockTableUploadHandler)
-
uh = upload.TableUploadHandler(request)
+ mocked_init = mock_init(uh)
+ mocked_init.return_value = uh
+
monkeypatch.setattr(uh, 'uf', uploadform)
return uh
return run
-def test_tableuploadhandler_quote_columns_on(get_quote_columns):
- '''When the settings ask for literal_column_headings = on return
- True
- '''
- result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
- {'pgwui': {'literal_column_headings': 'on'}})
- assert result is True
-
-
-def test_tableuploadhandler_quote_columns_off(get_quote_columns):
- '''When the settings ask for literal_column_headings = off return
- False
- '''
- result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
- {'pgwui': {'literal_column_headings': 'off'}})
- assert result is False
-
-
-def test_tableuploadhandler_quote_columns_default(get_quote_columns):
- '''When the settings literal_column_headings is not present return
- False (as default)
- '''
- result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED, {'pgwui': {}})
- assert result is False
-
-
-def test_tableuploadhandler_quote_columns_ask_on(get_quote_columns):
- '''When the form asks for literal column headings return True
- '''
- result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
- {'pgwui': {'literal_column_headings': 'ask'}})
- assert result is True
-
-
-def test_tableuploadhandler_quote_columns_ask_off(get_quote_columns):
- '''When the form does not ask for literal column headings return False
- '''
- result = get_quote_columns({'literal_col_headings': False},
- {'pgwui': {'literal_column_headings': 'ask'}})
- assert result is False
-
-
# log_success()
+
@pytest.mark.parametrize(
('checked',), [
(constants.CHECKED,),
response = CHANGED_RESPONSE.copy()
response['csv_checked'] = checked
upload.log_success(response)
+
logs = caplog.record_tuples
assert len(logs) == 1
assert logs[0][1] == logging.INFO