Eager cleanup #3148
Open: wild-endeavor wants to merge 17 commits into master from eager-cleanup
Commits (all by wild-endeavor):
f31e823 testing
5770a5e add failure handler
943ca5c nit
be37cd5 comments and unit test, add failure node name
b6dbc73 debug
3937e53 debug worker
54a56e5 more print
367a393 add valuenotin filter, add snippet to cancel tasks
a7e1bdf add name
8f4cd24 debug
da401fc remove debugging, add metaclass
d5be81d change to warning
9d221fc nit
f12b1f4 fix test
cdb5189 nit
6ac8010 change test
3db4249 use constant and fix test
@@ -20,6 +20,8 @@
import inspect
import os
import signal
import time
import typing
from abc import ABC
from collections import OrderedDict
from contextlib import suppress

@@ -32,7 +34,7 @@
from flytekit.core.constants import EAGER_ROOT_ENV_NAME
from flytekit.core.context_manager import ExecutionState, FlyteContext, FlyteContextManager
from flytekit.core.docstring import Docstring
-from flytekit.core.interface import transform_function_to_interface
+from flytekit.core.interface import Interface, transform_function_to_interface
from flytekit.core.promise import (
    Promise,
    VoidPromise,

@@ -59,11 +61,17 @@
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models import task as task_models
from flytekit.models.admin import common as admin_common_models
from flytekit.models.admin import workflow as admin_workflow_models
from flytekit.models.filters import ValueIn
from flytekit.models.literals import LiteralMap
from flytekit.models.security import Secret
from flytekit.utils.asyn import loop_manager

T = TypeVar("T")

CLEANUP_LOOP_DELAY_SECONDS = 1


class PythonInstanceTask(PythonAutoContainerTask[T], ABC):  # type: ignore
    """

@@ -636,3 +644,116 @@ def run(self, remote: "FlyteRemote", ss: SerializationSettings, **kwargs): # ty

        with FlyteContextManager.with_context(builder):
            return loop_manager.run_sync(self.async_execute, self, **kwargs)

    def get_as_workflow(self):
        from flytekit.core.workflow import ImperativeWorkflow

        cleanup = EagerFailureHandlerTask(name=f"{self.name}-cleanup", inputs=self.python_interface.inputs)
        # todo: remove this before merging
        # this is actually bad, but useful for developing
        cleanup._container_image = self._container_image
Review comment on lines +652 to +654: +1

        wb = ImperativeWorkflow(name=self.name)

        input_kwargs = {}
        for input_name, input_python_type in self.python_interface.inputs.items():
            wb.add_workflow_input(input_name, input_python_type)
            input_kwargs[input_name] = wb.inputs[input_name]

        node = wb.add_entity(self, **input_kwargs)
        for output_name, output_python_type in self.python_interface.outputs.items():
            wb.add_workflow_output(output_name, node.outputs[output_name])

        wb.add_on_failure_handler(cleanup)
        return wb


class EagerFailureTaskResolver(TaskResolverMixin):
    @property
    def location(self) -> str:
        return f"{EagerFailureTaskResolver.__module__}.eager_failure_task_resolver"

    def name(self) -> str:
        return "eager_failure_task_resolver"

    def load_task(self, loader_args: List[str]) -> Task:
        """
        Given the set of identifier keys, should return one Python Task or raise an error if not found
        """
        return EagerFailureHandlerTask(name="no_input_default_cleanup_task", inputs={})

    def loader_args(self, settings: SerializationSettings, t: Task) -> List[str]:
        """
        Return a list of strings that can help identify the parameter Task
        """
        return ["eager", "failure", "handler"]

    def get_all_tasks(self) -> List[Task]:
        """
        Future proof method. Just making it easy to access all tasks (Not required today as we auto register them)
        """
        return []


eager_failure_task_resolver = EagerFailureTaskResolver()


class EagerFailureHandlerTask(PythonAutoContainerTask, metaclass=FlyteTrackedABC):
    _TASK_TYPE = "eager_failure_handler_task"

    def __init__(self, name: str, inputs: typing.Optional[typing.Dict[str, typing.Type]] = None, **kwargs):
        """ """
        inputs = inputs or {}
        super().__init__(
            task_type=self._TASK_TYPE,
            name=name,
            interface=Interface(inputs=inputs, outputs=None),
            task_config=None,
            task_resolver=eager_failure_task_resolver,
            secret_requests=[Secret(group="", key="EAGER_API_KEY")],  # todo: remove this before merging
            **kwargs,
        )

    def dispatch_execute(self, ctx: FlyteContext, input_literal_map: LiteralMap) -> LiteralMap:
        """
        This task should only be called during remote execution. Because when rehydrating this task at execution
        time, we don't have access to the python interface of the corresponding eager task/workflow, we don't
        have the Python types to convert the input literal map, but nor do we need them.
        This task is responsible only for ensuring that all executions are terminated.
        """
        # Recursive imports
        from flytekit import current_context
        from flytekit.configuration.plugin import get_plugin

        most_recent = admin_common_models.Sort("created_at", admin_common_models.Sort.Direction.DESCENDING)
        current_exec_id = current_context().execution_id
        project = current_exec_id.project
        domain = current_exec_id.domain
        name = current_exec_id.name
        logger.warning(f"Cleaning up potentially still running tasks for execution {name} in {project}/{domain}")
        remote = get_plugin().get_remote(config=None, project=project, domain=domain)
        key_filter = ValueIn("execution_tag.key", ["eager-exec"])
        value_filter = ValueIn("execution_tag.value", [name])
        phase_filter = ValueIn("phase", ["UNDEFINED", "QUEUED", "RUNNING"])
        # This should be made more robust, currently lacking retries and exception handling
        while True:
            exec_models, _ = remote.client.list_executions_paginated(
                project,
                domain,
                limit=100,
                filters=[key_filter, value_filter, phase_filter],
                sort_by=most_recent,
            )
            logger.warning(f"Found {len(exec_models)} executions this round for termination")
eapolinario marked this conversation as resolved.

            if not exec_models:
                break
            logger.warning(exec_models)
            for exec_model in exec_models:
                logger.warning(f"Terminating execution {exec_model.id}, phase {exec_model.closure.phase}")
                remote.client.terminate_execution(exec_model.id, f"clean up by parent eager execution {name}")
            time.sleep(CLEANUP_LOOP_DELAY_SECONDS)

        # Just echo back
        return input_literal_map

    def execute(self, **kwargs) -> Any:
        raise AssertionError("this task shouldn't need to call execute")
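
The in-code note on the cleanup loop says it currently lacks retries and exception handling. One possible shape for a hardened loop is sketched below. It is not part of this PR and does not use the flytekit client: `list_active`, `terminate`, `FakeClient`, `with_retries`, `cleanup_executions`, and the `max_rounds` / `max_attempts` parameters are all hypothetical names chosen for illustration, standing in for the paginated list and terminate calls in the diff.

```python
import logging
import time
from typing import Callable, List

logger = logging.getLogger("eager_cleanup_sketch")


def with_retries(call: Callable, max_attempts: int, delay_seconds: float, sleep: Callable[[float], None]):
    """Invoke call(), retrying on any exception with a linearly growing delay."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception as exc:
            if attempt == max_attempts:
                raise
            logger.warning("attempt %d failed (%s), retrying", attempt, exc)
            sleep(delay_seconds * attempt)


def cleanup_executions(client, tag_value: str, *, max_rounds: int = 10, max_attempts: int = 3,
                       delay_seconds: float = 1.0, sleep: Callable[[float], None] = time.sleep) -> int:
    """Terminate tagged executions until none are listed; return the number of requests issued."""
    requested = 0
    cause = f"clean up by parent eager execution {tag_value}"
    for _ in range(max_rounds):
        active = with_retries(lambda: client.list_active(tag_value), max_attempts, delay_seconds, sleep)
        if not active:
            return requested
        for execution_id in active:
            try:
                with_retries(lambda eid=execution_id: client.terminate(eid, cause),
                             max_attempts, delay_seconds, sleep)
                requested += 1
            except Exception:
                # One stubborn execution should not block the others; it is retried next round.
                logger.exception("giving up on %s for this round", execution_id)
        sleep(delay_seconds)
    raise TimeoutError(f"executions still active after {max_rounds} rounds")


class FakeClient:
    """Hypothetical in-memory stand-in for a remote client, for demonstration only."""

    def __init__(self, active: List[str], list_failures: int = 0):
        self.active = list(active)
        self.list_failures = list_failures
        self.terminated: List[str] = []

    def list_active(self, tag_value: str) -> List[str]:
        if self.list_failures > 0:
            self.list_failures -= 1
            raise ConnectionError("transient listing failure")
        return list(self.active)

    def terminate(self, execution_id: str, cause: str) -> None:
        self.active.remove(execution_id)
        self.terminated.append(execution_id)
```

Bounding the number of rounds trades the original `while True` for a guaranteed exit; whether a timeout should fail the handler or merely log is a design decision this sketch leaves open.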
@@ -0,0 +1,33 @@
from collections import OrderedDict

import flytekit.configuration
from flytekit.configuration import Image, ImageConfig
from flytekit.core.python_function_task import EagerFailureHandlerTask
from flytekit.tools.translator import get_serializable

default_img = Image(name="default", fqn="test", tag="tag")
serialization_settings = flytekit.configuration.SerializationSettings(
    project="project",
    domain="domain",
    version="version",
    env=None,
    image_config=ImageConfig(default_image=default_img, images=[default_img]),
)


def test_failure():
    t = EagerFailureHandlerTask(name="tester", inputs={"a": int})

    spec = get_serializable(OrderedDict(), serialization_settings, t)
    print(spec)

    assert spec.template.container.args == ['pyflyte-execute', '--inputs', '{{.input}}', '--output-prefix', '{{.outputPrefix}}', '--raw-output-data-prefix', '{{.rawOutputDataPrefix}}', '--checkpoint-path', '{{.checkpointOutputPrefix}}', '--prev-checkpoint', '{{.prevCheckpointPrefix}}', '--resolver', 'flytekit.core.python_function_task.eager_failure_task_resolver', '--', 'eager', 'failure', 'handler']


def test_loading():
    from flytekit.tools.module_loader import load_object_from_module

    resolver = load_object_from_module("flytekit.core.python_function_task.eager_failure_task_resolver")
    print(resolver)
    t = resolver.load_task([])
    assert isinstance(t, EagerFailureHandlerTask)
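
The two tests above exercise both halves of the resolver contract: serialization writes a dotted resolver location plus loader args into the container args, and execution loads the resolver from that location and asks it for a task. A rough, flytekit-free sketch of that round trip follows. Everything in it is an illustrative assumption: `load_object`, `split_container_args`, `CleanupTask`, `CleanupResolver`, and the `demo_resolvers` module are hypothetical, and the argument parsing is a simplification rather than how `pyflyte-execute` actually parses its command line.

```python
import importlib
import sys
import types
from typing import List, Tuple


def load_object(dotted_path: str):
    """Import 'pkg.module.attr' and return attr (simplified stand-in for a module loader)."""
    module_name, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def split_container_args(args: List[str]) -> Tuple[str, List[str]]:
    """Pull the resolver location and the loader args (everything after '--') out of the args."""
    resolver_location = args[args.index("--resolver") + 1]
    loader_args = args[args.index("--") + 1:]
    return resolver_location, loader_args


class CleanupTask:
    """Hypothetical task type standing in for the failure handler task."""

    def __init__(self, name: str):
        self.name = name


class CleanupResolver:
    """Hypothetical resolver: ignores loader args and always returns the same default task."""

    def load_task(self, loader_args: List[str]) -> CleanupTask:
        return CleanupTask(name="no_input_default_cleanup_task")


# Register a throwaway module so the dotted path below is importable in this sketch.
demo = types.ModuleType("demo_resolvers")
demo.cleanup_resolver = CleanupResolver()
sys.modules["demo_resolvers"] = demo

args = ["pyflyte-execute", "--inputs", "{{.input}}",
        "--resolver", "demo_resolvers.cleanup_resolver", "--", "eager", "failure", "handler"]
location, loader_args = split_container_args(args)
task = load_object(location).load_task(loader_args)
```

Because the resolver returns a task with no declared inputs regardless of loader args, the rehydrated task does not need the Python interface of the original eager task, which matches the reasoning in the `dispatch_execute` docstring.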
Consider handling potential exceptions from run_sync when executing coroutine functions. The current implementation may silently fail if the async execution encounters issues.
Code Review Run #ce446d
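
The suggested code for this comment did not survive on the page. As a rough illustration of the idea only, the sketch below wraps a synchronous runner so that failures are logged and then re-raised. It uses `asyncio.run` as a stand-in for `loop_manager.run_sync`, whose exact behavior is not shown here; `run_sync_logged` and `flaky_eager_body` are hypothetical names.

```python
import asyncio
import logging

logger = logging.getLogger("eager_run_sketch")


async def flaky_eager_body(x: int) -> int:
    """Hypothetical coroutine standing in for an eager task body."""
    if x < 0:
        raise ValueError("negative input")
    await asyncio.sleep(0)
    return x * 2


def run_sync_logged(coro_fn, *args, **kwargs):
    """Run a coroutine function to completion, logging any failure before re-raising it."""
    try:
        # asyncio.run is an assumption here; the PR itself calls loop_manager.run_sync.
        return asyncio.run(coro_fn(*args, **kwargs))
    except Exception:
        logger.exception("async execution of %s failed", getattr(coro_fn, "__name__", coro_fn))
        raise
```

Re-raising rather than swallowing the exception matters in this PR's context: the workflow-level failure handler is only triggered if the eager task actually fails.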