# There are main objects, and their subclasses, here:
-# LoadedForm
# DBHandler (generally referred to a an "upload handler", at present)
# DBConnector (UploadEngine)
#
# See their documentation below.
from csv import reader as csv_reader
-import collections.abc
import attrs
import markupsafe
import hashlib
import io
-from . import exceptions as core_ex
-
-# We are not really using wtforms. We use it to (barely)
-# interact with the html and post request but really
-# we define our own classes to handle working memory
-# and interacting with the session.
-from wtforms import (
- Form,
- BooleanField,
- StringField,
- RadioField,
- PasswordField,
- FileField)
-
import psycopg
import psycopg.errors
+from . import exceptions as core_ex
+
from pgwui_core.constants import (
- CHECKED,
- UNCHECKED,
CSV,
- TAB,
- CSV_VALUE,
- TAB_VALUE,
)
-# Setup default values for forms.
-
-@attrs.define(slots=False)
-class UserInitialPost():
- db = attrs.field(default='')
- user = attrs.field(default='')
- password = attrs.field(default='')
-
- def build(self, settings={}):
- self.db = settings['pgwui'].get('default_db', '')
- return self
-
-
-@attrs.define(slots=False)
-class UploadFileInitialPost(UserInitialPost):
- upload_fmt = attrs.field(default=CSV)
- trim_upload = attrs.field(default=True)
- literal_col_headings = attrs.field(default=False)
- datafile = attrs.field(default='')
-
-
-@attrs.define(slots=False)
-class UploadNullFileInitialPost(UploadFileInitialPost):
- upload_null = attrs.field(default=True)
- null_rep = attrs.field(default='')
-
-
-@attrs.define(slots=False)
-class UploadTableInitialPostMixin():
- table = attrs.field(default='')
-
-
-@attrs.define(slots=False)
-class UploadTableInitialPost(UploadNullFileInitialPost,
- UploadTableInitialPostMixin):
- pass
-
-
-# The wtforms that suck data out of the html.
-
-class UserWTForm(Form):
- '''The wtform used to connect to the db to authenticate .'''
- # We don't actually use the labels, wanting the template to
- # look (and render) like html, but I'll define them anyway
- # just to keep my hand in.
- user = StringField('User:')
- password = PasswordField('Password:')
-
-
-class AuthWTForm(UserWTForm):
- '''The wtform used to connect to any db and authenticate.'''
- # We don't actually use the labels, wanting the template to
- # look (and render) like html, but I'll define them anyway
- # just to keep my hand in.
- db = StringField('Database:')
-
-
-class UploadFileWTForm(AuthWTForm):
- '''The wtform used for uploading files.'''
- # We don't actually use the labels, wanting the template to
- # look (and render) like html, but I'll define them anyway
- # just to keep my hand in.
- upload_fmt = RadioField('Upload Format:',
- choices=[('Upload CSV Data:', CSV),
- ('Upload tab delimited Data:', TAB)])
- datafile = FileField('File with CSV or Tab delimited Data:')
- trim_upload = BooleanField('Trim Leading/Trailing Spaces:')
- literal_col_headings = BooleanField('Literal Uploaded Column Headings:')
-
-
-class UploadNullFileWTForm(UploadFileWTForm):
- '''The wtform used for uploading files that may contain NULL.'''
- # We don't actually use the labels, wanting the template to
- # look (and render) like html, but I'll define them anyway
- # just to keep my hand in.
- upload_null = BooleanField('Upload NULL Values:')
- null_rep = StringField('NULL Representation:')
-
-
-class UploadTableWTForm(UploadNullFileWTForm):
- '''The wtform used for uploading arbitrary data into tables.'''
- table = StringField('Table or View:')
-
-
-@attrs.define(slots=False)
-class LoadedForm(collections.abc.MutableMapping):
- '''
- Abstract class representing an upload form.
-
- Responsible for getting information into and out of
- html forms.
-
- The user API is that it acts like a dict, but with extra methods.
-
- Attributes:
- uh The UploadHandler instance using the form
-
- _store Where the real dict is kept
- _form Instantaiated html form object (WTForms)
- _fc Class handling html form
- '''
- fc_default = attrs.field(default=None)
- ip_default = attrs.field(default=None)
- uh = attrs.field(default=None)
- _store = attrs.field(factory=dict)
- _fc = attrs.field(default=None)
- _form = attrs.field(default=None)
- ivals = attrs.field(default=None)
-
- def build(self, uh, fc=None, ip=None, data={}, **kwargs):
- '''Form initialization
- ip is the instantiated initial post
- '''
- self.uh = uh
- if data == {}:
- store = dict(kwargs)
- else:
- store = dict(data)
- store.update(kwargs)
- self._store = store
- self._fc = (self.fc_default if fc is None else fc)
- ip_used = (self.ip_default if ip is None else ip)
- self.ivals = ip_used.build(self.uh.request.registry.settings)
- return self
-
- def __iter__(self):
- for item in self._store:
- yield item
-
- def __len__(self):
- return len(self._store)
-
- def __getitem__(self, key):
- return self._store[key]
-
- def __setitem__(self, key, value):
- self._store[key] = value
-
- def __delitem__(self, key):
- del self._store[key]
-
- def booleanize_post(self, post, key):
- '''The key, if present, is a boolean value. But post data
- is all strings. Convert the post data to a Python boolean.
- '''
- if key in post:
- if post[key] == 'False':
- post[key] = False
- else:
- post[key] = True
-
- def read_post_and_session(self, post, session, key):
- '''Read an attribute into self, from either POST or the session,
- and synchronize the session with the POST value when there is a POST
- value.
-
- post POST
- session The session
- key The attribute to read
-
- Returns: Boolean. True when a value is set; the key is in
- either POST or the session.
- '''
- if key in post:
- self[key] = post[key]
- self.session_put(key, self[key])
- elif key in session:
- self[key] = session[key]
- else:
- return False
- return True
-
- def read(self):
- '''
- In the children this loads form from pyramid self.uh.request
- object and self._form and the session.
-
- In this case we instantiate _form and give it some defaults
- '''
- post = self.uh.request.POST
- if post:
- self._form = self._fc(formdata=post)
- else:
- self._form = self._fc(obj=self.ivals)
-
- def write(self, response, errors):
- '''
- Produces the dict pyramid will use to render the form.
-
- Input:
- response Dict of results from connection execution
- errors List of errors from connection execution
- '''
- response['errors'] = errors
- return response
-
-
-@attrs.define(slots=False)
-class CredsLoadedForm(LoadedForm):
- '''
- Acts like a dict, but with extra methods.
- Manages credentials (but not db) needed to authenticate.
-
- Attributes:
- uh The UploadHandler instance using the form
- user The username used to login
- _form Instantaiated html form object (WXForms)
-
- Methods:
- read() Load form from pyramid request object.
- '''
- fc_default = attrs.field(default=UserWTForm)
- ip_default = attrs.field(factory=UserInitialPost)
- user = attrs.field(default=None)
- password = attrs.field(default=None)
- action = attrs.field(default=None)
-
- def session_put(self, key, value):
- '''
- Put data into the session.
-
- Input:
- key The key
- value The value
-
- Returns:
-
- Side effects:
- Modifies session
-
- May be overridden by a subclass to keep data out
- of the session.
- '''
- self.uh.session[key] = value
-
- def session_del(self, key):
- '''
- Deletes data from the session.
-
- Input:
- key The key to delete
-
- Returns:
-
- Side effects:
- Modifies session
- '''
- self.uh.session.pop(key, None)
-
- def read(self):
- '''
- Read form data from the client
- '''
-
- # Read parent's data
- super().read()
-
- # Read our form data
-
- # Keep password and user (and db, in AuthLoadedForm, below) in
- # the session. All the other form variables must be re-posted.
- post = self.uh.request.POST
- session = self.uh.request.session
-
- # Defaults are now in place in self._form for password
- # and user. Ignore these since we want to know whether
- # to go to the session for data values.
- self.read_post_and_session(post, session, 'password')
-
- if not self.read_post_and_session(post, session, 'user'):
- self['user'] = ''
-
- # Other, hidden, POST variables
- if 'action' in post:
- self['action'] = post['action']
- else:
- self['action'] = ''
-
- def write(self, result, errors):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- response = super().write(result, errors)
- havecreds = self.uh.session.get('havecreds', False)
- response.update({'havecreds': havecreds})
- if havecreds:
- response['user'] = self['user']
- else:
- # We don't know if the credentials are good or
- # we know they are bad. Keep them out of the session.
- response['user'] = ''
- response['password'] = ''
- self.session_put('user', '')
- self.session_put('password', '')
- return response
-
-
-@attrs.define(slots=False)
-class AuthLoadedForm(CredsLoadedForm):
- '''
- Acts like a dict, but with extra methods.
- Manages form data needed to authenticate, including db to authenticate
- in.
-
- Attributes:
- uh The UploadHandler instance using the form
- user The Username used to login
- db The db to login to
- db_changed
- Boolean. Whether the prior request changed some db's content.
- "Prior request" means the last time a logged-in session
- was submitted; requests resulting in expired sessions are
- ignored.
- _form Instantiated html form object (WXForms)
-
- '''
- db = attrs.field(default=None)
- db_changed = attrs.field(default=False)
-
- def read(self):
- '''
- Read form data from the client
- '''
-
- # Read parent's data
- super().read()
- post = self.uh.request.POST
- session = self.uh.request.session
-
- # Keep form variables handy
- # The db is kept in the session partly for the user's convenience
- # when switching between menu items, but mostly so that double-upload
- # of the same file can be detected when the user reloads the form
- # by pressing "enter" in the URL bar. Because otherwise the
- # generated last_key does not have the right db value.
- if not self.read_post_and_session(post, session, 'db'):
- self['db'] = ''
-
- def write(self, result, errors):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- response = super().write(result, errors)
- response['db'] = self['db']
- return response
-
-
-@attrs.define(slots=False)
-class UploadFileForm(AuthLoadedForm):
- '''
- Acts like a dict, but with extra methods.
-
- Attributes:
- uh The UploadHandler instance using the form
-
- Methods:
- read() Load form from pyramid request object.
- '''
- fc_default = attrs.field(default=UploadFileWTForm)
- ip_default = attrs.field(factory=UploadFileInitialPost)
- upload_fmt = attrs.field(default=None)
- trim_upload = attrs.field(default=None)
- literal_col_headings = attrs.field(default=None)
- filename = attrs.field(default=None)
- localfh = attrs.field(default=None)
-
- def read(self):
- '''
- Read form data from the client
- '''
-
- # Read parent's data
- super().read()
-
- # Read our own data
- self['upload_fmt'] = self._form.upload_fmt.data
- self['trim_upload'] = self._form.trim_upload.data
- self['literal_col_headings'] = self._form.literal_col_headings.data
-
- # Other POST variables involving a file
- post = self.uh.request.POST
- session = self.uh.request.session
- self.booleanize_post(post, 'db_changed')
- if not self.read_post_and_session(post, session, 'db_changed'):
- self['db_changed'] = False
- self['filename'] = ''
- self['localfh'] = ''
- if self['action']:
- if self._form.datafile.data != '':
- if hasattr(post['datafile'], 'filename'):
- self['filename'] = post['datafile'].filename
- if hasattr(post['datafile'], 'file'):
- self['localfh'] = post['datafile'].file
-
- def write(self, result, errors):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- if self['upload_fmt'] == CSV:
- csv_checked = CHECKED
- tab_checked = UNCHECKED
- else:
- tab_checked = CHECKED
- csv_checked = UNCHECKED
-
- if self['trim_upload']:
- trim_upload_checked = CHECKED
- else:
- trim_upload_checked = UNCHECKED
-
- if self['literal_col_headings']:
- literal_col_headings_checked = CHECKED
- else:
- literal_col_headings_checked = UNCHECKED
-
- response = super().write(result, errors)
- # Although we read-in db_changed, we do not write it because
- # it, like last_key, is computed.
- response['filename'] = self['filename']
- response['trim_upload'] = trim_upload_checked
- response['csv_value'] = CSV_VALUE
- response['tab_value'] = TAB_VALUE
- response['csv_checked'] = csv_checked
- response['tab_checked'] = tab_checked
- response['literal_col_headings'] = literal_col_headings_checked
- return response
-
-
-class UploadFormBaseMixin():
- '''
- Mixins add to attributes to self, and to response.
- '''
- def write_response(self, response):
- return response
-
-
-@attrs.define(slots=False)
-class UploadDoubleFileFormMixin(UploadFormBaseMixin):
- '''
- Adds a last_key attribute to self, from POST
-
- Acts like a dict, but with extra methods.
-
- Attributes:
- uh The UploadHandler instance using the form
-
- Methods:
- read() Load form from pyramid request object.
- '''
- # Keep the last_key in both the form and the session; in the
- # session because that way double-upload detection works when the
- # user presses "enter" in the URL bar.
- last_key = attrs.field(default=None)
-
- def read(self):
- '''
- Read form data from the client
- '''
- super().read()
- post = self.uh.request.POST
- session = self.uh.request.session
-
- if not self.read_post_and_session(post, session, 'last_key'):
- self['last_key'] = ''
-
- def write_response(self, response):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- if self.uh.double_upload:
- # Erase the last key from all state
- response.pop('last_key', None)
- self.session_del('last_key')
- else:
- response['last_key'] = self['last_key']
- return super().write_response(response)
-
-
-@attrs.define(slots=False)
-class UploadDoubleFileForm(UploadDoubleFileFormMixin, UploadFileForm):
- '''
- Acts like a dict, but with extra methods.
-
- Attributes:
- uh The UploadHandler instance using the form
-
- Methods:
- read() Load form from pyramid request object.
- '''
- def read(self):
- '''
- Read form data from the client
- '''
- # Read all parents' data
- super().read()
-
- def write(self, result, errors):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- response = super().write(result, errors)
- return super().write_response(response)
-
-
-@attrs.define(slots=False)
-class UploadNullMixin(UploadFormBaseMixin):
- '''
- Acts like a dict, but with extra methods.
-
- Attributes:
- uh The UploadHandler instance using the form
-
- Methods:
- read() Load form from pyramid request object.
- '''
- def read(self):
- '''
- Read form data from the client
- '''
- super().read()
- self['upload_null'] = self._form.upload_null.data
- self['null_rep'] = self._form.null_rep.data
-
- def write_response(self, response):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- if self['upload_null']:
- upload_null_checked = CHECKED
- else:
- upload_null_checked = UNCHECKED
-
- response['upload_null'] = upload_null_checked
- response['null_rep'] = self['null_rep']
- return super().write_response(response)
-
-
-@attrs.define(slots=False)
-class UploadTableForm(UploadNullMixin, UploadFileForm):
- '''
- Acts like a dict, but with extra methods.
-
- Attributes:
- uh The UploadHandler instance using the form
-
- Methods:
- read() Load form from pyramid request object.
- '''
- fc_default = attrs.field(default=UploadTableWTForm)
- ip_default = attrs.field(factory=UploadTableInitialPost)
-
- def read(self):
- '''
- Read form data from the client
- '''
-
- # Read all parents' data
- super().read()
- # Read our own data
- self['table'] = self._form.table.data
-
- def write(self, result, errors):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- response = super().write(result, errors)
- response['table'] = self['table']
- return super().write_response(response)
-
-
-@attrs.define(slots=False)
-class UploadDoubleTableForm(UploadDoubleFileFormMixin, UploadTableForm):
- '''
- Acts like a dict, but with extra methods.
-
- Attributes:
- uh The UploadHandler instance using the form
-
- Methods:
- read() Load form from pyramid request object.
- '''
- def read(self):
- '''
- Read form data from the client
- '''
- # Read all parents' data
- super().read()
-
- def write(self, result, errors):
- '''
- Produces the dict pyramid will use to render the form.
- '''
- response = super().write(result, errors)
- return super().write_response(response)
-
-
# Upload processing
@attrs.define(slots=False)
--- /dev/null
+# Copyright (C) 2013, 2014, 2015, 2018, 2020, 2021, 2024 The Meme Factory, Inc.
+# http://www.karlpinc.com/
+
+# This file is part of PGWUI_Core.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+# Karl O. Pinc <kop@karlpinc.com>
+
+'''Form processing
+
+Pretty much everything in here is probably done wrong.
+'''
+
+import collections.abc
+import attrs
+
+# We are not really using wtforms. We use it to (barely)
+# interact with the html and post request but really
+# we define our own classes to handle working memory
+# and interacting with the session.
+from wtforms import (
+ Form,
+ BooleanField,
+ StringField,
+ RadioField,
+ PasswordField,
+ FileField)
+
+from pgwui_core.constants import (
+ CHECKED,
+ UNCHECKED,
+ CSV,
+ TAB,
+ CSV_VALUE,
+ TAB_VALUE,
+)
+
+
+# Setup default values for forms.
+
+@attrs.define(slots=False)
+class UserInitialPost():
+ db = attrs.field(default='')
+ user = attrs.field(default='')
+ password = attrs.field(default='')
+
+ def build(self, settings={}):
+ self.db = settings['pgwui'].get('default_db', '')
+ return self
+
+
+@attrs.define(slots=False)
+class UploadFileInitialPost(UserInitialPost):
+ upload_fmt = attrs.field(default=CSV)
+ trim_upload = attrs.field(default=True)
+ literal_col_headings = attrs.field(default=False)
+ datafile = attrs.field(default='')
+
+
+@attrs.define(slots=False)
+class UploadNullFileInitialPost(UploadFileInitialPost):
+ upload_null = attrs.field(default=True)
+ null_rep = attrs.field(default='')
+
+
+@attrs.define(slots=False)
+class UploadTableInitialPostMixin():
+ table = attrs.field(default='')
+
+
+@attrs.define(slots=False)
+class UploadTableInitialPost(UploadNullFileInitialPost,
+ UploadTableInitialPostMixin):
+ pass
+
+
+# The wtforms that suck data out of the html.
+
+class UserWTForm(Form):
+ '''The wtform used to connect to the db to authenticate .'''
+ # We don't actually use the labels, wanting the template to
+ # look (and render) like html, but I'll define them anyway
+ # just to keep my hand in.
+ user = StringField('User:')
+ password = PasswordField('Password:')
+
+
+class AuthWTForm(UserWTForm):
+ '''The wtform used to connect to any db and authenticate.'''
+ # We don't actually use the labels, wanting the template to
+ # look (and render) like html, but I'll define them anyway
+ # just to keep my hand in.
+ db = StringField('Database:')
+
+
+class UploadFileWTForm(AuthWTForm):
+ '''The wtform used for uploading files.'''
+ # We don't actually use the labels, wanting the template to
+ # look (and render) like html, but I'll define them anyway
+ # just to keep my hand in.
+ upload_fmt = RadioField('Upload Format:',
+ choices=[('Upload CSV Data:', CSV),
+ ('Upload tab delimited Data:', TAB)])
+ datafile = FileField('File with CSV or Tab delimited Data:')
+ trim_upload = BooleanField('Trim Leading/Trailing Spaces:')
+ literal_col_headings = BooleanField('Literal Uploaded Column Headings:')
+
+
+class UploadNullFileWTForm(UploadFileWTForm):
+ '''The wtform used for uploading files that may contain NULL.'''
+ # We don't actually use the labels, wanting the template to
+ # look (and render) like html, but I'll define them anyway
+ # just to keep my hand in.
+ upload_null = BooleanField('Upload NULL Values:')
+ null_rep = StringField('NULL Representation:')
+
+
+class UploadTableWTForm(UploadNullFileWTForm):
+ '''The wtform used for uploading arbitrary data into tables.'''
+ table = StringField('Table or View:')
+
+
+@attrs.define(slots=False)
+class LoadedForm(collections.abc.MutableMapping):
+ '''
+ Abstract class representing an upload form.
+
+ Responsible for getting information into and out of
+ html forms.
+
+ The user API is that it acts like a dict, but with extra methods.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+
+ _store Where the real dict is kept
+ _form Instantaiated html form object (WTForms)
+ _fc Class handling html form
+ '''
+ fc_default = attrs.field(default=None)
+ ip_default = attrs.field(default=None)
+ uh = attrs.field(default=None)
+ _store = attrs.field(factory=dict)
+ _fc = attrs.field(default=None)
+ _form = attrs.field(default=None)
+ ivals = attrs.field(default=None)
+
+ def build(self, uh, fc=None, ip=None, data={}, **kwargs):
+ '''Form initialization
+ ip is the instantiated initial post
+ '''
+ self.uh = uh
+ if data == {}:
+ store = dict(kwargs)
+ else:
+ store = dict(data)
+ store.update(kwargs)
+ self._store = store
+ self._fc = (self.fc_default if fc is None else fc)
+ ip_used = (self.ip_default if ip is None else ip)
+ self.ivals = ip_used.build(self.uh.request.registry.settings)
+ return self
+
+ def __iter__(self):
+ for item in self._store:
+ yield item
+
+ def __len__(self):
+ return len(self._store)
+
+ def __getitem__(self, key):
+ return self._store[key]
+
+ def __setitem__(self, key, value):
+ self._store[key] = value
+
+ def __delitem__(self, key):
+ del self._store[key]
+
+ def booleanize_post(self, post, key):
+ '''The key, if present, is a boolean value. But post data
+ is all strings. Convert the post data to a Python boolean.
+ '''
+ if key in post:
+ if post[key] == 'False':
+ post[key] = False
+ else:
+ post[key] = True
+
+ def read_post_and_session(self, post, session, key):
+ '''Read an attribute into self, from either POST or the session,
+ and synchronize the session with the POST value when there is a POST
+ value.
+
+ post POST
+ session The session
+ key The attribute to read
+
+ Returns: Boolean. True when a value is set; the key is in
+ either POST or the session.
+ '''
+ if key in post:
+ self[key] = post[key]
+ self.session_put(key, self[key])
+ elif key in session:
+ self[key] = session[key]
+ else:
+ return False
+ return True
+
+ def read(self):
+ '''
+ In the children this loads form from pyramid self.uh.request
+ object and self._form and the session.
+
+ In this case we instantiate _form and give it some defaults
+ '''
+ post = self.uh.request.POST
+ if post:
+ self._form = self._fc(formdata=post)
+ else:
+ self._form = self._fc(obj=self.ivals)
+
+ def write(self, response, errors):
+ '''
+ Produces the dict pyramid will use to render the form.
+
+ Input:
+ response Dict of results from connection execution
+ errors List of errors from connection execution
+ '''
+ response['errors'] = errors
+ return response
+
+
+@attrs.define(slots=False)
+class CredsLoadedForm(LoadedForm):
+ '''
+ Acts like a dict, but with extra methods.
+ Manages credentials (but not db) needed to authenticate.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+ user The username used to login
+ _form Instantaiated html form object (WXForms)
+
+ Methods:
+ read() Load form from pyramid request object.
+ '''
+ fc_default = attrs.field(default=UserWTForm)
+ ip_default = attrs.field(factory=UserInitialPost)
+ user = attrs.field(default=None)
+ password = attrs.field(default=None)
+ action = attrs.field(default=None)
+
+ def session_put(self, key, value):
+ '''
+ Put data into the session.
+
+ Input:
+ key The key
+ value The value
+
+ Returns:
+
+ Side effects:
+ Modifies session
+
+ May be overridden by a subclass to keep data out
+ of the session.
+ '''
+ self.uh.session[key] = value
+
+ def session_del(self, key):
+ '''
+ Deletes data from the session.
+
+ Input:
+ key The key to delete
+
+ Returns:
+
+ Side effects:
+ Modifies session
+ '''
+ self.uh.session.pop(key, None)
+
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+
+ # Read parent's data
+ super().read()
+
+ # Read our form data
+
+ # Keep password and user (and db, in AuthLoadedForm, below) in
+ # the session. All the other form variables must be re-posted.
+ post = self.uh.request.POST
+ session = self.uh.request.session
+
+ # Defaults are now in place in self._form for password
+ # and user. Ignore these since we want to know whether
+ # to go to the session for data values.
+ self.read_post_and_session(post, session, 'password')
+
+ if not self.read_post_and_session(post, session, 'user'):
+ self['user'] = ''
+
+ # Other, hidden, POST variables
+ if 'action' in post:
+ self['action'] = post['action']
+ else:
+ self['action'] = ''
+
+ def write(self, result, errors):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ response = super().write(result, errors)
+ havecreds = self.uh.session.get('havecreds', False)
+ response.update({'havecreds': havecreds})
+ if havecreds:
+ response['user'] = self['user']
+ else:
+ # We don't know if the credentials are good or
+ # we know they are bad. Keep them out of the session.
+ response['user'] = ''
+ response['password'] = ''
+ self.session_put('user', '')
+ self.session_put('password', '')
+ return response
+
+
+@attrs.define(slots=False)
+class AuthLoadedForm(CredsLoadedForm):
+ '''
+ Acts like a dict, but with extra methods.
+ Manages form data needed to authenticate, including db to authenticate
+ in.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+ user The Username used to login
+ db The db to login to
+ db_changed
+ Boolean. Whether the prior request changed some db's content.
+ "Prior request" means the last time a logged-in session
+ was submitted; requests resulting in expired sessions are
+ ignored.
+ _form Instantiated html form object (WXForms)
+
+ '''
+ db = attrs.field(default=None)
+ db_changed = attrs.field(default=False)
+
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+
+ # Read parent's data
+ super().read()
+ post = self.uh.request.POST
+ session = self.uh.request.session
+
+ # Keep form variables handy
+ # The db is kept in the session partly for the user's convenience
+ # when switching between menu items, but mostly so that double-upload
+ # of the same file can be detected when the user reloads the form
+ # by pressing "enter" in the URL bar. Because otherwise the
+ # generated last_key does not have the right db value.
+ if not self.read_post_and_session(post, session, 'db'):
+ self['db'] = ''
+
+ def write(self, result, errors):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ response = super().write(result, errors)
+ response['db'] = self['db']
+ return response
+
+
+@attrs.define(slots=False)
+class UploadFileForm(AuthLoadedForm):
+ '''
+ Acts like a dict, but with extra methods.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+
+ Methods:
+ read() Load form from pyramid request object.
+ '''
+ fc_default = attrs.field(default=UploadFileWTForm)
+ ip_default = attrs.field(factory=UploadFileInitialPost)
+ upload_fmt = attrs.field(default=None)
+ trim_upload = attrs.field(default=None)
+ literal_col_headings = attrs.field(default=None)
+ filename = attrs.field(default=None)
+ localfh = attrs.field(default=None)
+
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+
+ # Read parent's data
+ super().read()
+
+ # Read our own data
+ self['upload_fmt'] = self._form.upload_fmt.data
+ self['trim_upload'] = self._form.trim_upload.data
+ self['literal_col_headings'] = self._form.literal_col_headings.data
+
+ # Other POST variables involving a file
+ post = self.uh.request.POST
+ session = self.uh.request.session
+ self.booleanize_post(post, 'db_changed')
+ if not self.read_post_and_session(post, session, 'db_changed'):
+ self['db_changed'] = False
+ self['filename'] = ''
+ self['localfh'] = ''
+ if self['action']:
+ if self._form.datafile.data != '':
+ if hasattr(post['datafile'], 'filename'):
+ self['filename'] = post['datafile'].filename
+ if hasattr(post['datafile'], 'file'):
+ self['localfh'] = post['datafile'].file
+
+ def write(self, result, errors):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ if self['upload_fmt'] == CSV:
+ csv_checked = CHECKED
+ tab_checked = UNCHECKED
+ else:
+ tab_checked = CHECKED
+ csv_checked = UNCHECKED
+
+ if self['trim_upload']:
+ trim_upload_checked = CHECKED
+ else:
+ trim_upload_checked = UNCHECKED
+
+ if self['literal_col_headings']:
+ literal_col_headings_checked = CHECKED
+ else:
+ literal_col_headings_checked = UNCHECKED
+
+ response = super().write(result, errors)
+ # Although we read-in db_changed, we do not write it because
+ # it, like last_key, is computed.
+ response['filename'] = self['filename']
+ response['trim_upload'] = trim_upload_checked
+ response['csv_value'] = CSV_VALUE
+ response['tab_value'] = TAB_VALUE
+ response['csv_checked'] = csv_checked
+ response['tab_checked'] = tab_checked
+ response['literal_col_headings'] = literal_col_headings_checked
+ return response
+
+
+class UploadFormBaseMixin():
+ '''
+ Mixins add to attributes to self, and to response.
+ '''
+ def write_response(self, response):
+ return response
+
+
+@attrs.define(slots=False)
+class UploadDoubleFileFormMixin(UploadFormBaseMixin):
+ '''
+ Adds a last_key attribute to self, from POST
+
+ Acts like a dict, but with extra methods.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+
+ Methods:
+ read() Load form from pyramid request object.
+ '''
+ # Keep the last_key in both the form and the session; in the
+ # session because that way double-upload detection works when the
+ # user presses "enter" in the URL bar.
+ last_key = attrs.field(default=None)
+
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+ super().read()
+ post = self.uh.request.POST
+ session = self.uh.request.session
+
+ if not self.read_post_and_session(post, session, 'last_key'):
+ self['last_key'] = ''
+
+ def write_response(self, response):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ if self.uh.double_upload:
+ # Erase the last key from all state
+ response.pop('last_key', None)
+ self.session_del('last_key')
+ else:
+ response['last_key'] = self['last_key']
+ return super().write_response(response)
+
+
+@attrs.define(slots=False)
+class UploadDoubleFileForm(UploadDoubleFileFormMixin, UploadFileForm):
+ '''
+ Acts like a dict, but with extra methods.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+
+ Methods:
+ read() Load form from pyramid request object.
+ '''
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+ # Read all parents' data
+ super().read()
+
+ def write(self, result, errors):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ response = super().write(result, errors)
+ return super().write_response(response)
+
+
+@attrs.define(slots=False)
+class UploadNullMixin(UploadFormBaseMixin):
+ '''
+ Acts like a dict, but with extra methods.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+
+ Methods:
+ read() Load form from pyramid request object.
+ '''
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+ super().read()
+ self['upload_null'] = self._form.upload_null.data
+ self['null_rep'] = self._form.null_rep.data
+
+ def write_response(self, response):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ if self['upload_null']:
+ upload_null_checked = CHECKED
+ else:
+ upload_null_checked = UNCHECKED
+
+ response['upload_null'] = upload_null_checked
+ response['null_rep'] = self['null_rep']
+ return super().write_response(response)
+
+
+@attrs.define(slots=False)
+class UploadTableForm(UploadNullMixin, UploadFileForm):
+ '''
+ Acts like a dict, but with extra methods.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+
+ Methods:
+ read() Load form from pyramid request object.
+ '''
+ fc_default = attrs.field(default=UploadTableWTForm)
+ ip_default = attrs.field(factory=UploadTableInitialPost)
+
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+
+ # Read all parents' data
+ super().read()
+ # Read our own data
+ self['table'] = self._form.table.data
+
+ def write(self, result, errors):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ response = super().write(result, errors)
+ response['table'] = self['table']
+ return super().write_response(response)
+
+
+@attrs.define(slots=False)
+class UploadDoubleTableForm(UploadDoubleFileFormMixin, UploadTableForm):
+ '''
+ Acts like a dict, but with extra methods.
+
+ Attributes:
+ uh The UploadHandler instance using the form
+
+ Methods:
+ read() Load form from pyramid request object.
+ '''
+ def read(self):
+ '''
+ Read form data from the client
+ '''
+ # Read all parents' data
+ super().read()
+
+ def write(self, result, errors):
+ '''
+ Produces the dict pyramid will use to render the form.
+ '''
+ response = super().write(result, errors)
+ return super().write_response(response)