diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index c0f3f84f..22529559 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -140,17 +140,18 @@ def xloader_submit(context, data_dict): qualified=True ) data = { - 'api_key': utils.get_xloader_user_apitoken(), - 'job_type': 'xloader_to_datastore', - 'result_url': callback_url, - 'metadata': { - 'ignore_hash': data_dict.get('ignore_hash', False), - 'ckan_url': config['ckan.site_url'], - 'resource_id': res_id, - 'set_url_type': data_dict.get('set_url_type', False), - 'task_created': task['last_updated'], - 'original_url': resource_dict.get('url'), - } + "api_key": utils.get_xloader_user_apitoken(), + "job_type": "xloader_to_datastore", + "result_url": callback_url, + "metadata": { + "ignore_hash": data_dict.get("ignore_hash", False), + "ckan_url": config.get("ckanext.xloader.site_url") + or config["ckan.site_url"], + "resource_id": res_id, + "set_url_type": data_dict.get("set_url_type", False), + "task_created": task["last_updated"], + "original_url": resource_dict.get("url"), + }, } if custom_queue != rq_jobs.DEFAULT_QUEUE_NAME: # Don't automatically retry if it's a custom run diff --git a/ckanext/xloader/command.py b/ckanext/xloader/command.py index 4c0f2d2f..91a39acf 100644 --- a/ckanext/xloader/command.py +++ b/ckanext/xloader/command.py @@ -114,12 +114,11 @@ def _submit_resource(self, resource, user, indent=0, sync=False, queue=None): 'ignore_hash': True, } if sync: - data_dict['ckan_url'] = tk.config.get('ckan.site_url') - input_dict = { - 'metadata': data_dict, - 'api_key': 'TODO' - } - logger = logging.getLogger('ckanext.xloader.cli') + data_dict["ckan_url"] = tk.config.get( + "ckanext.xloader.site_url" + ) or tk.config.get("ckan.site_url") + input_dict = {"metadata": data_dict, "api_key": "TODO"} + logger = logging.getLogger("ckanext.xloader.cli") xloader_data_into_datastore_(input_dict, None, logger) else: if queue: diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 6d85e896..948a4050 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -2,6 +2,14 @@ version: 1 groups: - annotation: ckanext-xloader settings options: + - key: ckanext.xloader.site_url + example: http://ckan-dev:5000 + default: + description: | + Provide an alternate site URL for the xloader_submit action. + This is useful, for example, when the site is running within a docker network. + validators: configured_default("ckan.site_url",None) + required: false - key: ckanext.xloader.jobs_db.uri default: sqlite:////tmp/xloader_jobs.db description: | @@ -152,5 +160,3 @@ groups: they will also display "complete", "active", "inactive", and "unknown". type: bool required: false - - diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 3ac8ebba..0e3d0dfd 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -17,12 +17,14 @@ from rq import get_current_job import sqlalchemy as sa +from urllib.parse import urljoin, urlunsplit + from ckan import model from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config from . import db, loader from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError -from .utils import datastore_resource_exists, set_resource_metadata +from .utils import datastore_resource_exists, set_resource_metadata, modify_resource_url, modify_ckan_url try: from ckan.lib.api_token import get_user_from_token @@ -79,8 +81,10 @@ def xloader_data_into_datastore(input): # First flag that this task is running, to indicate the job is not # stillborn, for when xloader_submit is deciding whether another job would # be a duplicate or not + job_dict = dict(metadata=input['metadata'], status='running') + callback_xloader_hook(result_url=input['result_url'], api_key=input['api_key'], job_dict=job_dict) @@ -276,9 +280,11 @@ def _download_resource_data(resource, data, api_key, logger): data['datastore_contains_all_records_of_source_file'] = False which will be saved to the resource later on. ''' - # check scheme + url = resource.get('url') - url_parts = urlsplit(url) + url = modify_resource_url(url) + # check scheme + url_parts = urlsplit(url) scheme = url_parts.scheme if scheme not in ('http', 'https', 'ftp'): raise JobError( @@ -438,7 +444,8 @@ def callback_xloader_hook(result_url, api_key, job_dict): else: header, key = 'Authorization', api_key headers[header] = key - + + result_url = modify_ckan_url(result_url, job_dict['metadata']['ckan_url']) try: result = requests.post( result_url, diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 07af8db7..a7869e60 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -61,13 +61,11 @@ def configure(self, config_): else: self.ignore_hash = False - for config_option in ("ckan.site_url",): - if not config_.get(config_option): - raise Exception( - "Config option `{0}` must be set to use ckanext-xloader.".format( - config_option - ) - ) + site_url_configs = ("ckan.site_url", "ckanext.xloader.site_url") + if not any(site_url_configs): + raise Exception( + f"One of config options {site_url_configs} must be set to use ckanext-xloader." + ) # IDomainObjectModification diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index e819dad9..ab4cae87 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -59,17 +59,18 @@ def data(create_with_upload, apikey): "api.action", ver=3, logic_function="xloader_hook", qualified=True ) return { - 'api_key': apikey, - 'job_type': 'xloader_to_datastore', - 'result_url': callback_url, - 'metadata': { - 'ignore_hash': True, - 'ckan_url': toolkit.config.get('ckan.site_url'), - 'resource_id': resource["id"], - 'set_url_type': False, - 'task_created': datetime.utcnow().isoformat(), - 'original_url': resource["url"], - } + "api_key": apikey, + "job_type": "xloader_to_datastore", + "result_url": callback_url, + "metadata": { + "ignore_hash": True, + "ckan_url": toolkit.config.get("ckanext.xloader.site_url") + or toolkit.config.get("ckan.site_url"), + "resource_id": resource["id"], + "set_url_type": False, + "task_created": datetime.utcnow().isoformat(), + "original_url": resource["url"], + }, } @@ -89,6 +90,66 @@ def test_xloader_data_into_datastore(self, cli, data): resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) assert resource["datastore_contains_all_records_of_source_file"] + def test_download_resource_data_with_ckanext_xloader_site_url(self, cli, data): + # Set the ckanext.xloader.site_url in the config + with mock.patch.dict(toolkit.config, {'ckanext.xloader.site_url': 'http://xloader-site-url'}): + data['metadata']['original_url'] = 'http://xloader-site-url/resource.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + def test_download_resource_data_with_ckan_site_url(self, cli, data): + # Set the ckan.site_url in the config + with mock.patch.dict(toolkit.config, {'ckan.site_url': 'http://ckan-site-url'}): + data['metadata']['original_url'] = 'http://ckan-site-url/resource.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + def test_download_resource_data_with_different_original_url(self, cli, data): + # Set the ckan.site_url in the config + with mock.patch.dict(toolkit.config, {'ckan.site_url': 'http://ckan-site-url'}): + data['metadata']['original_url'] = 'http://external-site-url/resource.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + def test_callback_xloader_hook_with_ckanext_xloader_site_url(self, cli, data): + # Set the ckanext.xloader.site_url in the config + with mock.patch.dict(toolkit.config, {'ckanext.xloader.site_url': 'http://xloader-site-url'}): + data['result_url'] = 'http://xloader-site-url/api/3/action/xloader_hook' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + def test_callback_xloader_hook_with_ckan_site_url(self, cli, data): + # Set the ckan.site_url in the config + with mock.patch.dict(toolkit.config, {'ckan.site_url': 'http://ckan-site-url'}): + data['result_url'] = 'http://ckan-site-url/api/3/action/xloader_hook' + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + def test_xloader_ignore_hash(self, cli, data): self.enqueue(jobs.xloader_data_into_datastore, [data]) with mock.patch("ckanext.xloader.jobs.get_response", get_response): diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index db8ff06f..e7978682 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -2,6 +2,7 @@ import json import datetime +import re from six import text_type as str, binary_type @@ -13,6 +14,8 @@ import ckan.plugins as p from ckan.plugins.toolkit import config +from urllib.parse import urljoin, urlunsplit, urlparse + # resource.formats accepted by ckanext-xloader. Must be lowercase here. DEFAULT_FORMATS = [ "csv", @@ -107,6 +110,60 @@ def get_xloader_user_apitoken(): return site_user["apikey"] +def modify_ckan_url(result_url: str, ckan_url: str) -> str: + """ Modifies a URL based on CKAN site URL comparison. + + This function compares the base URL of a given result URL against a CKAN site URL. + If they differ, the result URL is modified to use the CKAN site URL as its base + while preserving the original path. + + Args: + result_url (str): The original URL to potentially modify + ckan_url (str): The base CKAN site URL to compare against + Returns: + str: The modified URL if base URLs differ, otherwise returns original URL unchanged + """ + parsed_url = urlparse(result_url) + base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" + if base_url != ckan_url: + path_url = parsed_url.path + result_url = urljoin(ckan_url, path_url) + + return result_url + + +def modify_resource_url(orig_ckan_url: str) -> str: + """Returns a potentially modified CKAN URL. + + This function takes a CKAN URL and potentially modifies its base URL while preserving the path, + query parameters, and fragments. The modification occurs only if two conditions are met: + 1. The base URL of the input matches the configured CKAN site URL + 2. An xloader_site_url is configured in the settings + + Args: + orig_ckan_url (str): The original CKAN URL to potentially modify + Returns: + str: Either the modified URL with new base URL from xloader_site_url, + or the original URL if conditions aren't met + """ + xloader_site_url = config.get('ckanext.xloader.site_url') + ckan_site_url = config.get('ckan.site_url') + + parsed_url = urlparse(orig_ckan_url) + base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" + + # If the base URL matches the CKAN site URL and xloader_site_url is set, modify the URL + if base_url == ckan_site_url and xloader_site_url: + modified_ckan_url = urljoin(xloader_site_url, parsed_url.path) + if parsed_url.query: + modified_ckan_url += f"?{parsed_url.query}" + if parsed_url.fragment: + modified_ckan_url += f"#{parsed_url.fragment}" + return modified_ckan_url + + return orig_ckan_url + + def set_resource_metadata(update_dict): ''' Set appropriate datastore_active flag on CKAN resource.