Kill tasks during job prep #6535

Open · wants to merge 8 commits into base: 8.4.x
1 change: 1 addition & 0 deletions changes.d/6535.fix.md
@@ -0,0 +1 @@
Ensure tasks can be killed while in the preparing state.
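
For context, tasks are killed with the standard CLI; with this fix the target can also be in the preparing state (workflow and task names here are hypothetical):

    $ cylc kill my_workflow//1/my_task   # now also works while 1/my_task is preparing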
70 changes: 48 additions & 22 deletions cylc/flow/scheduler.py
@@ -19,6 +19,7 @@
from collections import deque
from contextlib import suppress
import itertools
import logging
import os
from pathlib import Path
from queue import (
@@ -82,6 +83,7 @@
FLOW_NONE,
FlowMgr,
repr_flow_nums,
stringify_flow_nums,
)
from cylc.flow.host_select import (
HostSelectException,
@@ -1078,18 +1080,21 @@ def kill_tasks(
to_kill: List[TaskProxy] = []
unkillable: List[TaskProxy] = []
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_state(itask)
if not itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
unkillable.append(itask)
continue
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_state(itask)
if itask.state(TASK_STATUS_PREPARING):
self.task_job_mgr.kill_prep_task(itask)
else:
to_kill.append(itask)
if jobless:
# Directly set failed in sim mode:
self.task_events_mgr.process_message(
itask, 'CRITICAL', TASK_STATUS_FAILED,
flag=self.task_events_mgr.FLAG_RECEIVED
)
else:
unkillable.append(itask)
if warn and unkillable:
LOG.warning(
"Tasks not killable: "
@@ -1250,6 +1255,7 @@ def get_contact_data(self) -> Dict[str, str]:
"""
fields = workflow_files.ContactFileFields
proc = psutil.Process()
platform = get_platform()
# fmt: off
return {
fields.API:
@@ -1275,11 +1281,11 @@
fields.VERSION:
CYLC_VERSION,
fields.SCHEDULER_SSH_COMMAND:
str(get_platform()['ssh command']),
str(platform['ssh command']),
fields.SCHEDULER_CYLC_PATH:
str(get_platform()['cylc path']),
str(platform['cylc path']),
fields.SCHEDULER_USE_LOGIN_SHELL:
str(get_platform()['use login shell'])
str(platform['use login shell'])
}
# fmt: on

@@ -1531,29 +1537,49 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())

log = LOG.debug
submitted = self.submit_task_jobs(itasks)
if not submitted:
return False

log_lvl = logging.DEBUG
if self.options.reftest or self.options.genref:
log = LOG.info
log_lvl = logging.INFO

for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
run_mode=self.get_run_mode()
):
if itask.flow_nums:
flow = ','.join(str(i) for i in itask.flow_nums)
else:
flow = FLOW_NONE
log(
for itask in submitted:
flow = stringify_flow_nums(itask.flow_nums) or FLOW_NONE
LOG.log(
log_lvl,
f"{itask.identity} -triggered off "
f"{itask.state.get_resolved_dependencies()} in flow {flow}"
)

# one or more tasks were passed through the submission pipeline
return True
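
The logging change above swaps binding a logger method (log = LOG.debug / LOG.info) for selecting a level and passing it to Logger.log. A minimal, self-contained stdlib illustration of the pattern (all names illustrative):

    import logging

    LOG = logging.getLogger("example")
    logging.basicConfig(level=logging.DEBUG)

    reftest = True  # stand-in for self.options.reftest or self.options.genref
    log_lvl = logging.INFO if reftest else logging.DEBUG

    # logging.Logger.log() takes the level as its first argument
    LOG.log(log_lvl, "%s -triggered off %s in flow %s", "1/foo", "['1/bar']", "1")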

def submit_task_jobs(
self, itasks: 'Iterable[TaskProxy]'
) -> 'List[TaskProxy]':
"""Prepare for job submission and submit task jobs.

Return: tasks that attempted submission.
"""
# submit "simulation/skip" mode tasks, modify "dummy" task configs:
itasks, submitted_nonlive_tasks = (
self.task_job_mgr.submit_nonlive_task_jobs(
self.workflow, itasks, self.get_run_mode()
)
)

# submit "live" mode tasks (and "dummy" mode tasks)
submitted_live_tasks = self.task_job_mgr.submit_livelike_task_jobs(
self.workflow,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
)
Comment on lines +1567 to +1579 (Member):

I'm not a big fan of moving this task submission logic into the scheduler.

I'm guessing you did this to avoid having to populate the extra args in integration tests, fair enough.

Maybe just leave a shim in scheduler.py to do this; I'd rather keep submission logic in one place, as Cylc logic has a habit of sprawling.
return submitted_nonlive_tasks + submitted_live_tasks
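
One subtlety in the shim above: submit_nonlive_task_jobs returns a pair whose first element is the tasks that still need live-like submission, so the assignment rebinds itasks between the two stages. A hedged restatement with distinct names to make the dataflow explicit:

    # Sketch only: the same calls as the shim above, with rebinding avoided.
    livelike_itasks, nonlive_submitted = (
        self.task_job_mgr.submit_nonlive_task_jobs(
            self.workflow, itasks, self.get_run_mode()
        )
    )
    live_submitted = self.task_job_mgr.submit_livelike_task_jobs(
        self.workflow,
        livelike_itasks,
        self.server.curve_auth,
        self.server.client_pub_key_dir,
    )
    return nonlive_submitted + live_submitted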

def process_workflow_db_queue(self):
"""Update workflow DB."""
self.workflow_db_mgr.process_queued_ops()
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/subprocctx.py
@@ -23,6 +23,7 @@
from shlex import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union


from cylc.flow.wallclock import get_current_time_string

if TYPE_CHECKING:
@@ -137,6 +138,9 @@
'mesg': mesg}
return ret.rstrip()

def __repr__(self) -> str:
return f"<{type(self).__name__} {self.cmd_key}>"

Codecov / codecov/patch warning: added line cylc/flow/subprocctx.py#L142 was not covered by tests.
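
A hedged sketch of the new __repr__ in use. The constructor shape is inferred from a call later in this diff (SubProcContext(self.JOBS_SUBMIT, action, err=exc, ret_code=1)); the values here are illustrative:

    ctx = SubProcContext('jobs-submit', ['true'])  # cmd_key, cmd (assumed shape)
    print(repr(ctx))  # -> <SubProcContext jobs-submit>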


class SubFuncContext(SubProcContext):
"""Represent the context of a Python function to run as a subprocess.
Expand Down
84 changes: 45 additions & 39 deletions cylc/flow/task_job_mgr.py
@@ -123,6 +123,11 @@


if TYPE_CHECKING:
# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
from typing_extensions import Literal

from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager

@@ -156,10 +161,10 @@ class TaskJobManager:

def __init__(self, workflow, proc_pool, workflow_db_mgr,
task_events_mgr, data_store_mgr, bad_hosts):
self.workflow = workflow
self.workflow: str = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
self.task_events_mgr = task_events_mgr
self.task_events_mgr: TaskEventsManager = task_events_mgr
self.data_store_mgr = data_store_mgr
self.job_file_writer = JobFileWriter()
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
@@ -196,6 +201,15 @@ def kill_task_jobs(
self._kill_task_jobs_callback_255
)

def kill_prep_task(self, itask: 'TaskProxy') -> None:
"""Kill a preparing task."""
itask.waiting_on_job_prep = False
itask.local_job_file_path = None # reset for retry
self._set_retry_timers(itask)
self._prep_submit_task_job_error(
self.workflow, itask, '(killed in job prep)', ''
)
Comment on lines +204 to +211 (Member):

Looking around at other _prep_submit_task_job_error calls, we might want to consider adding:

    itask.summary['platforms_used'][itask.submit_num] = ''


def poll_task_jobs(self, workflow, itasks, msg=None):
"""Poll jobs of specified tasks.

@@ -220,14 +234,19 @@ def poll_task_jobs(self, workflow, itasks, msg=None):
self._poll_task_jobs_callback_255
)

def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
def prep_submit_task_jobs(
self,
workflow: str,
itasks: 'Iterable[TaskProxy]',
check_syntax: bool = True,
) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
"""Prepare task jobs for submit.

Prepare tasks where possible. Ignore tasks that are waiting for host
select command to complete. Bad host select command or error writing to
a job file will cause a bad task - leading to submission failure.

Return [list, list]: list of good tasks, list of bad tasks
Return (good_tasks, bad_tasks)
"""
prepared_tasks = []
bad_tasks = []
@@ -244,17 +263,12 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
prepared_tasks.append(itask)
elif prep_task is False:
bad_tasks.append(itask)
return [prepared_tasks, bad_tasks]
return (prepared_tasks, bad_tasks)
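
With the explicit tuple return, callers can unpack the result directly; a brief hedged usage sketch (variable names illustrative):

    # prepared tasks are ready for submission; bad tasks already went
    # through the prep-error path and will end up submit-failed.
    prepared_tasks, bad_tasks = self.prep_submit_task_jobs(workflow, itasks)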

def submit_task_jobs(
self,
workflow,
itasks,
curve_auth,
client_pub_key_dir,
run_mode: RunMode = RunMode.LIVE,
):
"""Prepare for job submission and submit task jobs.
def submit_livelike_task_jobs(
self, workflow, itasks, curve_auth, client_pub_key_dir
) -> 'List[TaskProxy]':
"""Submission for live tasks and dummy tasks.

Preparation (host selection, remote host init, and remote install)
is done asynchronously. Newly released tasks may be sent here several
@@ -264,24 +278,9 @@ def submit_task_jobs(
Once preparation has completed or failed, reset .waiting_on_job_prep in
task instances so the scheduler knows to stop sending them back here.

This method uses prep_submit_task_job() as helper.
This method uses prep_submit_task_jobs() as helper.

Return (list): list of tasks that attempted submission.
"""
# submit "simulation/skip" mode tasks, modify "dummy" task configs:
itasks, submitted_nonlive_tasks = self.submit_nonlive_task_jobs(
workflow, itasks, run_mode)

# submit "live" mode tasks (and "dummy" mode tasks)
submitted_live_tasks = self.submit_livelike_task_jobs(
workflow, itasks, curve_auth, client_pub_key_dir)

return submitted_nonlive_tasks + submitted_live_tasks

def submit_livelike_task_jobs(
self, workflow, itasks, curve_auth, client_pub_key_dir
) -> 'List[TaskProxy]':
"""Submission for live tasks and dummy tasks.
Return: tasks that attempted submission.
"""
done_tasks: 'List[TaskProxy]' = []
# Mapping of platforms to task proxies:
@@ -327,7 +326,7 @@ def submit_livelike_task_jobs(
bc_mgr = self.task_events_mgr.broadcast_mgr
rtconf = bc_mgr.get_updated_rtconfig(itask)
try:
platform = get_platform(
platform = get_platform( # type: ignore[assignment]
(Member Author: Addressed in #6564)

rtconf,
bad_hosts=self.bad_hosts
)
@@ -1029,7 +1028,7 @@ def _set_retry_timers(
def submit_nonlive_task_jobs(
self: 'TaskJobManager',
workflow: str,
itasks: 'List[TaskProxy]',
itasks: 'Iterable[TaskProxy]',
workflow_run_mode: RunMode,
) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
"""Identify task mode and carry out alternative submission
@@ -1152,7 +1151,7 @@ def _prep_submit_task_job(
workflow: str,
itask: 'TaskProxy',
check_syntax: bool = True
):
) -> 'Union[TaskProxy, None, Literal[False]]':
"""Prepare a task job submission.

Returns:
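
The new annotation spells out the method's three-way contract, matching how prep_submit_task_jobs consumes it earlier in this file: a TaskProxy when preparation succeeded, None while host/platform selection is still pending, and False for a bad task. A caller-side sketch of that dispatch:

    prep_task = self._prep_submit_task_job(workflow, itask)
    if prep_task:               # TaskProxy: job file written, ready to submit
        prepared_tasks.append(itask)
    elif prep_task is False:    # bad task: flagged for submission failure
        bad_tasks.append(itask)
    # None: host/platform select still running; the task comes back later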
@@ -1217,7 +1216,7 @@ def _prep_submit_task_job(
else:
# host/platform select not ready
if host_n is None and platform_name is None:
return
return None
elif (
host_n is None
and rtconfig['platform']
@@ -1259,7 +1258,7 @@ def _prep_submit_task_job(
workflow, itask, '(platform not defined)', exc)
return False
else:
itask.platform = platform
itask.platform = platform # type: ignore[assignment]
(Member Author: Addressed in #6564)

# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)

@@ -1292,7 +1291,13 @@ def _prep_submit_task_job(
itask.local_job_file_path = local_job_file_path
return itask

def _prep_submit_task_job_error(self, workflow, itask, action, exc):
def _prep_submit_task_job_error(
self,
workflow: str,
itask: 'TaskProxy',
action: str,
exc: Union[Exception, str],
) -> None:
"""Helper for self._prep_submit_task_job. On error."""
log_task_job_activity(
SubProcContext(self.JOBS_SUBMIT, action, err=exc, ret_code=1),
Expand All @@ -1306,11 +1311,12 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
# than submit-failed
# provide a dummy job config - this info will be added to the data
# store
try_num = itask.get_try_num()
itask.jobs.append({
'task_id': itask.identity,
'platform': itask.platform,
'submit_num': itask.submit_num,
'try_num': itask.get_try_num(),
'try_num': try_num,
})
# create a DB entry for the submit-failed job
self.workflow_db_mgr.put_insert_task_jobs(
@@ -1319,7 +1325,7 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
'flow_nums': serialise_set(itask.flow_nums),
'job_id': itask.summary.get('submit_method_id'),
'is_manual_submit': itask.is_manual_submit,
'try_num': itask.get_try_num(),
'try_num': try_num,
'time_submit': get_current_time_string(),
'platform_name': itask.platform['name'],
'job_runner_name': itask.summary['job_runner_name'],
4 changes: 2 additions & 2 deletions cylc/flow/task_proxy.py
@@ -152,7 +152,7 @@ class TaskProxy:
graph children: {msg: [(name, point), ...]}
.flow_nums:
flows I belong to (if empty, belongs to 'none' flow)
flow_wait:
.flow_wait:
wait for flow merge before spawning children
.waiting_on_job_prep:
True whilst task is awaiting job prep, reset to False once the
@@ -316,7 +316,7 @@ def __init__(
)

def __repr__(self) -> str:
return f"<{self.__class__.__name__} {self.identity}>"
return f"<{type(self).__name__} {self.identity} {self.state}>"

def __str__(self) -> str:
"""Stringify with tokens, state, submit_num, and flow_nums.
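
For the __repr__ change above, the output now includes the task state; a hedged illustration (identity and state values made up):

    # before: <TaskProxy 1/foo>
    # after:  <TaskProxy 1/foo waiting>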
12 changes: 0 additions & 12 deletions tests/functional/cylc-kill/03-simulation/flow.cylc

This file was deleted.
