--- /dev/null
+# Copyright (C) 2024 The Meme Factory, Inc. http://www.karlpinc.com/
+
+# This file is part of PGWUI_SQL.
+#
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation, either version 3 of
+# the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+import attrs
+import pgwui_core.core
+import pgwui_core.forms
+import pgwui_sql.views.base
+import pgwui_sql.exceptions as sql_ex
+
+
+@attrs.define(slots=False)
+class SQLEditForm(pgwui_sql.views.base.SQLForm):
+ '''Always set the "action" so that the SQL gets executed
+ '''
+ def read(self):
+ super().read()
+ self['action'] = 'u'
+
+
+# Utility functions
+def _fetcher(cur):
+ '''Get the result of `SHOW search_path;` from a psycopg3 cursor.
+ '''
+ result = cur.fetchone()
+ final = cur.fetchone()
+ if final is not None:
+ raise sql_ex.ExecutionError(
+ 'Problem obtaining the search_path', '',
+ f'Extra result row returned ({final})')
+ return result[0]
+
+
+@attrs.define(slots=False)
+class SQLEditHandler(pgwui_core.core.SessionDBHandler):
+ '''
+ Execute pre-defined SQL statement(s)
+ '''
+ def make_form(self):
+ return SQLEditForm().build(
+ self, ip=pgwui_sql.views.base.SQLInitialPost(),
+ fc=pgwui_sql.views.base.SQLWTForm)
+
+ def get_data(self):
+ '''
+ Build and stash the SQL to be executed.
+
+ Returns:
+ A SQLData instance
+ '''
+ # Get the search_path
+ self.data = pgwui_core.sql.SQLData(
+ [pgwui_core.sql.SQLCommand(
+ 'SHOW search_path;', (), fetcher=_fetcher)])
+
+ def factory(self, ue):
+ '''Make a db loader function from an UploadEngine.
+
+ Input:
+
+ Side Effects:
+ Yes, lots.
+ '''
+ return pgwui_core.sql.ExecuteSQL(ue, self)
+
+ def render(self, errors, result):
+ '''Instead of rendering, just return our results so we can
+ decide what to do next.
+
+ Input:
+ errors List of Error instances
+ result Db connection result dict
+ '''
+ response = super().render(errors, result)
+ return (response, errors)
#
from pyramid.view import view_config
-import attrs
import logging
import pgwui_core.core
import pgwui_core.forms
import pgwui_sql.views.base
-import pgwui_sql.exceptions as sql_ex
+import pgwui_sql.views.search_path_base
from pgwui_common.view import auth_base_view
log = logging.getLogger(__name__)
-@attrs.define(slots=False)
-class SQLEditForm(pgwui_sql.views.base.SQLForm):
- '''Always set the "action" so that the SQL gets executed
- '''
- def read(self):
- super().read()
- self['action'] = 'u'
-
-
-# Utility functions
-def _fetcher(cur):
- '''Get the result of `SHOW search_path;` from a psycopg3 cursor.
- '''
- result = cur.fetchone()
- final = cur.fetchone()
- if final is not None:
- raise sql_ex.ExecutionError(
- 'Problem obtaining the search_path', '',
- f'Extra result row returned ({final})')
- return result[0]
-
-
-@attrs.define(slots=False)
-class SQLEditHandler(pgwui_core.core.SessionDBHandler):
- '''
- Execute pre-defined SQL statement(s)
- '''
- def make_form(self):
- return SQLEditForm().build(
- self, ip=pgwui_sql.views.base.SQLInitialPost(),
- fc=pgwui_sql.views.base.SQLWTForm)
-
- def get_data(self):
- '''
- Build and stash the SQL to be executed.
-
- Returns:
- A SQLData instance
- '''
- # Get the search_path
- self.data = pgwui_core.sql.SQLData(
- [pgwui_core.sql.SQLCommand(
- 'SHOW search_path;', (), fetcher=_fetcher)])
-
- def factory(self, ue):
- '''Make a db loader function from an UploadEngine.
-
- Input:
-
- Side Effects:
- Yes, lots.
- '''
- return pgwui_core.sql.ExecuteSQL(ue, self)
-
- def render(self, errors, result):
- '''Instead of rendering, just return our results so we can
- decide what to do next.
-
- Input:
- errors List of Error instances
- result Db connection result dict
- '''
- response = super().render(errors, result)
- return (response, errors)
-
-
@view_config(route_name='pgwui_sql_edit',
renderer='pgwui_sql:templates/sql_edit.mak')
@auth_base_view
# We don't have access to the CSRF token because the page is not
# sent a POST request.
- uh = SQLEditHandler(request).init()
+ uh = pgwui_sql.views.search_path_base.SQLEditHandler(request).init()
response, errors = pgwui_core.core.UnsafeUploadEngine(uh).run()
response['errors'] = errors
if errors: