Skip to content

Commit

Permalink
iterable node executions
Browse files Browse the repository at this point in the history
Signed-off-by: Troy Chiu <[email protected]>
  • Loading branch information
troychiu committed Feb 27, 2025
1 parent 43a940f commit 4c9c435
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
13 changes: 13 additions & 0 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from flyteidl.core import tasks_pb2 as _core_task

from flytekit.configuration import SerializationSettings
from flytekit.core import constants
from flytekit.core.pod_template import PodTemplate
from flytekit.loggers import logger

Expand Down Expand Up @@ -408,3 +409,15 @@ def str2bool(value: typing.Optional[str]) -> bool:
if value is None:
return False
return value.lower() in ("true", "t", "1")


def is_start_node(name: str) -> bool:
return constants.START_NODE_ID in name


def is_end_node(name: str) -> bool:
return constants.END_NODE_ID in name


def is_start_or_end_node(name: str) -> bool:
return is_start_node(name) or is_end_node(name)
34 changes: 32 additions & 2 deletions flytekit/remote/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import timedelta
from typing import Dict, List, Optional, Union

from flytekit.core import utils
from flytekit.core.type_engine import LiteralsResolver
from flytekit.exceptions import user as user_exceptions
from flytekit.models import execution as execution_models
Expand Down Expand Up @@ -105,7 +106,7 @@ def __init__(
**kwargs,
):
super(FlyteWorkflowExecution, self).__init__(*args, **kwargs)
self._node_executions = None
self._node_executions: Optional[Dict[str, FlyteNodeExecution]] = None
self._flyte_workflow: Optional[FlyteWorkflow] = None
self._remote = remote
self._type_hints = type_hints
Expand All @@ -119,6 +120,18 @@ def node_executions(self) -> Dict[str, FlyteNodeExecution]:
"""Get a dictionary of node executions that are a part of this workflow execution."""
return self._node_executions or {}

@property
def node_executions_list(self, contains_start_and_end_node=False) -> List[FlyteNodeExecution]:
if self._node_executions is None:
return []
node_executions_list = (
self._node_executions.values()
if contains_start_and_end_node
else list(filter(lambda x: not utils.is_start_or_end_node(x.id.node_id), self._node_executions.values()))
)
node_executions_list.sort(key=lambda x: x.id.node_id)
return node_executions_list

@property
def error(self) -> core_execution_models.ExecutionError:
"""
Expand Down Expand Up @@ -219,10 +232,27 @@ def __init__(self, *args, **kwargs):
super(FlyteNodeExecution, self).__init__(*args, **kwargs)
self._task_executions = None
self._workflow_executions = []
self._underlying_node_executions = None
self._underlying_node_executions: typing.Optional[List[FlyteNodeExecution]] = None
self._interface: typing.Optional[TypedInterface] = None
self._flyte_node = None

def __iter__(self):
self.idx = 0
if self._underlying_node_executions:
self._underlying_node_executions.sort(key=lambda x: x.id.node_id)
return self

def __next__(self) -> FlyteNodeExecution:
if self._underlying_node_executions is None:
raise StopIteration
while self.idx < len(self._underlying_node_executions):
x = self._underlying_node_executions[self.idx]
self.idx += 1
if utils.is_start_or_end_node(x.id.node_id):
continue
return x
raise StopIteration

@property
def task_executions(self) -> List[FlyteTaskExecution]:
return self._task_executions or []
Expand Down

0 comments on commit 4c9c435

Please sign in to comment.