diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 9fae67c2..9c6e0a67 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -28,6 +28,8 @@ except ImportError: get_user_from_token = None +log = logging.getLogger(__name__) + SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True)) if not SSL_VERIFY: requests.packages.urllib3.disable_warnings() @@ -82,7 +84,6 @@ def xloader_data_into_datastore(input): db.mark_job_as_errored(job_id, str(e)) job_dict['status'] = 'error' job_dict['error'] = str(e) - log = logging.getLogger(__name__) log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc())) errored = True except Exception as e: @@ -90,7 +91,6 @@ def xloader_data_into_datastore(input): job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e)) job_dict['status'] = 'error' job_dict['error'] = str(e) - log = logging.getLogger(__name__) log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc())) errored = True finally: diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 11eb637c..4da314a8 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -16,7 +16,7 @@ import ckan.plugins as p from .job_exceptions import FileCouldNotBeLoadedError, LoaderError -from .parser import CSV_SAMPLE_LINES, XloaderCSVParser +from .parser import CSV_SAMPLE_LINES, TypeConverter from .utils import datastore_resource_exists, headers_guess, type_guess from ckan.plugins.toolkit import config @@ -238,13 +238,13 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): try: file_format = os.path.splitext(table_filepath)[1].strip('.') with Stream(table_filepath, format=file_format, - custom_parsers={'csv': XloaderCSVParser}) as stream: + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] with Stream(table_filepath, format=file_format, - custom_parsers={'csv': XloaderCSVParser}) as stream: + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) @@ -279,9 +279,10 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): for t, h in zip(types, headers)] headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()] + type_converter = TypeConverter(types=types) with Stream(table_filepath, format=file_format, skip_rows=skip_rows, - custom_parsers={'csv': XloaderCSVParser}) as stream: + post_parse=[type_converter.convert_types]) as stream: def row_iterator(): for row in stream: data_row = {} diff --git a/ckanext/xloader/parser.py b/ckanext/xloader/parser.py index 82539f4d..812ccd1f 100644 --- a/ckanext/xloader/parser.py +++ b/ckanext/xloader/parser.py @@ -1,161 +1,71 @@ # -*- coding: utf-8 -*- -import csv +import datetime from decimal import Decimal, InvalidOperation -from itertools import chain +import re +import six from ckan.plugins.toolkit import asbool -from dateutil.parser import isoparser, parser -from dateutil.parser import ParserError - -from tabulator import helpers -from tabulator.parser import Parser +from dateutil.parser import isoparser, parser, ParserError from ckan.plugins.toolkit import config CSV_SAMPLE_LINES = 1000 +DATE_REGEX = re.compile(r'''^\d{1,4}[-/.\s]\S+[-/.\s]\S+''') -class XloaderCSVParser(Parser): - """Extends tabulator CSVParser to detect datetime and numeric values. +class TypeConverter: + """ Post-process table cells to convert strings into numbers and timestamps + as desired. """ - # Public - - options = [ - 'delimiter', - 'doublequote', - 'escapechar', - 'quotechar', - 'quoting', - 'skipinitialspace', - 'lineterminator' - ] - - def __init__(self, loader, force_parse=False, **options): - super(XloaderCSVParser, self).__init__(loader, force_parse, **options) - # Set attributes - self.__loader = loader - self.__options = options - self.__force_parse = force_parse - self.__extended_rows = None - self.__encoding = None - self.__dialect = None - self.__chars = None - - @property - def closed(self): - return self.__chars is None or self.__chars.closed - - def open(self, source, encoding=None): - # Close the character stream, if necessary, before reloading it. - self.close() - self.__chars = self.__loader.load(source, encoding=encoding) - self.__encoding = getattr(self.__chars, 'encoding', encoding) - if self.__encoding: - self.__encoding.lower() - self.reset() - - def close(self): - if not self.closed: - self.__chars.close() - - def reset(self): - helpers.reset_stream(self.__chars) - self.__extended_rows = self.__iter_extended_rows() - - @property - def encoding(self): - return self.__encoding - - @property - def dialect(self): - if self.__dialect: - dialect = { - 'delimiter': self.__dialect.delimiter, - 'doubleQuote': self.__dialect.doublequote, - 'lineTerminator': self.__dialect.lineterminator, - 'quoteChar': self.__dialect.quotechar, - 'skipInitialSpace': self.__dialect.skipinitialspace, - } - if self.__dialect.escapechar is not None: - dialect['escapeChar'] = self.__dialect.escapechar - return dialect - - @property - def extended_rows(self): - return self.__extended_rows - - # Private - - def __iter_extended_rows(self): - - def type_value(value): - """Returns numeric values as Decimal(). Uses dateutil to parse - date values. Otherwise, returns values as it receives them - (strings). - """ - if value in ('', None): - return '' - - try: - return Decimal(value) - except InvalidOperation: - pass - - try: - i = isoparser() - return i.isoparse(value) - except ValueError: - pass - - try: - p = parser() - yearfirst = asbool(config.get( - 'ckanext.xloader.parse_dates_yearfirst', False)) - dayfirst = asbool(config.get( - 'ckanext.xloader.parse_dates_dayfirst', False)) - return p.parse(value, yearfirst=yearfirst, dayfirst=dayfirst) - except ParserError: - pass - - return value - - sample, dialect = self.__prepare_dialect(self.__chars) - items = csv.reader(chain(sample, self.__chars), dialect=dialect) - for row_number, item in enumerate(items, start=1): - values = [] - for value in item: - value = type_value(value) - values.append(value) - yield row_number, None, list(values) - - def __prepare_dialect(self, stream): - - # Get sample - sample = [] - while True: - try: - sample.append(next(stream)) - except StopIteration: - break - if len(sample) >= CSV_SAMPLE_LINES: - break - - # Get dialect + def __init__(self, types=None): + self.types = types + + def convert_types(self, extended_rows): + """ Try converting cells to numbers or timestamps if applicable. + If a list of types was supplied, use that. + If not, then try converting each column to numeric first, + then to a timestamp. If both fail, just keep it as a string. + """ + for row_number, headers, row in extended_rows: + for cell_index, cell_value in enumerate(row): + if cell_value is None: + row[cell_index] = '' + if not cell_value: + continue + cell_type = self.types[cell_index] if self.types else None + if cell_type in [Decimal, None]: + converted_value = to_number(cell_value) + if converted_value: + row[cell_index] = converted_value + continue + if cell_type in [datetime.datetime, None]: + converted_value = to_timestamp(cell_value) + if converted_value: + row[cell_index] = converted_value + yield (row_number, headers, row) + + +def to_number(value): + if not isinstance(value, six.string_types): + return None + try: + return Decimal(value) + except InvalidOperation: + return None + + +def to_timestamp(value): + if not isinstance(value, six.string_types) or not DATE_REGEX.search(value): + return None + try: + i = isoparser() + return i.isoparse(value) + except ValueError: try: - separator = '' - delimiter = self.__options.get('delimiter', ',\t;|') - dialect = csv.Sniffer().sniff(separator.join(sample), delimiter) - if not dialect.escapechar: - dialect.doublequote = True - except csv.Error: - class dialect(csv.excel): - pass - for key, value in self.__options.items(): - setattr(dialect, key, value) - # https://github.com/frictionlessdata/FrictionlessDarwinCore/issues/1 - if getattr(dialect, 'quotechar', None) == '': - setattr(dialect, 'quoting', csv.QUOTE_NONE) - - self.__dialect = dialect - return sample, dialect + p = parser() + yearfirst = asbool(config.get('ckanext.xloader.parse_dates_yearfirst', False)) + dayfirst = asbool(config.get('ckanext.xloader.parse_dates_dayfirst', False)) + return p.parse(value, yearfirst=yearfirst, dayfirst=dayfirst) + except ParserError: + return None diff --git a/ckanext/xloader/tests/samples/non_timestamp_sample.csv b/ckanext/xloader/tests/samples/non_timestamp_sample.csv new file mode 100644 index 00000000..d1b39e90 --- /dev/null +++ b/ckanext/xloader/tests/samples/non_timestamp_sample.csv @@ -0,0 +1,4 @@ +Title,Postal postcode,Latitude,Longitude,Mon am,Mon pm,Last updated +Adavale,4474,-25.9092582,144.5975769,8:00,16:00,19/07/2018 +Aramac,4726,-22.971298,145.241481,9:00-13:00,14:00-16:45,17/07/2018 +Barcaldine,4725,-23.55327901,145.289156,9:00-12:30,13:30-16:30,20/07/2018 diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index d55ec949..f17e6c10 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -1215,3 +1215,24 @@ def test_with_mixed_quotes(self, Session): logger=logger, ) assert len(self._get_records(Session, resource_id)) == 2 + + def test_preserving_time_ranges(self, Session): + """ Time ranges should not be treated as timestamps + """ + csv_filepath = get_sample_filepath("non_timestamp_sample.csv") + resource = factories.Resource() + resource_id = resource['id'] + loader.load_table( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + assert self._get_records(Session, resource_id) == [ + (1, "Adavale", 4474, Decimal("-25.9092582"), Decimal("144.5975769"), + "8:00", "16:00", datetime.datetime(2018, 7, 19)), + (2, "Aramac", 4726, Decimal("-22.971298"), Decimal("145.241481"), + "9:00-13:00", "14:00-16:45", datetime.datetime(2018, 7, 17)), + (3, "Barcaldine", 4725, Decimal("-23.55327901"), Decimal("145.289156"), + "9:00-12:30", "13:30-16:30", datetime.datetime(2018, 7, 20)) + ] diff --git a/ckanext/xloader/tests/test_parser.py b/ckanext/xloader/tests/test_parser.py index 67929d9f..ac4047dd 100644 --- a/ckanext/xloader/tests/test_parser.py +++ b/ckanext/xloader/tests/test_parser.py @@ -6,7 +6,7 @@ from datetime import datetime from tabulator import Stream -from ckanext.xloader.parser import XloaderCSVParser +from ckanext.xloader.parser import TypeConverter csv_filepath = os.path.abspath( os.path.join(os.path.dirname(__file__), "samples", "date_formats.csv") @@ -16,7 +16,7 @@ class TestParser(object): def test_simple(self): with Stream(csv_filepath, format='csv', - custom_parsers={'csv': XloaderCSVParser}) as stream: + post_parse=[TypeConverter().convert_types]) as stream: assert stream.sample == [ [ 'date', @@ -49,7 +49,7 @@ def test_simple(self): def test_dayfirst(self): print('test_dayfirst') with Stream(csv_filepath, format='csv', - custom_parsers={'csv': XloaderCSVParser}) as stream: + post_parse=[TypeConverter().convert_types]) as stream: assert stream.sample == [ [ 'date', @@ -82,7 +82,7 @@ def test_dayfirst(self): def test_yearfirst(self): print('test_yearfirst') with Stream(csv_filepath, format='csv', - custom_parsers={'csv': XloaderCSVParser}) as stream: + post_parse=[TypeConverter().convert_types]) as stream: assert stream.sample == [ [ 'date', @@ -115,7 +115,7 @@ def test_yearfirst(self): @pytest.mark.ckan_config("ckanext.xloader.parse_dates_yearfirst", True) def test_yearfirst_dayfirst(self): with Stream(csv_filepath, format='csv', - custom_parsers={'csv': XloaderCSVParser}) as stream: + post_parse=[TypeConverter().convert_types]) as stream: assert stream.sample == [ [ 'date',