Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor committed Feb 22, 2025
1 parent a7e1bdf commit 8f4cd24
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ def get_as_workflow(self):
from flytekit.core.workflow import ImperativeWorkflow

cleanup = EagerFailureHandlerTask(name=f"{self.name}-cleanup", inputs=self.python_interface.inputs)
cleanup._container_image = self._container_image
wb = ImperativeWorkflow(name=self.name)

input_kwargs = {}
Expand All @@ -663,12 +664,6 @@ def get_as_workflow(self):

wb.add_on_failure_handler(cleanup)
return wb
# wb.add_workflow_input("in1", str)
#
# node = wb.add_entity(self, a=wb.inputs["in1"])
# wb.add_entity(t2)
# # This is analogous to a return statement
# wb.add_workflow_output("from_n0t1", node.outputs["o0"])


class EagerFailureTaskResolver(TaskResolverMixin):
Expand Down Expand Up @@ -717,7 +712,14 @@ def __init__(self, name: str, inputs: typing.Dict[str, typing.Type] = None, **kw
)

def dispatch_execute(self, ctx: FlyteContext, input_literal_map: LiteralMap) -> LiteralMap:
"""
This task should only be called during remote execution. Because when rehydrating this task at execution
time, we don't have access to the python interface of the corresponding eager task/workflow, we don't
have the Python types to convert the input literal map, but nor do we need them.
This task is responsible only for ensuring that all executions are terminated.
"""
# Recursive imports
print("here! 1", flush=True)
from flytekit import current_context
from flytekit.configuration.plugin import get_plugin

Expand All @@ -727,11 +729,12 @@ def dispatch_execute(self, ctx: FlyteContext, input_literal_map: LiteralMap) ->
domain = current_exec_id.domain
name = current_exec_id.name
logger.info(f"Cleaning up potentially still running tasks for execution {name} in {project}/{domain}")

print("here! 2", flush=True)
remote = get_plugin().get_remote(config=None, project=project, domain=domain)
key_filter = ValueIn("execution_tag.key", ["eager-exec"])
value_filter = ValueIn("execution_tag.value", [name])
phase_filter = ValueNotIn("phase", ["ABORTED", "SUCCEEDED", "FAILED", "TIMED_OUT"])
print("here! 3", flush=True)
# This should be made more robust, currently lacking retries and exception handling
while True:
exec_models, _ = remote.client.list_executions_paginated(
Expand All @@ -743,10 +746,10 @@ def dispatch_execute(self, ctx: FlyteContext, input_literal_map: LiteralMap) ->
)
if not exec_models:
break
print(exec_models)
print(exec_models, flush=True)
for exec_model in exec_models:
logger.info(f"Terminating execution {exec_model.id}, phase {exec_model.closure.phase}")
remote.client.terminate_execution(exec_model.id)
remote.client.terminate_execution(exec_model.id, f"clean up by parent eager execution {name}")
time.sleep(0.5)

# Just echo back
Expand Down

0 comments on commit 8f4cd24

Please sign in to comment.