Skip to content

Commit

Permalink
remove debugging, add metaclass
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor committed Feb 22, 2025
1 parent 8f4cd24 commit da401fc
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 17 deletions.
1 change: 0 additions & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ def run_remote(
with p:
p.update(t, visible=True)
p.start_task(t)
print(run_level_params.tags)
execution = remote.execute(
entity,
inputs=inputs,
Expand Down
20 changes: 9 additions & 11 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@
from flytekit.models import task as task_models
from flytekit.models.admin import common as admin_common_models
from flytekit.models.admin import workflow as admin_workflow_models
from flytekit.models.filters import ValueIn, ValueNotIn
from flytekit.models.filters import ValueIn
from flytekit.models.literals import LiteralMap
from flytekit.models.security import Secret
from flytekit.utils.asyn import loop_manager

if typing.TYPE_CHECKING:
pass

T = TypeVar("T")


Expand Down Expand Up @@ -650,6 +647,8 @@ def get_as_workflow(self):
from flytekit.core.workflow import ImperativeWorkflow

cleanup = EagerFailureHandlerTask(name=f"{self.name}-cleanup", inputs=self.python_interface.inputs)
# todo: remove this before merging
# this is actually bad, but useful for developing
cleanup._container_image = self._container_image
wb = ImperativeWorkflow(name=self.name)

Expand Down Expand Up @@ -696,11 +695,12 @@ def get_all_tasks(self) -> List[Task]:
eager_failure_task_resolver = EagerFailureTaskResolver()


class EagerFailureHandlerTask(PythonAutoContainerTask):
class EagerFailureHandlerTask(PythonAutoContainerTask, metaclass=FlyteTrackedABC):
_TASK_TYPE = "eager_failure_handler_task"

def __init__(self, name: str, inputs: typing.Dict[str, typing.Type] = None, **kwargs):
def __init__(self, name: str, inputs: typing.Optional[typing.Dict[str, typing.Type]] = None, **kwargs):
""" """
inputs = inputs or {}
super().__init__(
task_type=self._TASK_TYPE,
name=name,
Expand All @@ -719,7 +719,6 @@ def dispatch_execute(self, ctx: FlyteContext, input_literal_map: LiteralMap) ->
This task is responsible only for ensuring that all executions are terminated.
"""
# Recursive imports
print("here! 1", flush=True)
from flytekit import current_context
from flytekit.configuration.plugin import get_plugin

Expand All @@ -729,12 +728,10 @@ def dispatch_execute(self, ctx: FlyteContext, input_literal_map: LiteralMap) ->
domain = current_exec_id.domain
name = current_exec_id.name
logger.info(f"Cleaning up potentially still running tasks for execution {name} in {project}/{domain}")
print("here! 2", flush=True)
remote = get_plugin().get_remote(config=None, project=project, domain=domain)
key_filter = ValueIn("execution_tag.key", ["eager-exec"])
value_filter = ValueIn("execution_tag.value", [name])
phase_filter = ValueNotIn("phase", ["ABORTED", "SUCCEEDED", "FAILED", "TIMED_OUT"])
print("here! 3", flush=True)
phase_filter = ValueIn("phase", ["UNDEFINED", "QUEUED", "RUNNING"])
# This should be made more robust, currently lacking retries and exception handling
while True:
exec_models, _ = remote.client.list_executions_paginated(
Expand All @@ -744,9 +741,10 @@ def dispatch_execute(self, ctx: FlyteContext, input_literal_map: LiteralMap) ->
filters=[key_filter, value_filter, phase_filter],
sort_by=most_recent,
)
logger.info(f"Found {len(exec_models)} executions this round for termination")
if not exec_models:
break
print(exec_models, flush=True)
logger.info(exec_models)
for exec_model in exec_models:
logger.info(f"Terminating execution {exec_model.id}, phase {exec_model.closure.phase}")
remote.client.terminate_execution(exec_model.id, f"clean up by parent eager execution {name}")
Expand Down
1 change: 0 additions & 1 deletion flytekit/core/worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ def launch_execution(self, wi: WorkItem, idx: int) -> FlyteWorkflowExecution:
l = self.get_labels()
e = self.get_env()
options = Options(labels=l)
logger.warning(f"Options {options}")
exec_name = self.get_execution_name(wi.entity, idx, wi.input_kwargs)
logger.info(f"Generated execution name {exec_name} for {idx}th call of {wi.entity.name}")
from flytekit.remote.remote_callable import RemoteEntity
Expand Down
2 changes: 0 additions & 2 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,6 @@ def _execute(
# and domain, which is specified in the first two arguments of client.create_execution. This is useful
# in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a
# different project "B". For now, this method doesn't support this use case.
print(f"In _execute options {options}")

exec_id = self.client.create_execution(
project or self.default_project,
Expand Down Expand Up @@ -1700,7 +1699,6 @@ def execute(
The ``name`` and ``version`` arguments do not apply to ``FlyteTask``, ``FlyteLaunchPlan``, and
``FlyteWorkflow`` entity inputs. These values are determined by referencing the entity identifier values.
"""
print(f"remote.execute Options: {options}")
if entity.python_interface:
type_hints = type_hints or entity.python_interface.inputs
if isinstance(entity, FlyteTask) or isinstance(entity, FlyteLaunchPlan):
Expand Down
2 changes: 0 additions & 2 deletions flytekit/tools/serialize_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ def get_registrable_entities(
wf = entity.get_as_workflow()
lp = LaunchPlan.get_default_launch_plan(ctx, wf)
get_serializable(new_api_serializable_entities, ctx.serialization_settings, lp, options)
print("EagerAsyncPythonFunctionTask")
print(wf)

new_api_model_values = list(new_api_serializable_entities.values())
entities_to_be_serialized = list(filter(_should_register_with_admin, new_api_model_values))
Expand Down

0 comments on commit da401fc

Please sign in to comment.