Template for generic upload page.
Karl O. Pinc <kop@meme.com>
+
+ This template uses the following variables in it's context:
+
+ ask_about_literal_cols
+
</%doc>
</tr>
</%def>
-${parent.upload_form([table_row, trim_row])}
+<% form_elements = [table_row, trim_row] %>
+
+% if ask_about_literal_cols:
+ <%def name="literal_row(tab_index)">
+ <tr>
+ <td class="label">
+ <label for="literal_col_headings_id">Take Column
+ Headings Literally:</label>
+ </td>
+ <td>
+ <input name="literal_col_headings"
+ tabindex="${tab_index}"
+ id="literal_col_headings_id"
+ type="checkbox"
+ ${literal_col_headings | n}
+ />
+ </td>
+ </tr>
+ </%def>
+
+ <% form_elements.append(literal_row) %>
+% endif
+
+${parent.upload_form(form_elements)}
self.cur.execute(sql, (table, schema))
return self.cur.fetchone() is not None
+ def quote_columns(self):
+ '''Return boolean -- whether to take column names literally
+ '''
+ settings = self.request.registry.settings
+ quoter_setting = settings.get('pgwui.literal_column_headings')
+ if quoter_setting == 'on':
+ return True
+ elif quoter_setting == 'ask':
+ return self.uf['literal_col_headings']
+ else:
+ return False
+
def factory(self, ue):
'''Make a db loader function from an UploadEngine.
data = ue.data
qualified_table = self.uf['table']
+ quotecols = self.quote_columns()
+ if quotecols:
+ column_quoter = doublequote
+ else:
+ def column_quoter(x):
+ return x
+
schema, table = self.resolve_table(qualified_table)
if not self.good_table(schema, table):
column_sql = ('SELECT 1 FROM information_schema.columns'
' WHERE columns.table_name = %s'
- ' AND columns.table_schema = %s'
- ' AND columns.column_name = %s')
+ ' AND columns.table_schema = %s')
+ if quotecols:
+ column_sql += ' AND columns.column_name = %s'
+ else:
+ column_sql += ' AND columns.column_name = lower(%s::name)'
+
insert_stmt = 'INSERT INTO {0} ('.format(doublequote(qualified_table))
value_string = ''
col_sep = ''
bad_cols.append(col_name)
else:
# Add column to sql statement
- insert_stmt += col_sep + doublequote(col_name)
+ insert_stmt += col_sep + column_quoter(col_name)
value_string += col_sep + '%s'
col_sep = ', '
if bad_cols:
- detail = (('<p>The following columns are not in the ({0})'
- ' table or you do not have permission to access'
- ' them:</p><ul>')
- .format(markupsafe.escape(qualified_table)))
+ if quotecols:
+ detail = ('<p>The following columns are not in the ({0})'
+ ' table, or the supplied column names do not match'
+ " the character case of the table's columns,"
+ ' or you do not have permission to access'
+ ' the columns:</p><ul>')
+ else:
+ detail = ('<p>The following columns are not in the ({0})'
+ ' table, or the table has column names containing'
+ ' upper case characters, or you do not have'
+ ' permission to access the columns:</p><ul>')
+ detail = detail.format(markupsafe.escape(qualified_table))
+
for bad_col in bad_cols:
detail += '<li>{0}</li>'.format(markupsafe.escape(bad_col))
detail += '</ul>'
def upload_view(request):
response = UploadEngine(TableUploadHandler(request)).run()
+
+ settings = request.registry.settings
+ quoter_setting = settings.get('pgwui.literal_column_headings')
+ response['ask_about_literal_cols'] = quoter_setting == 'ask'
+
if response['db_changed']:
if is_checked(response['csv_checked']):
upload_fmt = 'CSV'
import logging
import pytest
-from pyramid.threadlocal import get_current_request
+from pyramid.testing import DummyRequest
+from pyramid.threadlocal import get_current_request, get_current_registry
from pgwui_common.__init__ import includeme as pgwui_common_includeme
from pgwui_core import form_constants
from pgwui_upload.__init__ import includeme as pgwui_upload_includeme
'user': 'someuser',
}
+UNCHANGED_RESPONSE = {'db_changed': False}
+
+# A "upload form"
+UPLOAD_FORM_W_LIT_CHECKED = {'literal_col_headings': True}
+
# Helper classes
# Tests
+# TableUploadHandler()
+
+@pytest.fixture
+def neuter_tableuploadhandler(monkeypatch):
+ '''Make TableUploadHander have a mock parent and the given uploadform
+ '''
+ def run(uploadform, request):
+ monkeypatch.setattr(
+ upload, 'TabularFileUploadHandler', MockTableUploadHandler)
+
+ uh = upload.TableUploadHandler(request)
+ monkeypatch.setattr(uh, 'uf', uploadform)
+
+ return uh
+
+ return run
+
+
+# TableUploadHandler.get_form_column_quoter()
+
+@pytest.fixture
+def get_quote_columns(neuter_tableuploadhandler):
+ '''Return result of the upload handler's get_form_column_quoter()
+ '''
+ def run(uploadform, settings):
+ request = DummyRequest()
+ request.registry.settings = settings
+
+ uh = neuter_tableuploadhandler(uploadform, DummyRequest())
+ return uh.quote_columns()
+
+ return run
+
+
+def test_tableuploadhandler_quote_columns_on(get_quote_columns):
+ '''When the settings ask for literal_column_headings = on return
+ True
+ '''
+ result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
+ {'pgwui.literal_column_headings': 'on'})
+ assert result is True
+
+
+def test_tableuploadhandler_quote_columns_off(get_quote_columns):
+ '''When the settings ask for literal_column_headings = off return
+ False
+ '''
+ result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
+ {'pgwui.literal_column_headings': 'off'})
+ assert result is False
+
+
+def test_tableuploadhandler_quote_columns_default(get_quote_columns):
+ '''When the settings literal_column_headings is not present return
+ False (as default)
+ '''
+ result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED, {})
+ assert result is False
+
+
+def test_tableuploadhandler_quote_columns_ask_on(get_quote_columns):
+ '''When the form asks for literal column headings return True
+ '''
+ result = get_quote_columns(UPLOAD_FORM_W_LIT_CHECKED,
+ {'pgwui.literal_column_headings': 'ask'})
+ assert result is True
+
+
+def test_tableuploadhandler_quote_columns_ask_off(get_quote_columns):
+ '''When the form does not ask for literal column headings return False
+ '''
+ result = get_quote_columns({'literal_col_headings': False},
+ {'pgwui.literal_column_headings': 'ask'})
+ assert result is False
+
+
# upload_view()
@pytest.fixture
assert result == response
assert ([tup[:2] for tup in log_tuples]
== [('pgwui_upload.views.upload', logging.INFO)])
+
+
+def test_upload_view_literal_cols_ask(isolate_upload_view):
+ '''When literal_column_headings == ask the respose should reflect this'''
+
+ response = UNCHANGED_RESPONSE
+
+ settings = get_current_registry().settings
+ settings['pgwui.literal_column_headings'] = 'ask'
+
+ isolate_upload_view(response)
+ result = upload.upload_view(get_current_request())
+
+ assert result['ask_about_literal_cols']
+
+
+def test_upload_view_literal_cols_noask(isolate_upload_view):
+ '''When literal_column_headings != ask the respose should reflect this'''
+
+ response = UNCHANGED_RESPONSE
+
+ settings = get_current_registry().settings
+ settings['pgwui.literal_column_headings'] = 'no'
+
+ isolate_upload_view(response)
+ result = upload.upload_view(get_current_request())
+
+ assert not(result['ask_about_literal_cols'])