Skip to content

Commit

Permalink
Merge pull request #31 from qld-gov-au/develop
Browse files Browse the repository at this point in the history
Develop to master
  • Loading branch information
ThrawnCA authored Jun 28, 2021
2 parents 8ecfa62 + 9946bab commit 79d6ea0
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 68 deletions.
21 changes: 21 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[flake8]
# @see https://flake8.pycqa.org/en/latest/user/configuration.html?highlight=.flake8

exclude =
    ckan
    scripts

# Extended output format.
format = pylint

# Show the source of errors.
show_source = True

max-complexity = 10
max-line-length = 127

# List ignore rules one per line.
ignore =
    E501
    C901
    W503
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#based on https://raw.githubusercontent.com/ckan/ckanext-scheming/master/.github/workflows/test.yml
# alternative https://github.com/ckan/ckan/blob/master/contrib/cookiecutter/ckan_extension/%7B%7Bcookiecutter.project%7D%7D/.github/workflows/test.yml
name: Tests
on: [push, pull_request]
on: [push]
jobs:
lint:
runs-on: ubuntu-18.04
Expand Down
19 changes: 9 additions & 10 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from dateutil.parser import parse as parse_date

import ckan.model as model
import ckan.lib.navl.dictization_functions
import ckan.logic as logic
import ckan.plugins as p
Expand Down Expand Up @@ -115,12 +114,12 @@ def xloader_submit(context, data_dict):
job.description).groups()[0]
for job in get_queue().get_jobs()
if 'xloader_to_datastore' in six.text_type(job) # filter out test_job etc
]
]
updated = datetime.datetime.strptime(
existing_task['last_updated'], '%Y-%m-%dT%H:%M:%S.%f')
time_since_last_updated = datetime.datetime.utcnow() - updated
if (res_id not in queued_res_ids and
time_since_last_updated > assume_task_stillborn_after):
if (res_id not in queued_res_ids
and time_since_last_updated > assume_task_stillborn_after):
# it's not on the queue (and if it had just been started then
# it has taken too long to update the task_status from pending -
# the first thing it should do in the xloader job).
Expand Down Expand Up @@ -163,8 +162,8 @@ def xloader_submit(context, data_dict):
'set_url_type': data_dict.get('set_url_type', False),
'task_created': task['last_updated'],
'original_url': resource_dict.get('url'),
}
}
}
timeout = config.get('ckanext.xloader.job_timeout', '3600')
try:
try:
Expand Down Expand Up @@ -282,8 +281,8 @@ def xloader_hook(context, data_dict):
})

# Check if the uploaded file has been modified in the meantime
if (resource_dict.get('last_modified') and
metadata.get('task_created')):
if (resource_dict.get('last_modified')
and metadata.get('task_created')):
try:
last_modified_datetime = parse_date(
resource_dict['last_modified'])
Expand All @@ -295,9 +294,9 @@ def xloader_hook(context, data_dict):
except ValueError:
pass
# Check if the URL of the file has been modified in the meantime
elif (resource_dict.get('url') and
metadata.get('original_url') and
resource_dict['url'] != metadata['original_url']):
elif (resource_dict.get('url')
and metadata.get('original_url')
and resource_dict['url'] != metadata['original_url']):
log.debug('URLs are different: {0} != {1}'.format(
resource_dict['url'], metadata['original_url']))
resubmit = True
Expand Down
13 changes: 6 additions & 7 deletions ckanext/xloader/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,15 @@ def _submit_resource(self, resource, user, indent=0):
from ckanext.xloader.plugin import XLoaderFormats

if not XLoaderFormats.is_it_an_xloader_format(resource['format']):
print(' ' * indent +
'Skipping resource {r[id]} because format "{r[format]}" is '
print(' ' * indent
+ 'Skipping resource {r[id]} because format "{r[format]}" is '
'not configured to be xloadered'.format(r=resource))
return
if resource['url_type'] in ('datapusher', 'xloader'):
print(' ' * indent +
'Skipping resource {r[id]} because url_type "{r[url_type]}" '
print(' ' * indent
+ 'Skipping resource {r[id]} because url_type "{r[url_type]}" '
'means resource.url points to the datastore '
'already, so loading would be circular.'.format(
r=resource))
'already, so loading would be circular.'.format(r=resource))
return
dataset_ref = resource.get('package_name', resource['package_id'])
print('{indent}Submitting /dataset/{dataset}/resource/{r[id]}\n'
Expand Down Expand Up @@ -233,7 +232,7 @@ def _print_status(self):
enqueued=job.enqueued_at,
res_id=job_metadata['resource_id'],
url=job_metadata['original_url'],
))
))


