diff --git a/ami/main/migrations/0045_add_processing_services_status_check_celery_beat_task.py b/ami/main/migrations/0045_add_processing_services_status_check_celery_beat_task.py new file mode 100644 index 00000000..a169573c --- /dev/null +++ b/ami/main/migrations/0045_add_processing_services_status_check_celery_beat_task.py @@ -0,0 +1,33 @@ +from django.db import migrations +from django_celery_beat.models import PeriodicTask, CrontabSchedule + + +def create_periodic_task(apps, schema_editor): + crontab_schedule, _ = CrontabSchedule.objects.get_or_create( + minute="*/5", # Every 5 minutes + hour="*", # Every hour + day_of_week="*", # Every day + day_of_month="*", # Every day of month + month_of_year="*", # Every month + ) + + PeriodicTask.objects.get_or_create( + name="celery.check_processing_services_online", + task="ami.tasks.check_processing_services_online", + crontab=crontab_schedule, + ) + + +def delete_periodic_task(apps, schema_editor): + # Delete the task if rolling back + PeriodicTask.objects.filter(name="celery.check_processing_services_online").delete() + + +class Migration(migrations.Migration): + dependencies = [ + ("main", "0044_merge_20250124_2333"), + ] + + operations = [ + migrations.RunPython(create_periodic_task, delete_periodic_task), + ] diff --git a/ami/tasks.py b/ami/tasks.py index 47abfbd0..fce4883e 100644 --- a/ami/tasks.py +++ b/ami/tasks.py @@ -125,3 +125,24 @@ def save_model_instances(app_label: str, model_name: str, pks: list[int | str], result = all(results) if not result: logger.error(f"Failed to save all {len(instance_pks)} instances of {app_label}.{model_name}") + + +@celery_app.task(soft_time_limit=10, time_limit=20) +def check_processing_services_online(): + """ + Check the status of all processing services and update last checked. + """ + from ami.ml.models import ProcessingService + + logger.info("Checking if processing services are online.") + + services = ProcessingService.objects.all() + + for service in services: + logger.info(f"Checking service {service}") + try: + status_response = service.get_status() + logger.info(status_response) + except Exception as e: + logger.error(f"Error checking service {service}: {e}") + continue