[run]
branch = True
source =
- pgwui_upload
+ pgwui_upload_core
views
[report]
dist/
docs/build/
README.html
-src/pgwui_upload.egg-info/
+src/pgwui_upload_core.egg-info/
include .coveragerc
include LICENSE.txt
include Makefile
-include src/pgwui_upload/VERSION
-include src/pgwui_upload/templates/*.mak
+include src/pgwui_upload_core/VERSION
+include src/pgwui_upload_core/templates/*.mak
include tox.ini
# Copyright (C) 2016, 2017, 2018, 2019 The Meme Factory, Inc.
# http://www.karlpinc.com/
-# This file is part of PGWUI_Upload.
+# This file is part of PGWUI_Upload_Core.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
here = path.abspath(path.dirname(__file__))
# Get program version
-with open(path.join(here, 'src', 'pgwui_upload', 'VERSION'),
+with open(path.join(here, 'src', 'pgwui_upload_core', 'VERSION'),
encoding='utf-8') as version_file:
version = version_file.read().strip()
]
setup(
- name='pgwui_upload',
+ name='pgwui_upload_core',
# Versioning is major.minor.fixes. Major releases change (after 1.0.0)
# when backward incompatibility is introduced. Minor releases introduce
version=version,
description=(
- 'A web interface for bulk PostgreSQL data validation and upload.'),
+ 'PGWUI API for bulk PostgreSQL data validation and upload.'),
long_description=long_description,
long_description_content_type='text/x-rst',
# The project's main homepage.
- url='http://pgwui_upload.readthedocs.io/',
+ url='http://pgwui_upload_core.readthedocs.io/',
# Author details
author='Karl O. Pinc',
- author_email='kop@meme.com',
+ author_email='kop@karlpinc.com',
# Choose your license
license='AGPLv3+',
# Run-time dependencies.
install_requires=[
+ 'attrs',
'markupsafe',
'pgwui_common',
'psycopg2',
# installed, specify them here. If using Python 2.6 or less, then these
# have to be included in MANIFEST.in as well.
package_data={
- 'pgwui_upload': [
+ 'pgwui_upload_core': [
'templates/*.mak',
'VERSION',
],
#
# Setup an entry point to support PGWUI autoconfigure discovery.
entry_points={
- 'pgwui.components': '.pgwui_upload = pgwui_upload',
- 'pgwui.check_settings':
- '.pgwui_upload = pgwui_upload.check_settings:check_settings'}
+ 'pgwui.components': '.pgwui_upload_core = pgwui_upload_core'}
)
+++ /dev/null
-# Copyright (C) 2018, 2020 The Meme Factory, Inc. http://www.karlpinc.com/
-
-# This file is part of PGWUI_Upload.
-#
-# This program is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of
-# the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-#
-
-# Karl O. Pinc <kop@karlpinc.com>
-
-from .pgwui_upload import includeme # noqa: F401
+++ /dev/null
-# Copyright (C) 2020 The Meme Factory, Inc. http://www.karlpinc.com/
-
-# This file is part of PGWUI_Upload.
-#
-# This program is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of
-# the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-#
-
-# Karl O. Pinc <kop@karlpinc.com>
-
-from pgwui_common import checkset
-from . import exceptions as upload_ex
-
-
-PGWUI_COMPONENT = 'pgwui_upload'
-UPLOAD_SETTINGS = ['menu_label',
- 'literal_column_headings',
- ]
-REQUIRED_SETTINGS = []
-BOOLEAN_SETTINGS = []
-
-
-def validate_literal_column_headings(errors, settings):
- '''Make sure the values are those allowed
- '''
- value = settings.get('literal_column_headings')
- if value is None:
- return
- if value not in ('on', 'off', 'ask'):
- errors.append(upload_ex.BadLiteralColumnHeadingsError(value))
-
-
-def check_settings(component_config):
- '''Check that all pgwui_upload specific settings are good.
- This includes:
- checking for unknown settings
- checking for missing required settings
- checking the boolean settings
- checking that the values of other settings are valid
- '''
- errors = []
- errors.extend(checkset.unknown_settings(
- PGWUI_COMPONENT, UPLOAD_SETTINGS, component_config))
- errors.extend(checkset.require_settings(
- PGWUI_COMPONENT, REQUIRED_SETTINGS, component_config))
- errors.extend(checkset.boolean_settings(
- PGWUI_COMPONENT, BOOLEAN_SETTINGS, component_config))
- validate_literal_column_headings(errors, component_config)
-
- return errors
+++ /dev/null
-# Copyright (C) 2020 The Meme Factory, Inc. http://www.karlpinc.com/
-
-# This file is part of PGWUI_Upload.
-#
-# This program is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of
-# the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-#
-
-# Karl O. Pinc <kop@karlpinc.com>
-
-from pgwui_common import exceptions as common_ex
-from pgwui_core import exceptions as core_ex
-
-
-# PGWUI setting related exceptions
-
-class UploadError(common_ex.Error):
- pass
-
-
-class BadLiteralColumnHeadingsError(UploadError):
- def __init__(self, value):
- super().__init__(
- 'The "pgwui.pgwui_upload.literal_column_headings" PGWUI setting '
- ' must be "on", "off", "ask", or not present')
-
-
-# Upload related exceptions
-
-class NoTableError(core_ex.PGWUIError):
- '''No table uploaded'''
- def __init__(self, e, descr='', detail=''):
- super(NoTableError, self).__init__(e, descr, detail)
-
-
-class BadTableError(core_ex.PGWUIError):
- '''Supplied name does not work for a table or view'''
- def __init__(self, e, descr='', detail=''):
- super(BadTableError, self).__init__(e, descr, detail)
-
-
-class MissingTableError(BadTableError):
- '''The supplied table or view does not exist'''
- def __init__(self, e, descr='', detail=''):
- super(MissingTableError, self).__init__(e, descr, detail)
-
-
-class MissingSchemaError(BadTableError):
- '''The schema portion of the supplied table or view does not exist'''
- def __init__(self, e, descr='', detail=''):
- super(MissingSchemaError, self).__init__(e, descr, detail)
-
-
-class CannotInsertError(BadTableError):
- '''Cannot insert into the supplied table or view'''
- def __init__(self, e, descr='', detail=''):
- super(CannotInsertError, self).__init__(e, descr, detail)
-
-
-class BadHeadersError(core_ex.PGWUIError):
- '''The headers in the uploaded file are bad.'''
- def __init__(self, e, descr='', detail=''):
- super(BadHeadersError, self).__init__(e, descr, detail)
+++ /dev/null
-# Copyright (C) 2018, 2020 The Meme Factory, Inc. http://www.karlpinc.com/
-
-# This file is part of PGWUI_Upload.
-#
-# This program is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of
-# the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-#
-
-# Karl O. Pinc <kop@karlpinc.com>
-
-'''Provide a way to configure PGWUI.
-'''
-PGWUI_COMPONENT = 'pgwui_upload'
-DEFAULT_UPLOAD_ROUTE = '/upload'
-DEFAULT_UPLOAD_MENU_LABEL = 'upload -- Upload File Into Database'
-
-
-def init_menu(config):
- '''Add default menu information into settings when they are not present
- '''
- settings = config.get_settings()
- pgwui = settings.setdefault('pgwui', dict())
- pgwui.setdefault(PGWUI_COMPONENT, dict())
- pgwui[PGWUI_COMPONENT].setdefault(
- 'menu_label', DEFAULT_UPLOAD_MENU_LABEL)
-
-
-def includeme(config):
- '''Pyramid configuration for PGWUI_Upload
- '''
- init_menu(config)
- config.add_route(PGWUI_COMPONENT, DEFAULT_UPLOAD_ROUTE)
- config.scan()
+++ /dev/null
-<%doc>
- Copyright (C) 2015, 2018, 2020 The Meme Factory, Inc.
- http://www.karlpinc.com/
-
- This file is part of PGWUI_Upload.
-
- This program is free software: you can redistribute it and/or
- modify it under the terms of the GNU Affero General Public License
- as published by the Free Software Foundation, either version 3 of
- the License, or (at your option) any later version.
-
- This program is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public
- License along with this program. If not, see
- <http://www.gnu.org/licenses/>.
-
- Template for generic upload page.
-
- Karl O. Pinc <kop@karlpinc.com>
-
- This template uses the following variables in it's context:
-
- ask_about_literal_cols
-
-</%doc>
-
-
-<%!
- from pgwui_common.path import asset_abspath
-
- auth_base_mak = asset_abspath('pgwui_common:templates/auth_base.mak')
-%>
-
-<%inherit file="${auth_base_mak}" />
-
-<%block name="title">${pgwui['pgwui_upload']['menu_label']}</%block>
-<%block name="meta_keywords">
- <meta name="keywords"
- content="PGWUI generic upload" />
-</%block>
-
-<%block name="meta_description">
- <meta name="description"
- content="Upload a file into a PostgreSQL table." />
-</%block>
-
-<%block name="action_success">
- <p><em class="success">Table (${table})
- successfully updated</em> from a file containing ${lines}
- lines<em class="success">!</em> (Including column headings.)
- </p>
-</%block>
-
-<h1>Upload File Into Database</h1>
-
-<%def name="table_row(tab_index)">
- <tr>
- <td class="label">
- <label for="table_id">Table or View:</label>
- </td>
- <td>
- <input name="table"
- tabindex="${tab_index}"
- id="table_id"
- type="text"
- size="30"
- value="${table}"
- />
- </td>
- </tr>
-</%def>
-
-<%def name="trim_row(tab_index)">
- <tr>
- <td class="label">
- <label for="trim_upload_id">Trim Leading/Trailing Spaces:</label>
- </td>
- <td>
- <input name="trim_upload"
- tabindex="${tab_index}"
- id="trim_upload_id"
- type="checkbox"
- ${trim_upload | n}
- />
- </td>
- </tr>
-</%def>
-
-<% form_elements = [table_row, trim_row] %>
-
-% if ask_about_literal_cols:
- <%def name="literal_row(tab_index)">
- <tr>
- <td class="label">
- <label for="literal_col_headings_id">Literal
- Uploaded Column Headings:</label>
- </td>
- <td>
- <input name="literal_col_headings"
- tabindex="${tab_index}"
- id="literal_col_headings_id"
- type="checkbox"
- ${literal_col_headings | n}
- />
- </td>
- </tr>
- </%def>
-
- <% form_elements.append(literal_row) %>
-% endif
-
-${parent.upload_form(form_elements)}
+++ /dev/null
-# Copyright (C) 2015, 2018, 2020 The Meme Factory, Inc.
-# http://www.karlpinc.com/
-
-# This file is part of PGWUI_Upload.
-#
-# This program is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of
-# the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-#
-
-# Karl O. Pinc <kop@karlpinc.com>
-#
-# Bugs:
-# All data is presented to the db as a string, which could result
-# in problems with type coercion.
-
-# Write python 3 compatible code.
-from __future__ import print_function
-from __future__ import unicode_literals
-from __future__ import absolute_import
-from __future__ import division
-
-from pyramid.view import view_config
-import logging
-import markupsafe
-import psycopg2.errorcodes
-from psycopg2 import ProgrammingError
-
-from pgwui_common.view import auth_base_view
-from pgwui_core.core import (
- UploadEngine,
- DataLineProcessor,
- UploadDoubleTableForm,
- TabularFileUploadHandler,
- UploadData,
- doublequote,
- escape_eol,
- is_checked,
-)
-
-from pgwui_upload import exceptions as upload_ex
-
-
-log = logging.getLogger(__name__)
-
-
-class SaveLine(DataLineProcessor):
- def __init__(self, ue, uh, insert_stmt):
- '''
- ue UploadEngine instance
- uh UploadHandler instance
- insert_stmt Statement used to insert into db.
- (psycopg2 formatted for substituion)
- '''
- super(SaveLine, self).__init__(ue, uh)
- self.insert_stmt = insert_stmt
-
- def eat(self, udl):
- '''
- Upload a line of data into the db.
-
- udl An UploadDataLine instance
- '''
- self.cur.execute(self.insert_stmt, udl.tuples)
-
-
-class TableUploadHandler(TabularFileUploadHandler):
- '''
- Attributes:
- request A pyramid request instance
- uf A GCUploadForm instance
- session A pyramid session instance
- ue
- cur
- '''
-
- def make_form(self):
- '''
- Make the upload form needed by this handler.
- '''
- return UploadDoubleTableForm(self)
-
- def get_data(self):
- '''
- Return an UploadData instance, with flags set as desired.
- '''
- uf = self.uf
- self.data = UploadData(uf['localfh'],
- uf['upload_fmt'],
- uf['upload_null'],
- uf['null_rep'],
- trim=uf['trim_upload'])
-
- def val_input(self):
- '''
- Validate input needed beyond that required to connect to the db.
-
- Returns:
- A list of PGWUIError instances
- '''
- uf = self.uf
- errors = super(TableUploadHandler, self).val_input()
-
- qualified_table = uf['table']
- if qualified_table == '':
- errors.append(upload_ex.NoTableError(
- 'No table or view name supplied'))
-
- self.double_validator(errors)
-
- return errors
-
- def write(self, result, errors):
- '''Add double upload key into form.'''
- response = super(TableUploadHandler, self).write(result, errors)
- self.write_double_key(response)
- return response
-
- def resolve_table(self, qualified_table):
- '''Return (schema, table) tuple of table name, or raise exception
- if not resolvable.
- '''
- try:
- self.cur.execute(
- ('SELECT nspname, relname'
- ' FROM pg_class'
- ' JOIN pg_namespace'
- ' ON (pg_namespace.oid = pg_class.relnamespace)'
- ' WHERE pg_class.oid = %s::REGCLASS::OID'),
- (qualified_table,))
- except ProgrammingError as err:
- pgcode = err.pgcode
- if pgcode == psycopg2.errorcodes.INVALID_SCHEMA_NAME:
- raise upload_ex.MissingSchemaError(
- 'No such schema',
- err.diag.message_primary,)
- elif pgcode == psycopg2.errorcodes.UNDEFINED_TABLE:
- raise upload_ex.MissingTableError(
- 'No such table or view',
- err.diag.message_primary,
- ('<p>Hint: Check spelling or try qualifying the'
- ' table name with a schema name</p>'))
- else:
- raise
- return self.cur.fetchone()
-
- def good_table(self, schema, table):
- '''Is the supplied table or view insertable?
- '''
- sql = ('SELECT 1 FROM information_schema.tables'
- ' WHERE tables.table_name = %s'
- ' AND tables.table_schema = %s'
- " AND (tables.is_insertable_into = 'YES'"
- # Unfortunatly, as of 9.2, the information_schema
- # tables.is_insertable_into does not reflect whether
- # there's an insert trigger on the table.
- " OR tables.table_type = 'VIEW')")
- self.cur.execute(sql, (table, schema))
- return self.cur.fetchone() is not None
-
- def quote_columns(self):
- '''Return boolean -- whether to take column names literally
- '''
- settings = self.request.registry.settings
- quoter_setting = settings['pgwui'].get('literal_column_headings')
- if quoter_setting == 'on':
- return True
- elif quoter_setting == 'ask':
- return self.uf['literal_col_headings']
- else:
- return False
-
- def factory(self, ue):
- '''Make a db loader function from an UploadEngine.
-
- Input:
-
- Side Effects:
- Yes, lots.
- '''
-
- self.ue = ue
- self.cur = ue.cur
- data = ue.data
- qualified_table = self.uf['table']
-
- quotecols = self.quote_columns()
- if quotecols:
- column_quoter = doublequote
- else:
- def column_quoter(x):
- return x
-
- schema, table = self.resolve_table(qualified_table)
-
- if not self.good_table(schema, table):
- raise upload_ex.CannotInsertError(
- 'Cannot insert into supplied table or view',
- ('({0}) is either is a view'
- ' that cannot be inserted into'
- ' or you do not have the necessary'
- ' permissions to the table or view').format(
- markupsafe.escape(qualified_table)))
-
- column_sql = ('SELECT 1 FROM information_schema.columns'
- ' WHERE columns.table_name = %s'
- ' AND columns.table_schema = %s')
- if quotecols:
- column_sql += ' AND columns.column_name = %s'
- else:
- column_sql += ' AND columns.column_name = lower(%s::name)'
-
- insert_stmt = 'INSERT INTO {0} ('.format(doublequote(qualified_table))
- value_string = ''
- col_sep = ''
- bad_cols = []
- for col_name in data.headers.tuples:
- # Check that colum name exists
- self.cur.execute(column_sql, (table, schema, col_name))
- if self.cur.fetchone() is None:
- bad_cols.append(col_name)
- else:
- # Add column to sql statement
- insert_stmt += col_sep + column_quoter(col_name)
- value_string += col_sep + '%s'
- col_sep = ', '
-
- if bad_cols:
- if quotecols:
- detail = ('<p>The following columns are not in the ({0})'
- ' table, or the supplied column names do not match'
- " the character case of the table's columns,"
- ' or you do not have permission to access'
- ' the columns:</p><ul>')
- else:
- detail = ('<p>The following columns are not in the ({0})'
- ' table, or the table has column names containing'
- ' upper case characters, or you do not have'
- ' permission to access the columns:</p><ul>')
- detail = detail.format(markupsafe.escape(qualified_table))
-
- for bad_col in bad_cols:
- detail += '<li>{0}</li>'.format(markupsafe.escape(bad_col))
- detail += '</ul>'
- raise upload_ex.BadHeadersError(
- 'Header line contains unknown column names',
- detail=detail)
-
- insert_stmt += ') VALUES({0})'.format(value_string)
-
- return SaveLine(ue, self, insert_stmt)
-
-
-@view_config(route_name='pgwui_upload',
- renderer='pgwui_upload:templates/upload.mak')
-@auth_base_view
-def upload_view(request):
-
- response = UploadEngine(TableUploadHandler(request)).run()
-
- settings = request.registry.settings
- quoter_setting = settings['pgwui'].get('literal_column_headings')
- response['ask_about_literal_cols'] = quoter_setting == 'ask'
- response.setdefault('pgwui', dict())
- response['pgwui']['pgwui_upload'] = settings['pgwui']['pgwui_upload']
-
- if response['db_changed']:
- if is_checked(response['csv_checked']):
- upload_fmt = 'CSV'
- else:
- upload_fmt = 'TAB'
- log.info('Successful upload: DB {db}: Table ({table}):'
- ' File ({filename}): Lines {lines}:'
- ' Format {format}: Upload Null {null}: Null Rep ({null_rep}):'
- ' Trim {trim}: By user {user}'
- .format(filename=response['filename'],
- lines=response['lines'],
- format=upload_fmt,
- null=is_checked(response['upload_null']),
- null_rep=escape_eol(response['null_rep']),
- trim=is_checked(response['trim_upload']),
- db=response['db'],
- table=response['table'],
- user=response['user']))
- return response
--- /dev/null
+# Copyright (C) 2018, 2020 The Meme Factory, Inc. http://www.karlpinc.com/
+
+# This file is part of PGWUI_Upload.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+# Karl O. Pinc <kop@karlpinc.com>
+
+from .pgwui_upload_core import includeme # noqa: F401
--- /dev/null
+# Copyright (C) 2020 The Meme Factory, Inc. http://www.karlpinc.com/
+
+# This file is part of PGWUI_Upload_Core.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+# Karl O. Pinc <kop@karlpinc.com>
+
+from pgwui_common import checkset
+from . import exceptions as upload_core_ex
+
+
+UPLOAD_SETTINGS = ['menu_label',
+ 'literal_column_headings',
+ ]
+REQUIRED_SETTINGS = []
+BOOLEAN_SETTINGS = []
+
+
+def validate_literal_column_headings(component, errors, settings):
+ '''Make sure the values are those allowed
+ '''
+ value = settings.get('literal_column_headings')
+ if value is None:
+ return
+ if value not in ('on', 'off', 'ask'):
+ errors.append(
+ upload_core_ex.BadLiteralColumnHeadingsError(component, value))
+
+
+def check_settings(
+ component, all_setngs, required_setngs, boolean_setngs,
+ component_config):
+ '''Check that all pgwui_upload specific settings are good.
+ This includes:
+ checking for unknown settings
+ checking for missing required settings
+ checking the boolean settings
+ checking that the values of other settings are valid
+ '''
+ errors = []
+ errors.extend(checkset.unknown_settings(
+ component, all_setngs, component_config))
+ errors.extend(checkset.require_settings(
+ component, required_setngs, component_config))
+ errors.extend(checkset.boolean_settings(
+ component, boolean_setngs, component_config))
+ validate_literal_column_headings(component, errors, component_config)
+
+ return errors
--- /dev/null
+# Copyright (C) 2020 The Meme Factory, Inc. http://www.karlpinc.com/
+
+# This file is part of PGWUI_Upload.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+# Karl O. Pinc <kop@karlpinc.com>
+
+from pgwui_core import exceptions as core_ex
+
+
+# PGWUI setting related exceptions
+
+class UploadError(core_ex.Error):
+ pass
+
+
+class BadLiteralColumnHeadingsError(UploadError):
+ def __init__(self, component, value):
+ super().__init__(
+ f'The "pgwui.{component}.literal_column_headings" PGWUI setting '
+ f' is ({value}), it must be "on", "off", "ask", or the'
+ ' entire setting be omitted')
+
+
+# Upload related exceptions
+
+class NoTableError(UploadError):
+ '''No table uploaded'''
+ def __init__(self, e, descr='', detail=''):
+ super(NoTableError, self).__init__(e, descr, detail)
+
+
+class BadTableError(UploadError):
+ '''Supplied name does not work for a table or view'''
+ def __init__(self, e, descr='', detail=''):
+ super(BadTableError, self).__init__(e, descr, detail)
+
+
+class MissingTableError(BadTableError):
+ '''The supplied table or view does not exist'''
+ def __init__(self, e, descr='', detail=''):
+ super(MissingTableError, self).__init__(e, descr, detail)
+
+
+class MissingSchemaError(BadTableError):
+ '''The schema portion of the supplied table or view does not exist'''
+ def __init__(self, e, descr='', detail=''):
+ super(MissingSchemaError, self).__init__(e, descr, detail)
+
+
+class CannotInsertError(BadTableError):
+ '''Cannot insert into the supplied table or view'''
+ def __init__(self, e, descr='', detail=''):
+ super(CannotInsertError, self).__init__(e, descr, detail)
+
+
+class BadHeadersError(UploadError):
+ '''The headers in the uploaded file are bad.'''
+ def __init__(self, e, descr='', detail=''):
+ super(BadHeadersError, self).__init__(e, descr, detail)
--- /dev/null
+# Copyright (C) 2018, 2020 The Meme Factory, Inc. http://www.karlpinc.com/
+
+# This file is part of Pgwui_Upload_Core.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+# Karl O. Pinc <kop@karlpinc.com>
+
+'''Provide a way to configure this PGWUI component.
+'''
+PGWUI_COMPONENT = 'pgwui_upload_core'
+
+
+def includeme(config):
+ '''Pyramid configuration for Pgwui_Upload_Core
+ '''
+ config.scan()
--- /dev/null
+<%doc>
+ Copyright (C) 2015, 2018, 2020 The Meme Factory, Inc.
+ http://www.karlpinc.com/
+
+ This file is part of PGWUI_Upload_Core.
+
+ This program is free software: you can redistribute it and/or
+ modify it under the terms of the GNU Affero General Public License
+ as published by the Free Software Foundation, either version 3 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public
+ License along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+
+ Template for generic upload pages.
+
+ Karl O. Pinc <kop@karlpinc.com>
+
+ This template uses the following variables in it's context:
+
+ ask_about_literal_cols
+
+ It makes available:
+
+ append_elements(elementlist) Adds form elements to the end of the list
+
+</%doc>
+
+
+<%!
+ from pgwui_common.path import asset_abspath
+
+ auth_base_mak = asset_abspath('pgwui_common:templates/auth_base.mak')
+%>
+
+<%inherit file="${auth_base_mak}" />
+
+<%def name="trim_row(tab_index)">
+ <tr>
+ <td class="label">
+ <label for="trim_upload_id">Trim Leading/Trailing Spaces:</label>
+ </td>
+ <td>
+ <input name="trim_upload"
+ tabindex="${tab_index}"
+ id="trim_upload_id"
+ type="checkbox"
+ ${trim_upload | n}
+ />
+ </td>
+ </tr>
+</%def>
+
+<%def name="append_elements(form_elements)">
+ <% form_elements.append(trim_row) %>
+ % if ask_about_literal_cols:
+ <%def name="literal_row(tab_index)">
+ <tr>
+ <td class="label">
+ <label for="literal_col_headings_id">Literal
+ Uploaded Column Headings:</label>
+ </td>
+ <td>
+ <input name="literal_col_headings"
+ tabindex="${tab_index}"
+ id="literal_col_headings_id"
+ type="checkbox"
+ ${literal_col_headings | n}
+ />
+ </td>
+ </tr>
+ </%def>
+
+ <% form_elements.append(literal_row) %>
+ % endif
+</%def>
--- /dev/null
+# Copyright (C) 2015, 2018, 2020 The Meme Factory, Inc.
+# http://www.karlpinc.com/
+
+# This file is part of PGWUI_Upload_Core.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+# Karl O. Pinc <kop@karlpinc.com>
+#
+# Bugs:
+# All data is presented to the db as a string, which could result
+# in problems with type coercion.
+
+import attr
+import logging
+import markupsafe
+import psycopg2.errorcodes
+from psycopg2 import ProgrammingError
+
+from pgwui_core.core import (
+ DataLineProcessor,
+ TabularFileUploadHandler,
+ UploadData,
+ doublequote,
+)
+
+from pgwui_upload_core import exceptions as upload_ex
+
+
+log = logging.getLogger(__name__)
+
+
+class SaveLine(DataLineProcessor):
+ def __init__(self, ue, uh, insert_stmt):
+ '''
+ ue UploadEngine instance
+ uh UploadHandler instance
+ insert_stmt Statement used to insert into db.
+ (psycopg2 formatted for substituion)
+ '''
+ super(SaveLine, self).__init__(ue, uh)
+ self.insert_stmt = insert_stmt
+
+ def eat(self, udl):
+ '''
+ Upload a line of data into the db.
+
+ udl An UploadDataLine instance
+ '''
+ self.cur.execute(self.insert_stmt, udl.tuples)
+
+
+@attr.s
+class BaseTableUploadHandler(TabularFileUploadHandler):
+ '''
+ Attributes:
+ request A pyramid request instance
+ uf A GCUploadForm instance
+ session A pyramid session instance
+ ue
+ cur
+ '''
+ ue = attr.ib(default=None)
+
+ def get_data(self):
+ '''
+ Return an UploadData instance, with flags set as desired.
+ '''
+ uf = self.uf
+ self.data = UploadData(uf['localfh'],
+ uf['upload_fmt'],
+ uf['upload_null'],
+ uf['null_rep'],
+ trim=uf['trim_upload'])
+
+ def val_input(self):
+ '''
+ Validate input needed beyond that required to connect to the db.
+
+ Returns:
+ A list of PGWUIError instances
+ '''
+ errors = super().val_input()
+
+ self.double_validator(errors)
+
+ return errors
+
+ def write(self, result, errors):
+ '''Add double upload key into form.'''
+ response = super().write(result, errors)
+ self.write_double_key(response)
+ return response
+
+ def resolve_table(self, qualified_table):
+ '''Return (schema, table) tuple of table name, or raise exception
+ if not resolvable.
+ '''
+ try:
+ self.cur.execute(
+ ('SELECT nspname, relname'
+ ' FROM pg_class'
+ ' JOIN pg_namespace'
+ ' ON (pg_namespace.oid = pg_class.relnamespace)'
+ ' WHERE pg_class.oid = %s::REGCLASS::OID'),
+ (qualified_table,))
+ except ProgrammingError as err:
+ pgcode = err.pgcode
+ if pgcode == psycopg2.errorcodes.INVALID_SCHEMA_NAME:
+ raise upload_ex.MissingSchemaError(
+ 'No such schema',
+ err.diag.message_primary,)
+ elif pgcode == psycopg2.errorcodes.UNDEFINED_TABLE:
+ raise upload_ex.MissingTableError(
+ 'No such table or view',
+ err.diag.message_primary,
+ ('<p>Hint: Check spelling or try qualifying the'
+ ' table name with a schema name</p>'))
+ else:
+ raise
+ return self.cur.fetchone()
+
+ def good_table(self, schema, table):
+ '''Is the supplied table or view insertable?
+ '''
+ sql = ('SELECT 1 FROM information_schema.tables'
+ ' WHERE tables.table_name = %s'
+ ' AND tables.table_schema = %s'
+ " AND (tables.is_insertable_into = 'YES'"
+ # Unfortunatly, as of 9.2, the information_schema
+ # tables.is_insertable_into does not reflect whether
+ # there's an insert trigger on the table.
+ " OR tables.table_type = 'VIEW')")
+ self.cur.execute(sql, (table, schema))
+ return self.cur.fetchone() is not None
+
+ def quote_columns(self):
+ '''Return boolean -- whether to take column names literally
+ '''
+ settings = self.request.registry.settings
+ quoter_setting = settings['pgwui'].get('literal_column_headings')
+ if quoter_setting == 'on':
+ return True
+ elif quoter_setting == 'ask':
+ return self.uf['literal_col_headings']
+ else:
+ return False
+
+ def validate_table(self, qualified_table):
+ '''Return schema and table names, or raise an exception
+ if the relation is not writable
+ '''
+ schema, table = self.resolve_table(qualified_table)
+ if not self.good_table(schema, table):
+ raise upload_ex.CannotInsertError(
+ 'Cannot insert into supplied table or view',
+ ('({0}) is either is a view'
+ ' that cannot be inserted into'
+ ' or you do not have the necessary'
+ ' permissions to the table or view').format(
+ markupsafe.escape(qualified_table)))
+ return (schema, table)
+
+ def report_bad_cols(self, qualified_table, bad_cols, quotecols):
+ if quotecols:
+ detail = ('<p>The following columns are not in the ({0})'
+ ' table, or the first line of your file is not'
+ ' in the expected format and your column names'
+ ' have merged (all the names appear in a single'
+ ' list item, below), or the supplied column names'
+ ' do not match'
+ " the character case of the table's columns,"
+ ' or you do not have permission to access'
+ ' the columns:</p><ul>')
+ else:
+ detail = ('<p>The following columns are not in the ({0})'
+ ' table, or the first line of your file is not'
+ ' in the expected format and your column names'
+ ' have merged (all the names appear in a single'
+ ' list item, below), '
+ ' or the table has column names containing'
+ ' upper case characters, or you do not have'
+ ' permission to access the columns:</p><ul>')
+ detail = detail.format(markupsafe.escape(qualified_table))
+
+ for bad_col in bad_cols:
+ detail += '<li>{0}</li>'.format(markupsafe.escape(bad_col))
+ detail += '</ul>'
+ raise upload_ex.BadHeadersError(
+ 'Header line contains unknown column names',
+ detail=detail)
+
+ def get_column_quoter(self, quotecols):
+ if quotecols:
+ return doublequote
+ else:
+ def column_quoter(x):
+ return x
+ return column_quoter
+
+ def build_insert_stmt(
+ self, data, qualified_table, quotecols, column_quoter):
+ schema, table = self.validate_table(qualified_table)
+
+ column_sql = ('SELECT 1 FROM information_schema.columns'
+ ' WHERE columns.table_name = %s'
+ ' AND columns.table_schema = %s')
+ if quotecols:
+ column_sql += ' AND columns.column_name = %s'
+ else:
+ column_sql += ' AND columns.column_name = lower(%s::name)'
+
+ insert_stmt = 'INSERT INTO {0} ('.format(doublequote(qualified_table))
+ value_string = ''
+ col_sep = ''
+ bad_cols = []
+ for col_name in data.headers.tuples:
+ # Check that colum name exists
+ self.cur.execute(column_sql, (table, schema, col_name))
+ if self.cur.fetchone() is None:
+ bad_cols.append(col_name)
+ else:
+ # Add column to sql statement
+ insert_stmt += col_sep + column_quoter(col_name)
+ value_string += col_sep + '%s'
+ col_sep = ', '
+
+ if bad_cols:
+ self.report_bad_cols(qualified_table, bad_cols, quotecols)
+
+ return insert_stmt + ') VALUES({0})'.format(value_string)
+
+ def factory(self, ue):
+ '''Make a db loader function from an UploadEngine.
+
+ Input:
+
+ Side Effects:
+ Yes, lots.
+ '''
+ raise NotImplementedError()
# Copyright (C) 2020 The Meme Factory, Inc. http://www.karlpinc.com/
-# This file is part of PGWUI_Upload.
+# This file is part of PGWUI_Upload_Core.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
import pytest
-import pgwui_upload.check_settings as check_settings
+import pgwui_upload_core.check_settings as check_settings
from pgwui_common import checkset
from pgwui_testing import testing
-from pgwui_upload import exceptions as upload_ex
+from pgwui_upload_core import exceptions as upload_ex
-# Activiate our pytest plugin
+# Activiate the PGWUI pytest plugins
pytest_plugins = ("pgwui",)
-# Module packaging test
-
-def test_check_setting_is_pgwui_check_settings(
- pgwui_check_settings_entry_point):
- '''Ensure that pgwui_upload has a pgwui.check_settings entry point
- '''
- assert (pgwui_check_settings_entry_point('pgwui_upload.check_settings')
- is True)
-
-
# Mocks
mock_unknown_settings = testing.make_mock_fixture(
# validate_literal_column_headings()
+@pytest.mark.parametrize(
+ ('settings', 'error_class'), [
+ ({}, None),
+ ({'literal_column_headings': 'on'}, None),
+ ({'literal_column_headings': 'off'}, None),
+ ({'literal_column_headings': 'ask'}, None),
+ ({'literal_column_headings': 'bad'},
+ upload_ex.BadLiteralColumnHeadingsError)])
@pytest.mark.unittest
-def test_validate_literal_column_headings_nosetting():
+def test_validate_literal_column_headings(settings, error_class):
'''No error is delivered when there's no setting'''
errors = []
- check_settings.validate_literal_column_headings(errors, {})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_on():
- '''No error is delivered when the setting is "on"'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'on'})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_off():
- '''No error is delivered when the setting is "off"'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'off'})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_ask():
- '''No error is delivered when the setting is "ask"'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'ask'})
-
- assert errors == []
-
-
-@pytest.mark.unittest
-def test_validate_literal_column_headings_bad():
- '''delivers an error when given a bad value'''
- errors = []
- check_settings.validate_literal_column_headings(
- errors, {'literal_column_headings': 'bad'})
+ check_settings.validate_literal_column_headings(None, errors, settings)
- assert errors
- assert isinstance(
- errors[0], upload_ex.BadLiteralColumnHeadingsError)
+ if error_class:
+ assert len(errors) == 1
+ assert isinstance(
+ errors[0], error_class)
+ else:
+ assert errors == []
literal_err = 'literal column headings error'
mock_validate_literal_column_headings = testing.make_mock_fixture(
check_settings, 'validate_literal_column_headings',
- wraps=lambda errors, *args: errors.append(literal_err))
+ wraps=lambda component, errors, *args: errors.append(literal_err))
# check_settings()
mock_require_settings.return_value = require_retval
mock_boolean_settings.return_value = boolean_retval
- result = check_settings.check_settings({})
+ result = check_settings.check_settings(None, [], [], [], {})
mock_unknown_settings.assert_called_once
mock_require_settings.assert_called_once
+++ /dev/null
-# Copyright (C) 2019, 2020 The Meme Factory, Inc. http://www.karlpinc.com/
-
-# This file is part of PGWUI_Upload.
-#
-# This program is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of
-# the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-#
-
-# Karl O. Pinc <kop@karlpinc.com>
-
-import pytest
-import pyramid.testing
-
-import pgwui_upload.pgwui_upload as pgwui_upload
-
-from pgwui_testing import testing
-
-# Activiate our pytest plugin
-pytest_plugins = ("pgwui",)
-
-
-# Module packaging test
-
-def test_pgwui_upload_is_pgwui_component(pgwui_component_entry_point):
- '''Ensure that pgwui_upload is a pgwui.component entry point
- '''
- assert pgwui_component_entry_point('pgwui_upload') is True
-
-
-# init_menu()
-
-@pytest.mark.unittest
-def test_init_menu_default():
- '''The settings get the module's default value when no settings exist
- '''
- with pyramid.testing.testConfig() as config:
-
- pgwui_upload.init_menu(config)
-
- new_settings = config.get_settings()
- assert new_settings['pgwui']['pgwui_upload']['menu_label'] \
- == pgwui_upload.DEFAULT_UPLOAD_MENU_LABEL
-
-
-@pytest.mark.unittest
-def test_init_menu_no_default():
- '''The settings keep their value when they exist
- '''
- test_menu_label = 'test label'
-
- with pyramid.testing.testConfig() as config:
- sample_settings = config.get_settings()
-
- sample_settings['pgwui'] = dict()
- sample_settings['pgwui']['pgwui_upload'] = dict()
- sample_settings['pgwui']['pgwui_upload']['menu_label'] \
- = test_menu_label
-
- pgwui_upload.init_menu(config)
-
- new_settings = config.get_settings()
- assert new_settings['pgwui']['pgwui_upload']['menu_label'] \
- == test_menu_label
-
-
-mock_init_menu = testing.make_mock_fixture(pgwui_upload, 'init_menu')
-
-
-# includeme()
-
-mock_add_route = testing.instance_method_mock_fixture('add_route')
-mock_scan = testing.instance_method_mock_fixture('scan')
-
-
-@pytest.mark.unittest
-def test_includeme(mock_init_menu, mock_add_route, mock_scan):
- '''init_menu, add_route, and scan are all called
- '''
- with pyramid.testing.testConfig() as config:
- mocked_add_route = mock_add_route(config)
- mocked_scan = mock_scan(config)
-
- pgwui_upload.includeme(config)
-
- mock_init_menu.assert_called_once()
- mocked_add_route.assert_called_once()
- mocked_scan.assert_called_once()
--- /dev/null
+# Copyright (C) 2019, 2020 The Meme Factory, Inc. http://www.karlpinc.com/
+
+# This file is part of Pgwui_Upload_Core.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+# Karl O. Pinc <kop@karlpinc.com>
+
+import pytest
+import pyramid.testing
+
+import pgwui_upload_core.pgwui_upload_core as pgwui_upload_core
+
+from pgwui_testing import testing
+
+# Activiate our pytest plugin
+pytest_plugins = ("pgwui",)
+
+
+# Module packaging test
+
+def test_pgwui_upload_core_is_pgwui_component(pgwui_component_entry_point):
+ '''Ensure that pgwui_upload_core is a pgwui.component entry point
+ '''
+ assert pgwui_component_entry_point('pgwui_upload_core') is True
+
+
+# includeme()
+
+mock_scan = testing.instance_method_mock_fixture('scan')
+
+
+@pytest.mark.unittest
+def test_includeme(mock_scan):
+ '''scan() is called
+ '''
+ with pyramid.testing.testConfig() as config:
+ mocked_scan = mock_scan(config)
+
+ pgwui_upload_core.includeme(config)
+
+ mocked_scan.assert_called_once()
# Copyright (C) 2018, 2019, 2020 The Meme Factory, Inc.
# http://www.karlpinc.com/
-# This file is part of PGWUI_Upload.
+# This file is part of PGWUI_Upload_Core.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
# Karl O. Pinc <kop@karlpinc.com>
-import logging
+import markupsafe
import pytest
from pyramid.testing import DummyRequest
-from pyramid.threadlocal import get_current_request, get_current_registry
from pgwui_common.__init__ import includeme as pgwui_common_includeme
from pgwui_core import constants
-from pgwui_upload.__init__ import includeme as pgwui_upload_includeme
-from pgwui_upload.views import upload
+from pgwui_upload_core.__init__ import includeme as pgwui_upload_core_includeme
+from pgwui_upload_core import exceptions as upload_ex
+from pgwui_upload_core.views import upload
+from pgwui_testing import testing
# Activiate our pytest plugin
pytest_plugins = ("pgwui",)
+# Mark all tests with "unittest"
+pytestmark = pytest.mark.unittest
# Constants
CHANGED_RESPONSE = {
'home_page': '/'}
+mock_escape = testing.make_mock_fixture(
+ markupsafe, 'escape')
+
+
# Helper classes
class MockUploadEngine():
monkeypatch.setattr(upload, 'UploadEngine', upload_engine)
monkeypatch.setattr(
- upload, 'TableUploadHandler', MockTableUploadHandler)
+ upload, 'BaseTableUploadHandler', MockTableUploadHandler)
settings = pyramid_request_config.get_settings()
settings['pgwui'] = settings.get('pgwui', dict())
settings['pgwui'].update({'home_page': HOME_PAGE_SETTINGS})
pgwui_common_includeme(pyramid_request_config)
- pgwui_upload_includeme(pyramid_request_config)
+ pgwui_upload_core_includeme(pyramid_request_config)
settings['pgwui'].update({'urls': DEFAULT_URLS})
pyramid_request_config.add_settings(settings)
# Tests
-# TableUploadHandler()
+# BaseTableUploadHandler()
+
+# BaseTableUploadHandler.__init__()
+
+mock_tuh_init = testing.instance_method_mock_fixture('__init__')
+
+
+# BaseTableUploadHandler.resolve_table()
+
+mock_resolve_table = testing.instance_method_mock_fixture('resolve_table')
+
+
+# BaseTableUploadHandler.good_table()
+
+mock_good_table = testing.instance_method_mock_fixture('good_table')
+
@pytest.fixture
def neuter_tableuploadhandler(monkeypatch):
- '''Make TableUploadHander have a mock parent and the given uploadform
+ '''Make TableUploadHander have the given uploadform
'''
def run(uploadform, request):
- monkeypatch.setattr(
- upload, 'TabularFileUploadHandler', MockTableUploadHandler)
-
- uh = upload.TableUploadHandler(request)
+ uh = upload.BaseTableUploadHandler(request)
monkeypatch.setattr(uh, 'uf', uploadform)
return uh
return run
-# TableUploadHandler.get_form_column_quoter()
+# BaseTableUploadHandler.quote_columns()
@pytest.fixture
def get_quote_columns(neuter_tableuploadhandler):
assert result is False
-# upload_view()
-
-@pytest.fixture
-def return_log_tuples(isolate_upload_view, caplog):
- '''Get result and the caplog.record_tuples from the upload_view() call'''
- caplog.set_level(logging.DEBUG)
+# BaseTableUploadHandler.validate_table()
- def run(response):
- isolate_upload_view(response)
- result = upload.upload_view(get_current_request())
- del result['pgwui'] # Remove variables added by pgwui view decorators
-
- return (result, caplog.record_tuples)
-
- return run
-
-
-def test_upload_view_db_not_changed(return_log_tuples):
- '''When the db did not change nothing logs'''
- response = UNCHANGED_RESPONSE
- (result, log_tuples) = return_log_tuples(response)
- assert result == response
- assert log_tuples == []
-
-
-def test_upload_view_db_changed_csv(return_log_tuples):
- '''When the db did change from CSV input something logs'''
- response = CHANGED_RESPONSE
- response['csv_checked'] = constants.CHECKED
- (result, log_tuples) = return_log_tuples(response)
-
- assert result == response
- assert ([tup[:2] for tup in log_tuples]
- == [('pgwui_upload.views.upload', logging.INFO)])
-
-
-def test_upload_view_db_changed_no_csv(return_log_tuples):
- '''When the db did change from not-CSV input something logs'''
- response = CHANGED_RESPONSE
- response['csv_checked'] = constants.UNCHECKED
- (result, log_tuples) = return_log_tuples(response)
+def test_validate_table_good(
+ mock_resolve_table, mock_good_table, mock_escape):
+ '''When the table is good, the results of resolve_table() are returned
+ '''
+ expected = ('schema', 'table')
- assert result == response
- assert ([tup[:2] for tup in log_tuples]
- == [('pgwui_upload.views.upload', logging.INFO)])
+ request = DummyRequest()
+ uh = upload.BaseTableUploadHandler(request)
+ mocked_resolve_table = mock_resolve_table(uh)
+ mocked_good_table = mock_good_table(uh)
+ mocked_resolve_table.return_value = expected
+ mocked_good_table.return_value = True
+ result = uh.validate_table(None)
-def test_upload_view_literal_cols_ask(isolate_upload_view):
- '''When literal_column_headings == ask the respose should reflect this'''
+ assert result == expected
- response = UNCHANGED_RESPONSE
- isolate_upload_view(response)
- settings = get_current_request().registry.settings
- settings['pgwui'].update({'literal_column_headings': 'ask'})
+def test_validate_table_bad(
+ mock_resolve_table, mock_good_table, mock_escape):
+ '''When the table is not good, the right exception is raised
+ '''
+ expected = ('schema', 'table')
- result = upload.upload_view(get_current_request())
+ request = DummyRequest()
+ uh = upload.BaseTableUploadHandler(request)
+ mocked_resolve_table = mock_resolve_table(uh)
+ mocked_good_table = mock_good_table(uh)
- assert result['ask_about_literal_cols']
+ mocked_resolve_table.return_value = expected
+ mocked_good_table.return_value = False
+ with pytest.raises(upload_ex.CannotInsertError):
+ uh.validate_table(None)
+ assert True
-def test_upload_view_literal_cols_noask(isolate_upload_view):
- '''When literal_column_headings != ask the respose should reflect this'''
- response = UNCHANGED_RESPONSE
- isolate_upload_view(response)
+# BaseTableUploadHandler.report_bad_cols()
- settings = get_current_registry().settings
- settings['pgwui'].update({'literal_column_headings': 'no'})
+@pytest.mark.parametrize(
+ ('quotecols',), [
+ (True,),
+ (False,)])
+def test_report_bad_cols(mock_escape, quotecols):
+ '''The expected exception is raised
+ '''
- result = upload.upload_view(get_current_request())
+ request = DummyRequest()
+ uh = upload.BaseTableUploadHandler(request)
+ with pytest.raises(upload_ex.BadHeadersError):
+ uh.report_bad_cols(None, ['col1', 'col2'], quotecols)
- assert not(result['ask_about_literal_cols'])
+ assert True
python setup.py sdist
twine check dist/*
flake8 .
- py.test -m unittest --cov=pgwui_upload tests/
- py.test -m 'not unittest' --cov=pgwui_upload tests/
- # coverage run --source src/pgwui_upload -m py.test
+ py.test -m unittest --cov=pgwui_upload_core tests/
+ py.test -m 'not unittest' --cov=pgwui_upload_core tests/
+ # coverage run --source src/pgwui_upload_core -m py.test
# coverage report
[flake8]