From e1c00d5f7256382944e833bf28ae385214fd21a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niccol=C3=B2=20Cant=C3=B9?= Date: Wed, 23 Oct 2024 13:54:10 +0200 Subject: [PATCH] procrastinate queues --- config/settings/base.py | 6 ++ docker-compose.yml | 27 +++--- metadata_catalogue/datasets/libs/ipt.py | 9 +- metadata_catalogue/datasets/models.py | 4 +- metadata_catalogue/datasets/tasks.py | 24 +++++ pdm.lock | 118 +++++++++++++++++++++++- pyproject.toml | 1 + 7 files changed, 164 insertions(+), 25 deletions(-) create mode 100644 metadata_catalogue/datasets/tasks.py diff --git a/config/settings/base.py b/config/settings/base.py index 0000d47..686bdea 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -50,6 +50,8 @@ DATABASES["default"]["ENGINE"] = "psqlextra.backend" POSTGRES_EXTRA_DB_BACKEND_BASE = "django.contrib.gis.db.backends.postgis" DATABASES["default"]["ATOMIC_REQUESTS"] = True +DATABASES["default"]["CONN_MAX_AGE"] = env.int("CONN_MAX_AGE", default=None) + # https://docs.djangoproject.com/en/stable/ref/settings/#std:setting-DEFAULT_AUTO_FIELD DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" @@ -106,6 +108,7 @@ "slippers", "fontawesomefree", "leaflet", + "procrastinate.contrib.django", ] LOCAL_APPS = [ @@ -406,3 +409,6 @@ DRF_STANDARDIZED_ERRORS = {"ENABLE_IN_DEBUG_FOR_UNHANDLED_EXCEPTIONS": True} TAILWIND_APP_NAME = "metadata_catalogue.theme" + +IPT_SOURCES = env.list("IPT_SOURCES", default=[]) +IPT_SOURCES_CRON = env("IPT_SOURCES_CRON", default="0 0 * * 0") diff --git a/docker-compose.yml b/docker-compose.yml index f9c2724..1b8746f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,7 @@ x-django-env: &django-env DJANGO_ACCOUNT_ALLOW_REGISTRATION: "True" USE_DOCKER: "yes" DJANGO_BASE_SCHEMA_URL: "http://django:8000" + IPT_SOURCES: "https://ipt.nina.no" x-django-prod-env: &django-prod-env <<: *django-env @@ -93,19 +94,19 @@ services: <<: *django-dev-env DJANGO_TAILWIND: 1 - # queue: - # <<: *django-prod - # environment: - # <<: *django-prod-env - # WAIT_FOR_HTTP: http://django:8000/ht/ - # command: python manage.py qcluster - - # queue-dev: - # <<: *django-dev - # environment: - # <<: *django-dev-env - # WAIT_FOR_HTTP: http://django:8000/ht/ - # command: python manage.py qcluster + queue: + <<: *django-prod + environment: + <<: *django-prod-env + WAIT_FOR_HTTP: http://django:8000/ht/ + command: python manage.py procrastinate worker + + queue-dev: + <<: *django-dev + environment: + <<: *django-dev-env + WAIT_FOR_HTTP: http://django:8000/ht/ + command: python manage.py procrastinate worker postgres: image: postgis/postgis:16-3.4 diff --git a/metadata_catalogue/datasets/libs/ipt.py b/metadata_catalogue/datasets/libs/ipt.py index 17b32d0..45120c6 100644 --- a/metadata_catalogue/datasets/libs/ipt.py +++ b/metadata_catalogue/datasets/libs/ipt.py @@ -3,8 +3,8 @@ from bs4 import BeautifulSoup from django.db import transaction -from metadata_catalogue.core.utils import async_task from metadata_catalogue.datasets.models import Dataset +from procrastinate.contrib.django import app logger = logging.getLogger(__name__) @@ -24,10 +24,9 @@ def rss_to_datasets(rss_content): }, fetch_url=archive.text, ) - async_task( - "metadata_catalogue.datasets.libs.harvesters.harvest_dataset", - d.id, - ) + app.configure_task( + name="harvest_dataset", + ).defer(dataset_id=d.id) else: logger.warn(f'no archive url found for {item.find("title").text}') except Exception: diff --git a/metadata_catalogue/datasets/models.py b/metadata_catalogue/datasets/models.py index 282bb4a..6935a2d 100644 --- a/metadata_catalogue/datasets/models.py +++ b/metadata_catalogue/datasets/models.py @@ -13,7 +13,7 @@ hook, ) from django.urls import reverse -from metadata_catalogue.core.utils import async_task +from procrastinate.contrib.django import app from solo.models import SingletonModel from .libs.iso.mapping import ISOMapping @@ -498,7 +498,7 @@ class Content(LifecycleModel): @hook(AFTER_SAVE, when_any=["gdal_vrt_definition"], has_changed=True) def check_is_valid(self): - async_task("metadata_catalogue.datasets.libs.checks.validate_vrt", self.id) + app.configure_task(name="validate_vrt").defer(content_id=self.id) def __str__(self): return str(self.dataset) diff --git a/metadata_catalogue/datasets/tasks.py b/metadata_catalogue/datasets/tasks.py new file mode 100644 index 0000000..ecb47ce --- /dev/null +++ b/metadata_catalogue/datasets/tasks.py @@ -0,0 +1,24 @@ +from procrastinate.contrib.django import app +from metadata_catalogue.datasets.libs.harvesters import harvest_ipt, harvest_dataset +from metadata_catalogue.datasets.libs.checks import validate_vrt + +from django.conf import settings +import logging + + +@app.periodic(cron=settings.IPT_SOURCES_CRON) +@app.task(name="harvest_ipt") +def harvest_ipt_task(timestamp: int): + for url in settings.IPT_SOURCES: + logging.info(f"fetching {url}") + harvest_ipt(ipt_url=url) + + +@app.task(name="harvest_dataset") +def harvest_dataset_task(dataset_id: int): + harvest_dataset(dataset_id=dataset_id) + + +@app.task(name="validate_vrt") +def validate_vrt_task(content_id): + validate_vrt(content_id=content_id) diff --git a/pdm.lock b/pdm.lock index 91a47f3..f016233 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev", "production"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:dbd53cf233703c9ca96ff1f38159e7373b38c894d9f44270eb848fa32596a1db" +content_hash = "sha256:dba625f0f33d152a457deba5a2030896a5ece9061965a6ae8924b6e719c45645" [[package]] name = "affine" @@ -43,6 +43,23 @@ files = [ {file = "ansimarkup-2.0.0.tar.gz", hash = "sha256:ffd040e822e6d329d42d250179bd3d9a9c9f6ed6936a30d17b5f7d56a8f03ef0"}, ] +[[package]] +name = "anyio" +version = "4.6.2.post1" +requires_python = ">=3.9" +summary = "High level compatibility layer for multiple asynchronous event loop implementations" +groups = ["default"] +dependencies = [ + "exceptiongroup>=1.0.2; python_version < \"3.11\"", + "idna>=2.8", + "sniffio>=1.1", + "typing-extensions>=4.1; python_version < \"3.11\"", +] +files = [ + {file = "anyio-4.6.2.post1-py3-none-any.whl", hash = "sha256:6d170c36fba3bdd840c73d3868c1e777e33676a69c3a72cf0a0d5d6d8009b61d"}, + {file = "anyio-4.6.2.post1.tar.gz", hash = "sha256:4c8bc31ccdb51c7f7bd251f51c609e038d63e34219b44aa86e47576389880b4c"}, +] + [[package]] name = "argon2-cffi" version = "23.1.0" @@ -137,7 +154,7 @@ name = "async-timeout" version = "4.0.3" requires_python = ">=3.7" summary = "Timeout context manager for asyncio programs" -groups = ["default"] +groups = ["production"] marker = "python_full_version < \"3.11.3\"" files = [ {file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"}, @@ -518,6 +535,21 @@ files = [ {file = "crispy_tailwind-1.0.3-py3-none-any.whl", hash = "sha256:31427f66b1c4fd0d6fb040f4197cfb97d104cdbe7641ea2dea940c0057c4db4b"}, ] +[[package]] +name = "croniter" +version = "3.0.3" +requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6" +summary = "croniter provides iteration for datetime object with cron like format" +groups = ["default"] +dependencies = [ + "python-dateutil", + "pytz>2021.1", +] +files = [ + {file = "croniter-3.0.3-py2.py3-none-any.whl", hash = "sha256:b3bd11f270dc54ccd1f2397b813436015a86d30ffc5a7a9438eec1ed916f2101"}, + {file = "croniter-3.0.3.tar.gz", hash = "sha256:34117ec1741f10a7bd0ec3ad7d8f0eb8fa457a2feb9be32e6a2250e158957668"}, +] + [[package]] name = "cssbeautifier" version = "1.15.1" @@ -903,7 +935,7 @@ name = "django-redis" version = "5.4.0" requires_python = ">=3.6" summary = "Full featured redis cache backend for Django." -groups = ["default"] +groups = ["production"] dependencies = [ "Django>=3.2", "redis!=4.0.0,!=4.0.1,>=3", @@ -1177,7 +1209,7 @@ name = "exceptiongroup" version = "1.2.2" requires_python = ">=3.7" summary = "Backport of PEP 654 (exception groups)" -groups = ["dev"] +groups = ["default", "dev"] marker = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, @@ -2092,6 +2124,41 @@ files = [ {file = "pre_commit-4.0.1.tar.gz", hash = "sha256:80905ac375958c0444c65e9cebebd948b3cdb518f335a091a670a89d652139d2"}, ] +[[package]] +name = "procrastinate" +version = "2.15.0" +requires_python = "<4.0,>=3.8" +summary = "Postgres-based distributed task processing library" +groups = ["default"] +dependencies = [ + "anyio", + "asgiref", + "attrs", + "croniter", + "psycopg[pool]", + "python-dateutil", +] +files = [ + {file = "procrastinate-2.15.0-py3-none-any.whl", hash = "sha256:7e2f0980ac1e1a9bda4fdc6e9580f89029844f1691602142a1e533ef5eb3c8cd"}, + {file = "procrastinate-2.15.0.tar.gz", hash = "sha256:b75d2361e0210bffbad9394f8b7ef9486a366f2a8b50590b23e6952d673228b0"}, +] + +[[package]] +name = "procrastinate" +version = "2.15.0" +extras = ["django"] +requires_python = "<4.0,>=3.8" +summary = "Postgres-based distributed task processing library" +groups = ["default"] +dependencies = [ + "django>=2.2", + "procrastinate==2.15.0", +] +files = [ + {file = "procrastinate-2.15.0-py3-none-any.whl", hash = "sha256:7e2f0980ac1e1a9bda4fdc6e9580f89029844f1691602142a1e533ef5eb3c8cd"}, + {file = "procrastinate-2.15.0.tar.gz", hash = "sha256:b75d2361e0210bffbad9394f8b7ef9486a366f2a8b50590b23e6952d673228b0"}, +] + [[package]] name = "prompt-toolkit" version = "3.0.48" @@ -2176,6 +2243,20 @@ files = [ {file = "psycopg_binary-3.2.3-cp313-cp313-win_amd64.whl", hash = "sha256:e90352d7b610b4693fad0feea48549d4315d10f1eba5605421c92bb834e90170"}, ] +[[package]] +name = "psycopg-pool" +version = "3.2.3" +requires_python = ">=3.8" +summary = "Connection Pool for Psycopg" +groups = ["default"] +dependencies = [ + "typing-extensions>=4.6", +] +files = [ + {file = "psycopg_pool-3.2.3-py3-none-any.whl", hash = "sha256:53bd8e640625e01b2927b2ad96df8ed8e8f91caea4597d45e7673fc7bbb85eb1"}, + {file = "psycopg_pool-3.2.3.tar.gz", hash = "sha256:bb942f123bef4b7fbe4d55421bd3fb01829903c95c0f33fd42b7e94e5ac9b52a"}, +] + [[package]] name = "psycopg" version = "3.2.3" @@ -2192,6 +2273,22 @@ files = [ {file = "psycopg-3.2.3.tar.gz", hash = "sha256:a5764f67c27bec8bfac85764d23c534af2c27b893550377e37ce59c12aac47a2"}, ] +[[package]] +name = "psycopg" +version = "3.2.3" +extras = ["pool"] +requires_python = ">=3.8" +summary = "PostgreSQL database adapter for Python" +groups = ["default"] +dependencies = [ + "psycopg-pool", + "psycopg==3.2.3", +] +files = [ + {file = "psycopg-3.2.3-py3-none-any.whl", hash = "sha256:644d3973fe26908c73d4be746074f6e5224b03c1101d302d9a53bf565ad64907"}, + {file = "psycopg-3.2.3.tar.gz", hash = "sha256:a5764f67c27bec8bfac85764d23c534af2c27b893550377e37ce59c12aac47a2"}, +] + [[package]] name = "ptyprocess" version = "0.7.0" @@ -2773,7 +2870,7 @@ name = "redis" version = "5.1.1" requires_python = ">=3.8" summary = "Python client for Redis database and key-value store" -groups = ["default"] +groups = ["production"] dependencies = [ "async-timeout>=4.0.3; python_full_version < \"3.11.3\"", ] @@ -3076,6 +3173,17 @@ files = [ {file = "slippers-0.6.2.tar.gz", hash = "sha256:4cb555b8822ba0d404e5405723f5d723994022c29046008ee917081031bc0cf1"}, ] +[[package]] +name = "sniffio" +version = "1.3.1" +requires_python = ">=3.7" +summary = "Sniff out which async library your code is running under" +groups = ["default"] +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + [[package]] name = "soupsieve" version = "2.6" diff --git a/pyproject.toml b/pyproject.toml index c07feb2..fab76f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -168,6 +168,7 @@ dependencies = [ "django-tables2", "django-leaflet", "setuptools>=75.2.0", + "procrastinate[django]>=2.15.0", ] requires-python = ">=3.10" name = ""