import markupsafe
import psycopg.errors
+from html import escape as html_escape
+
from pgwui_core.core import (
UploadNullFileInitialPost,
TabularFileUploadHandler,
self.write_double_key(response)
return response
- def _execute(self, stmt, tupl):
+ def execute(self, stmt, tupl):
'''Execute a statement and express encoding errors
'''
try:
if not resolvable.
'''
try:
- self._execute(
+ self.execute(
('SELECT nspname, relname'
' FROM pg_class'
' JOIN pg_namespace'
# tables.is_insertable_into does not reflect whether
# there's an insert trigger on the table.
" OR tables.table_type = 'VIEW')")
- self._execute(sql, (table, schema))
+ self.execute(sql, (table, schema))
return self.cur.fetchone() is not None
def quote_columns(self, settings):
def build_insert_stmt(
self, data, qualified_table, quotecols, column_quoter):
- schema, table = self.validate_table(qualified_table)
+ insert_stmt = InsertStmt()
+ insert_stmt.build_insert_stmt(
+ self, data, qualified_table, quotecols, column_quoter)
+ return insert_stmt
+
+ def factory(self, ue):
+ '''Make a db loader function from an UploadEngine.
+
+ Input:
+
+ Side Effects:
+ Assigns: self.ue, self.cur
+ And, lots of changes to the db
+ '''
+ self.ue = ue
+ self.cur = ue.cur
+
+
+@attrs.define
+class InsertStmt:
+ stmt = attrs.field(default=None)
+ cols = attrs.field(default=None)
+
+ def build_insert_stmt(
+ self, tuh, data, qualified_table, quotecols, column_quoter):
+ '''tuh: A TableUploadHandler'''
+ schema, table = tuh.validate_table(qualified_table)
column_sql = ('SELECT 1 FROM information_schema.columns'
' WHERE columns.table_name = %s'
else:
column_sql += ' AND columns.column_name = lower(%s::name)'
- insert_stmt = f'INSERT INTO {self.quotetable(schema, table)} ('
+ self.stmt = f'INSERT INTO {tuh.quotetable(schema, table)} ('
value_string = ''
col_sep = ''
bad_cols = []
for col_name in data.headers.tuples:
# Check that colum name exists
- self._execute(column_sql, (table, schema, col_name))
- if self.cur.fetchone() is None:
+ tuh.execute(column_sql, (table, schema, col_name))
+ if tuh.cur.fetchone() is None:
bad_cols.append(col_name)
else:
# Add column to sql statement
- insert_stmt += col_sep + column_quoter(col_name)
+ self.stmt += col_sep + column_quoter(col_name)
value_string += col_sep + '%s'
col_sep = ', '
if bad_cols:
self.report_bad_cols(qualified_table, bad_cols, quotecols)
- return insert_stmt + ') VALUES({0})'.format(value_string)
-
- def factory(self, ue):
- '''Make a db loader function from an UploadEngine.
-
- Input:
-
- Side Effects:
- Assigns: self.ue, self.cur
- And, lots of changes to the db
- '''
- self.ue = ue
- self.cur = ue.cur
+ self.stmt += ') VALUES({0})'.format(value_string)
+ self.cols = len(data.headers.tuples)
def set_upload_response(component, request, response):
response.setdefault('pgwui', dict())
response['pgwui']['upload_settings'] = upload_settings
+
+
+def match_insert_to_dataline(udl, insert_stmt, source_file=None):
+ '''Make sure the UploadDataLine has the number of values expected
+ by the insert statement
+ '''
+ data_cols = len(udl.tuples)
+ if data_cols != insert_stmt.cols:
+ descr = ('The number of data columns does not match the'
+ " number of headings given in the file's first line")
+
+ source_detail = ''
+ if source_file is not None:
+ source_detail = f'File: ({html_escape(source_file)}); '
+ detail = (f'{source_detail}'
+ f'Expected number of columns: {data_cols}')
+
+ if data_cols > insert_stmt.cols:
+ raise core_ex.TooManyColsError(
+ udl.lineno, 'Too many data columns', descr=descr,
+ detail=detail, data=udl.raw)
+ else:
+ raise core_ex.TooFewColsError(
+ udl.lineno, 'Too few data columns', descr=descr,
+ detail=detail, data=udl.raw)