Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Job HTML response #789

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
7 changes: 5 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ Changes

Changes:
--------
- No change.
- Add `Job` status `HTML` response (resolves `#779 <https://github.com/crim-ca/weaver/issues/779>`_).
- Add the ``process`` property to `Job` status response when requesting ``profile=openEO``,
with a direct reference to the underlying `CWL` `Application Package` of the main `Process` ran by the `Job`.

Fixes:
------
- No change.
- Fix reported ``$schema`` to point at the `openEO` *Batch Job* `OenAPI` definition when requesting ``profile=openEO``.
- Fix `Job` statistics not reported by the API in case of execution failure, although they might be partially available.

.. _changes_6.2.0:

Expand Down
10 changes: 8 additions & 2 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,11 @@ def user_id(self, user_id):
raise TypeError(f"Type 'int', 'str' or a UUID is required for '{self.__name__}.user_id'")
self["user_id"] = user_id

@property
def success(self):
# type: () -> bool
return map_status(self.status, category=True) == StatusCategory.SUCCESS

@property
def status(self):
# type: () -> Status
Expand Down Expand Up @@ -1624,8 +1629,8 @@ def links(self, container=None, self_link=None):
link.setdefault(meta, param)
return job_links

def json(self, container=None): # pylint: disable=W0221,arguments-differ
# type: (Optional[AnySettingsContainer]) -> JSON
def json(self, container=None, **kwargs): # pylint: disable=W0221,arguments-differ
# type: (Optional[AnySettingsContainer], **JSON) -> JSON
"""
Obtains the :term:`JSON` data representation for :term:`Job` response body.

Expand Down Expand Up @@ -1659,6 +1664,7 @@ def json(self, container=None): # pylint: disable=W0221,arguments-differ
"progress": int(self.progress),
"links": self.links(settings, self_link="status")
}
job_json.update(**kwargs)
return sd.JobStatusInfo().deserialize(job_json)

def params(self):
Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ def submit_job_dispatch_task(
job = store.fetch_by_id(job.id)
# when sync is successful, it must return the results direct instead of status info
# see: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response
if job.status in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]:
if job.success:
_, _, sync_applied = parse_prefer_header_execute_mode(req_headers, [ExecuteControlOption.SYNC])
if sync_applied:
resp_headers.update(sync_applied)
Expand Down
62 changes: 55 additions & 7 deletions weaver/wps_restapi/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from weaver.processes.wps_package import mask_process_inputs
from weaver.status import JOB_STATUS_CATEGORIES, StatusCategory, StatusCompliant, map_status
from weaver.store.base import StoreJobs
from weaver.utils import get_header, get_settings, make_link_header
from weaver.utils import get_header, get_path_kvp, get_settings, make_link_header
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.jobs.utils import (
dismiss_job_task,
Expand Down Expand Up @@ -317,6 +317,16 @@ def trigger_job_execution(request):
return submit_job_dispatch_task(job, container=request, force_submit=True)


@sd.provider_jobs_service.get(
tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROVIDERS],
schema=sd.GetProviderJobEndpoint(),
accept=ContentType.TEXT_HTML,
renderer="weaver.wps_restapi:templates/responses/job_status.mako",
response_schemas=sd.derive_responses(
sd.get_provider_single_job_status_responses,
sd.GenericHTMLResponse(name="HTMLProviderJobStatus", description="Job status.")
),
)
@sd.provider_job_service.get(
tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROVIDERS],
schema=sd.GetProviderJobEndpoint(),
Expand All @@ -328,7 +338,17 @@ def trigger_job_execution(request):
response_schemas=sd.get_provider_single_job_status_responses,
)
@sd.process_job_service.get(
tags=[sd.TAG_PROCESSES, sd.TAG_JOBS, sd.TAG_STATUS],
tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROCESSES],
schema=sd.GetProcessJobEndpoint(),
accept=ContentType.TEXT_HTML,
renderer="weaver.wps_restapi:templates/responses/job_status.mako",
response_schemas=sd.derive_responses(
sd.get_single_job_status_responses,
sd.GenericHTMLResponse(name="HTMLProcessJobStatus", description="Job status.")
),
)
@sd.process_job_service.get(
tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROCESSES],
schema=sd.GetProcessJobEndpoint(),
accept=[ContentType.APP_JSON] + [
f"{ContentType.APP_JSON}; profile={profile}"
Expand All @@ -337,6 +357,16 @@ def trigger_job_execution(request):
renderer=OutputFormat.JSON,
response_schemas=sd.get_single_job_status_responses,
)
@sd.job_service.get(
tags=[sd.TAG_JOBS, sd.TAG_STATUS],
schema=sd.GetJobEndpoint(),
accept=ContentType.TEXT_HTML,
renderer="weaver.wps_restapi:templates/responses/job_status.mako",
response_schemas=sd.derive_responses(
sd.get_single_job_status_responses,
sd.GenericHTMLResponse(name="HTMLJobStatus", description="Job status.")
),
)
@sd.job_service.get(
tags=[sd.TAG_JOBS, sd.TAG_STATUS],
schema=sd.GetJobEndpoint(),
Expand All @@ -349,16 +379,34 @@ def trigger_job_execution(request):
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_status(request):
# type: (PyramidRequest) -> HTTPOk
# type: (PyramidRequest) -> AnyViewResponse
"""
Retrieve the status of a job.
"""
job = get_job(request)
job_body = job.json(request)
# resolve the job and the requested profile/schema representation
schema, headers = get_job_status_schema(request)
job = get_job(request)

# apply additional properties that are profile-dependant
# properties applied in 'job_prop' must succeed schema validation as well
job_prop = {}
if schema == JobStatusSchema.OPENEO:
cwl_url = get_path_kvp(job.process_url(request) + "/package", f=OutputFormat.JSON)
job_prop = {"process": {"title": "CWL Application Package", "href": cwl_url, "type": ContentType.APP_CWL_JSON}}
job_body = job.json(request, **job_prop)
if schema == JobStatusSchema.OPENEO:
# additional properties that are not validated explicitly
# align the content with metadata schema
# status is defined here to limit the combinations reported in OpenAPI as OGC-only statuses
job_body["$schema"] = sd.OPENEO_API_SCHEMA_JOB_STATUS_URL
job_body["type"] = JobStatusSchema.OPENEO
job_body["status"] = map_status(job_body["status"], StatusCompliant.OPENEO)
return HTTPOk(json=job_body, headers=headers)

# adjust response contents according to rendering
# provide 'job' object directly for HTML templating to allow extra operations dynamically
if ContentType.APP_JSON in str(headers.get("Content-Type")):
return HTTPOk(json=job_body, headers=headers)
return Box(**job_body, job=job, box_intact_types=[Job])


@sd.provider_job_service.patch(
Expand Down Expand Up @@ -741,7 +789,7 @@ def get_job_stats(request):
"""
job = get_job(request)
raise_job_dismissed(job, request)
if job.status not in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]:
if not job.is_finished:
raise JobStatisticsNotFound(json={
"title": "NoJobStatistics",
"type": "no-job-statistics", # unofficial
Expand Down
10 changes: 5 additions & 5 deletions weaver/wps_restapi/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,8 @@ def get_job_status_schema(request):
def make_headers(resolved_schema):
# type: (JobStatusSchemaType) -> HeadersType
content_type = clean_media_type_format(content_accept.split(",")[0], strip_parameters=True)
# FIXME: support HTML or XML
# (allow transparently for browsers types since Accept did not raise earlier, and no other supported yet)
if content_type in ContentType.ANY_XML | {ContentType.TEXT_HTML}:
content_type = ContentType.APP_JSON
return {"Content-Type": content_type}
content_profile = f"{content_type}; profile={resolved_schema}"
content_headers = {"Content-Type": content_profile}
if resolved_schema == JobStatusSchema.OGC:
Expand All @@ -356,7 +354,9 @@ def make_headers(resolved_schema):
return schema, headers
ctype = get_header("Accept", request.headers)
if not ctype:
return JobStatusSchema.OGC, {}
schema = JobStatusSchema.OGC
headers = make_headers(schema)
return schema, headers
params = parse_kvp(ctype)
profile = params.get("profile")
if not profile:
Expand Down Expand Up @@ -1287,7 +1287,7 @@ def raise_job_bad_status_success(job, container=None):
"""
Raise the appropriate message for :term:`Job` not ready or unable to retrieve output results due to status.
"""
if job.status not in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]:
if not job.success:
links = job.links(container=container)
headers = [("Link", make_link_header(link)) for link in links]
if job.status == Status.FAILED:
Expand Down
37 changes: 36 additions & 1 deletion weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,30 @@
]
PROVIDER_DESCRIPTION_FIELD_AFTER = ["links"]