class MigrateTypesCommand(cli.CkanCommand):
Expand Down
6 changes: 3 additions & 3 deletions ckanext/xloader/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ def _init_jobs_table():
sqlalchemy.Column('result_url', sqlalchemy.UnicodeText),
# CKAN API key:
sqlalchemy.Column('api_key', sqlalchemy.UnicodeText),
)
)
return _jobs_table


Expand All @@ -424,7 +424,7 @@ def _init_metadata_table():
sqlalchemy.Column('key', sqlalchemy.UnicodeText, primary_key=True),
sqlalchemy.Column('value', sqlalchemy.UnicodeText, index=True),
sqlalchemy.Column('type', sqlalchemy.UnicodeText),
)
)
return _metadata_table


Expand All @@ -441,7 +441,7 @@ def _init_logs_table():
sqlalchemy.Column('module', sqlalchemy.UnicodeText),
sqlalchemy.Column('funcName', sqlalchemy.UnicodeText),
sqlalchemy.Column('lineno', sqlalchemy.Integer)
)
)
return _logs_table


Expand Down
7 changes: 3 additions & 4 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import sqlalchemy as sa

import ckan.model as model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, c
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound
try:
from ckan.plugins.toolkit import config
except ImportError:
Expand Down Expand Up @@ -136,12 +136,11 @@ def xloader_data_into_datastore_(input, job_dict):

data = input['metadata']

ckan_url = data['ckan_url']
resource_id = data['resource_id']
api_key = input.get('api_key')
try:
resource, dataset = get_resource_and_dataset(resource_id, api_key)
except (JobError, ObjectNotFound) as e:
except (JobError, ObjectNotFound):
# try again in 5 seconds just in case CKAN is slow at adding resource
time.sleep(5)
resource, dataset = get_resource_and_dataset(resource_id, api_key)
Expand Down Expand Up @@ -335,7 +334,7 @@ def _download_resource_data(resource, data, api_key, logger):
def get_response(url, headers):
def get_url():
kwargs = {'headers': headers, 'timeout': DOWNLOAD_TIMEOUT,
'verify': SSL_VERIFY, 'stream': True} # just gets the headers for now
'verify': SSL_VERIFY, 'stream': True} # just gets the headers for now
if 'ckan.download_proxy' in config:
proxy = config.get('ckan.download_proxy')
kwargs['proxies'] = {'http': proxy, 'https': proxy}
Expand Down
21 changes: 11 additions & 10 deletions ckanext/xloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
data_dict = dict(
resource_id=resource_id,
fields=fields,
)
)
data_dict['records'] = None # just create an empty table
data_dict['force'] = True # TODO check this - I don't fully
# understand read-only/datastore resources
Expand Down Expand Up @@ -220,7 +220,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
for h in headers]),
delimiter=delimiter,
encoding='UTF8',
),
),
f)
except psycopg2.DataError as e:
# e is a str but with foreign chars e.g.
Expand Down Expand Up @@ -254,7 +254,7 @@ def create_column_indexes(fields, resource_id, logger):
data_dict = dict(
resource_id=resource_id,
fields=fields,
)
)
engine = get_write_engine()
connection = context['connection'] = engine.connect()

Expand Down Expand Up @@ -283,7 +283,7 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):

try:
table_set = messytables.any_tableset(tmp, mimetype=ct, extension=ct)
except messytables.ReadError as e:
except messytables.ReadError:
# try again with format
tmp.seek(0)
try:
Expand Down Expand Up @@ -313,10 +313,11 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):

