diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index aabc8148..2c62c824 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -40,6 +40,10 @@ def xloader_submit(context, data_dict): :param ignore_hash: If set to True, the xloader will reload the file even if it haven't changed. (optional, default: False) :type ignore_hash: bool + :param sync_mode: If set to True, the xloader callback will be executed right + away, instead of a job being enqueued. It will also delete any existing jobs + for the given resource. (optional, default: False) + :type sync_mode: bool Returns ``True`` if the job has been submitted and ``False`` if the job has not been submitted, i.e. when ckanext-xloader is not configured. @@ -53,6 +57,9 @@ def xloader_submit(context, data_dict): p.toolkit.check_access('xloader_submit', context, data_dict) + sync_mode = data_dict.pop('sync_mode', False) + #TODO: implement the sync_mode logic + res_id = data_dict['resource_id'] try: resource_dict = p.toolkit.get_action('resource_show')(context, { diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 89114783..cd702e81 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -138,6 +138,14 @@ groups: See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema for more details. + - key: ckanext.xloader.validation.chain_xloader + default: True + example: False + description: | + Resources that pass Validation will immediately get XLoadered instead of having + a job enqueued for it. + + If this option is set to `False`, jobs will be enqueued like normal. - key: ckanext.xloader.clean_datastore_tables default: False example: True diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 801c25c2..b939b76b 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -104,6 +104,12 @@ def notify(self, entity, operation): log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", entity.id) return + elif utils.do_chain_after_validation(resource_dict.get('id')): + # At this point, the Resource has passed validation requirements, + # and chainging is turned on. We will execute XLoader right away, + # instead of enqueueing a job. + self._submit_to_xloader(resource_dict, sync_mode=True) + return elif not getattr(entity, 'url_changed', False): # do not submit to xloader if the url has not changed. return @@ -118,6 +124,10 @@ def after_resource_create(self, context, resource_dict): "resource did not pass validation yet.", resource_dict.get('id')) return + if utils.do_chain_after_validation(resource_dict.get('id')): + self._submit_to_xloader(resource_dict, sync_mode=True) + return + self._submit_to_xloader(resource_dict) def before_resource_show(self, resource_dict): @@ -160,7 +170,7 @@ def before_show(self, resource_dict): def after_update(self, context, resource_dict): self.after_resource_update(context, resource_dict) - def _submit_to_xloader(self, resource_dict): + def _submit_to_xloader(self, resource_dict, sync_mode=False): context = {"ignore_auth": True, "defer_commit": True} if not XLoaderFormats.is_it_an_xloader_format(resource_dict["format"]): log.debug( @@ -187,6 +197,7 @@ def _submit_to_xloader(self, resource_dict): { "resource_id": resource_dict["id"], "ignore_hash": self.ignore_hash, + "sync_mode": sync_mode, }, ) except toolkit.ValidationError as e: diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index c0e8d938..2eac0baf 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -29,6 +29,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], + 'sync_mode': [ignore_missing, boolean_validator], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] } diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index eaad84ac..e7ccf923 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -2,6 +2,7 @@ import json import datetime +from rq import get_current_job from ckan import model from ckan.lib import search @@ -48,6 +49,7 @@ def is_it_an_xloader_format(cls, format_): def awaiting_validation(res_dict): + # type: (dict) -> bool """ Checks the existence of a logic action from the ckanext-validation plugin, thus supporting any extending of the Validation Plugin class. @@ -88,6 +90,39 @@ def awaiting_validation(res_dict): return False +def do_chain_after_validation(resource_id): + # type: (str) -> bool + if not p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)): + # we are not requiring resources to pass validation + return False + + if not p.toolkit.asbool(config.get('ckanext.xloader.validation.chain_xloader', True)): + # we are not chaining validation to xloader + return False + + current_job = get_current_job() + + if not current_job: + # we are outside of the job context, thus not running a job + return False + + if current_job.func_name != 'ckanext.validation.jobs.run_validation_job': + # the current running job is not the ckanext-validation validate job + #FIXME: how to do a better check for the caller in the stack?? + return False + + try: + job_rid = current_job.args[0].get('id', None) + except (KeyError): + job_rid = None + if resource_id != job_rid: + # the current running job's Resource ID is not + # the same as the passed Resource ID + return False + + return True + + def resource_data(id, resource_id, rows=None): if p.toolkit.request.method == "POST":