# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
+from __future__ import generator_stop
from pyramid.view import view_config
import attr
class UploadBulkData(UploadData):
def __init__(self, fileo, file_fmt, null_data, null_rep,
- filepath, relation, trim=True):
+ path, relation, trim=True):
'''
- fileo Uploaded file object
+ fileo Stream to uploaded file
file_fmt File format: CSV or TAB
null_data (boolean) Uploaded data contains nulls
null_rep Uploaded string representation of null
- filepath Path of file (zip_root relative)
+ path pathlib path to file.
relation Relation, possibly schema qualified
trim (boolean) Trim leading and trailing whitespace?
+
+ filepath Path of file (zip_root relative)
+ reopened The file has been re-opened after reading the header line
'''
super().__init__(fileo, file_fmt, null_data, null_rep, trim=True)
- self.filepath = filepath
+ self.path = path
+ self.filepath = archive_path(path)
self.relation = relation
+ self.reopened = False
+
+ def _thunk(self):
+ '''Get the thunk which returns the next udl
+ '''
+ try:
+ yield from super()._thunk()
+ except ValueError:
+ # The file isn't open
+ if self.reopened:
+ super()._thunk().close()
+ return # skip this file
+ # Reopen the file, now that it is time to upload it
+ self.reopened = True
+ try:
+ self.open_fileo(self.path.open('rb'))
+ except OSError as exp:
+ # If the file does not open on the next iteration it is skipped
+ raise ex.CannotReadError(self.filepath, exp)
+ next(super()._thunk()) # skip header
+ yield from super()._thunk()
@attr.s
uf['upload_fmt'],
uf['upload_null'],
uf['null_rep'],
- archive_path(name),
+ name,
fmap['relation'],
trim=uf['trim_upload']))
except core_ex.PGWUIError as exp:
map_description(fileinfo.filepath, fileinfo.relation),
fileinfo.filepath, fileinfo.relation)
errors.append(exp)
+ # Limit number of open files, close the file handle until it
+ # is time to read the file
+ fileinfo.close_fileo()
if errors:
raise core_ex.MultiError(errors)