# override with types user requested
if existing_info:
types = [{
'text': messytables.StringType(),
'numeric': messytables.DecimalType(),
'timestamp': messytables.DateUtilType(),
types = [
{
'text': messytables.StringType(),
'numeric': messytables.DecimalType(),
'timestamp': messytables.DateUtilType(),
}.get(existing_info.get(h, {}).get('type_override'), t)
for t, h in zip(types, headers)]

Expand Down Expand Up @@ -529,11 +530,11 @@ def _populate_fulltext(connection, resource_id, fields):
'coalesce({}, \'\')'.format(
identifier(field['id'])
+ ('::text' if field['type'] != 'text' else '')
)
)
for field in fields
if not field['id'].startswith('_')
)
)
)
connection.execute(sql)


Expand Down
29 changes: 16 additions & 13 deletions ckanext/xloader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

class XLoaderFormats(object):
formats = None

@classmethod
def is_it_an_xloader_format(cls, format_):
if cls.formats is None:
Expand All @@ -48,12 +49,14 @@ class xloaderPlugin(plugins.SingletonPlugin):

if toolkit.check_ckan_version('2.9'):
plugins.implements(plugins.IBlueprint)

# IBlueprint
def get_blueprint(self):
from ckanext.xloader.views import get_blueprints
return get_blueprints()
else:
plugins.implements(plugins.IRoutes, inherit=True)

# IRoutes
def before_map(self, m):
m.connect(
Expand Down Expand Up @@ -107,8 +110,8 @@ def configure(self, config_):

def notify(self, entity, operation=None):
if isinstance(entity, model.Resource):
if (operation == model.domain_object.DomainObjectOperation.new or
not operation):
if (not operation
or operation == model.domain_object.DomainObjectOperation.new):
# if operation is None, resource URL has been changed, as
# the notify function in IResourceUrlChange only takes
# 1 parameter
Expand All @@ -127,22 +130,22 @@ def notify(self, entity, operation=None):
'would be circular.'.format(r=entity))
return

try:
task = p.toolkit.get_action('task_status_show')(
context, {
'entity_id': entity.id,
'task_type': 'xloader',
'key': 'xloader'}
)
# try:
# task = p.toolkit.get_action('task_status_show')(
# context, {
# 'entity_id': entity.id,
# 'task_type': 'xloader',
# 'key': 'xloader'}
# )
# if task.get('state') == 'pending':
# # There already is a pending DataPusher submission,
# # skip this one ...
# log.debug(
# 'Skipping DataPusher submission for '
# 'resource {0}'.format(entity.id))
# return
except p.toolkit.ObjectNotFound:
pass
# except p.toolkit.ObjectNotFound:
# pass

try:
log.debug('Submitting resource {0} to be xloadered'
Expand All @@ -164,15 +167,15 @@ def get_actions(self):
'xloader_submit': action.xloader_submit,
'xloader_hook': action.xloader_hook,
'xloader_status': action.xloader_status,
}
}

# IAuthFunctions

def get_auth_functions(self):
return {
'xloader_submit': auth.xloader_submit,
'xloader_status': auth.xloader_status,
}
}

# ITemplateHelpers

Expand Down
3 changes: 0 additions & 3 deletions ckanext/xloader/tests/ckan_setup.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
try:
from ckan.tests.pytest_ckan.ckan_setup import *
except ImportError:
from ckan.config.middleware import make_app
from ckan.common import config

import pkg_resources
from paste.deploy import loadapp
import sys
Expand Down
2 changes: 0 additions & 2 deletions ckanext/xloader/tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sqlalchemy.orm as orm
import os

from ckan.tests import helpers
from ckanext.datastore.tests import helpers as datastore_helpers
from ckanext.xloader.loader import get_write_engine

Expand All @@ -13,7 +12,6 @@

try:
from ckan.tests.pytest_ckan.fixtures import *

except ImportError:
import pytest

Expand Down
1 change: 0 additions & 1 deletion ckanext/xloader/tests/test_action.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest
import mock

import ckan.plugins as p
from ckan.tests import helpers, factories


Expand Down
Loading

0 comments on commit 79d6ea0

Please sign in to comment.