JOB_STATUS_FIELD_FIRST = ["jobID", "processID", "providerID"]
JOB_STATUS_FIELD_AFTER = [
"jobID",
"processID",
"providerID",
"type",
"status",
"message",
"created",
"started",
"finished",
"updated",
"duration",
"runningDuration",
"runningSeconds",
"expirationDate",
"estimatedCompletion",
"nextPoll",
"percentCompleted",
"progress",
"process",
"links",
]

JOBS_LISTING_FIELD_FIRST = ["description", "jobs", "groups"]
JOBS_LISTING_FIELD_AFTER = ["page", "limit", "count", "total", "links"]

Expand Down Expand Up @@ -3853,8 +3877,18 @@ def deserialize(self, cstruct):
return cstruct


class JobProcess(AnyOfKeywordSchema):
_any_of = [
ReferenceURL(),
PermissiveMappingSchema(),
]


class JobStatusInfo(ExtendedMappingSchema):
_schema = OGC_API_SCHEMA_JOB_STATUS_URL
_sort_first = JOB_STATUS_FIELD_FIRST
_sort_after = JOB_STATUS_FIELD_AFTER

jobID = JobID()
processID = ProcessIdentifierTag(missing=None, default=None,
description="Process identifier corresponding to the job execution.")
Expand Down Expand Up @@ -3889,6 +3923,7 @@ class JobStatusInfo(ExtendedMappingSchema):
description="Completion percentage of the job as indicated by the process.")
progress = ExtendedSchemaNode(Integer(), example=100, validator=Range(0, 100),
description="Completion progress of the job (alias to 'percentCompleted').")
process = JobProcess(missing=drop, description="Representation or reference of the underlying job process.")
links = LinkList(missing=drop)


