Skip to content

Commit

Permalink
fix sync for map task in dynamic
Browse files Browse the repository at this point in the history
Signed-off-by: Troy Chiu <[email protected]>
  • Loading branch information
troychiu committed Feb 19, 2025
1 parent 66d4aed commit 8ba253c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
49 changes: 45 additions & 4 deletions flytekit/remote/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,33 @@ def promote_from_model(cls, model: _workflow_model.GateNode):


class FlyteArrayNode(_workflow_model.ArrayNode):
def __init__(self, node, parallelism, min_successes, min_success_ratio, flyte_task_node=None, flyte_workflow_node=None):
super().__init__(node, parallelism, min_successes, min_success_ratio)
self._flyte_task_node = flyte_task_node
self._flyte_workflow_node = flyte_workflow_node

Check warning on line 354 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L352-L354

Added lines #L352 - L354 were not covered by tests

@property
def flyte_task_node(self):
return self._flyte_task_node

Check warning on line 358 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L358

Added line #L358 was not covered by tests

@property
def flyte_workflow_node(self):
return self._flyte_workflow_node

Check warning on line 362 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L362

Added line #L362 was not covered by tests

@classmethod
def promote_from_model(cls, model: _workflow_model.ArrayNode):
def promote_from_model(
cls,
model: _workflow_model.ArrayNode,
flyte_task_node: Optional[FlyteTaskNode] = None,
flyte_workflow_node: Optional[FlyteWorkflowNode] = None,
):
return cls(
node=model._node,
parallelism=model._parallelism,
min_successes=model._min_successes,
min_success_ratio=model._min_success_ratio,
flyte_task_node=flyte_task_node,
flyte_workflow_node=flyte_workflow_node,
)


Expand Down Expand Up @@ -406,7 +426,7 @@ def task_node(self) -> Optional[FlyteTaskNode]:
return self._flyte_task_node

@property
def flyte_entity(self) -> Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode]:
def flyte_entity(self) -> Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode, FlyteArrayNode]:
return self._flyte_entity

@classmethod
Expand Down Expand Up @@ -477,8 +497,29 @@ def promote_from_model(
elif model.gate_node is not None:
flyte_gate_node = FlyteGateNode.promote_from_model(model.gate_node)
elif model.array_node is not None:
flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node)
# TODO: validate task in tasks
# map over task
if model.array_node.node.task_node is not None:
if model.array_node.node.task_node.reference_id not in tasks:
raise RuntimeError(

Check warning on line 503 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L503

Added line #L503 was not covered by tests
f"Remote Workflow closure does not have task with id {model.array_node.node.task_node.reference_id}."
)
flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node, flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id]))

Check warning on line 506 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L506

Added line #L506 was not covered by tests
# map over launch plan
elif model.array_node.node.workflow_node is not None:
workflow_node, converted_sub_workflows = cls._promote_workflow_node(

Check warning on line 509 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L509

Added line #L509 was not covered by tests
model.array_node.node.workflow_node,
sub_workflows,
node_launch_plans,
tasks,
converted_sub_workflows,
)
flyte_array_node = FlyteArrayNode.promote_from_model(

Check warning on line 516 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L516

Added line #L516 was not covered by tests
model.array_node, flyte_workflow_node=workflow_node
)
else:
raise _system_exceptions.FlyteSystemException(

Check warning on line 520 in flytekit/remote/entities.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/entities.py#L520

Added line #L520 was not covered by tests
"Array node must have either task or workflow node specified"
)
else:
raise _system_exceptions.FlyteSystemException(
f"Bad Node model, neither task nor workflow detected, node: {model}"
Expand Down
5 changes: 2 additions & 3 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2615,10 +2615,9 @@ def sync_node_execution(
if execution._node.array_node is None:
logger.error("Array node not found")
return execution
# if there's a task node underneath the array node, let's fetch the interface for it
# if there's a task node underneath the array node
if execution._node.array_node.node.task_node is not None:
tid = execution._node.array_node.node.task_node.reference_id
t = self.fetch_task(tid.project, tid.domain, tid.name, tid.version)
t = execution._node.flyte_entity.flyte_task_node.flyte_task

Check warning on line 2620 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L2620

Added line #L2620 was not covered by tests
execution._task_executions = [
self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t)
for task_execution in iterate_task_executions(self.client, execution.id)
Expand Down

0 comments on commit 8ba253c

Please sign in to comment.