Expand All @@ -3911,7 +3946,7 @@ class CreatedJobStatusSchema(DescriptionSchema):
processID = ProcessIdentifierTag(description="Identifier of the process that will be executed.")
providerID = AnyIdentifier(description="Remote provider identifier if applicable.", missing=drop)
status = ExtendedSchemaNode(String(), example=Status.ACCEPTED)
location = ExtendedSchemaNode(String(), example="http://{host}/weaver/processes/{my-process-id}/jobs/{my-job-id}")
location = ExtendedSchemaNode(String(), example="https://{host}/weaver/processes/{my-process-id}/jobs/{my-job-id}")


class PagingBodySchema(ExtendedMappingSchema):
Expand Down
11 changes: 6 additions & 5 deletions weaver/wps_restapi/templates/responses/head.mako
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
<!--
Requirements for rendering JSON contents.
-->
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.4.0/styles/default.min.css">
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.4.0/highlight.min.js"></script>
<script charset="UTF-8" src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.4.0/languages/json.min.js"></script>
<script charset="UTF-8" src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.4.0/languages/yaml.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.11.1/styles/stackoverflow-light.min.css">
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.11.1/highlight.min.js"></script>
<script charset="UTF-8" src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.11.1/languages/accesslog.min.js"></script>
<script charset="UTF-8" src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.11.1/languages/json.min.js"></script>
<script charset="UTF-8" src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.11.1/languages/yaml.min.js"></script>
<script>hljs.highlightAll();</script>
</head>
</%block>
Loading
Loading