Skip to content

Commit

Permalink
Use flyteremote to access subnodes information of array nodes (#3152)
Browse files Browse the repository at this point in the history
* fix sync for map task in dynamic

Signed-off-by: Troy Chiu <[email protected]>

* remove unnecessary

Signed-off-by: Troy Chiu <[email protected]>

* remove raise error

Signed-off-by: Troy Chiu <[email protected]>

* add integration tests

Signed-off-by: Troy Chiu <[email protected]>

* lint

Signed-off-by: Troy Chiu <[email protected]>

* type hint

Signed-off-by: Troy Chiu <[email protected]>

* wip

Signed-off-by: Troy Chiu <[email protected]>

* use flyte node

Signed-off-by: Troy Chiu <[email protected]>

* lint

Signed-off-by: Troy Chiu <[email protected]>

* wip

Signed-off-by: Troy Chiu <[email protected]>

* wip

Signed-off-by: Troy Chiu <[email protected]>

* use interface to list

Signed-off-by: Troy Chiu <[email protected]>

* add tests

Signed-off-by: Troy Chiu <[email protected]>

* lint

Signed-off-by: Troy Chiu <[email protected]>

* nit

Signed-off-by: Troy Chiu <[email protected]>

* expose external resources

Signed-off-by: Troy Chiu <[email protected]>

* lint

Signed-off-by: Troy Chiu <[email protected]>

* add tests

Signed-off-by: Troy Chiu <[email protected]>

---------

Signed-off-by: Troy Chiu <[email protected]>
  • Loading branch information
troychiu authored Feb 24, 2025
1 parent 5a17d74 commit 9ddc8d7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
13 changes: 13 additions & 0 deletions flytekit/models/admin/task_execution.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from flyteidl.admin import task_execution_pb2 as _task_execution_pb2

from flytekit.models import common as _common
from flytekit.models import event as _event
from flytekit.models.core import execution as _execution
from flytekit.models.core import identifier as _identifier

Expand All @@ -16,6 +17,7 @@ def __init__(
updated_at,
output_uri=None,
error=None,
metadata=None,
):
"""
:param int phase: Enum value from flytekit.models.core.execution.TaskExecutionPhase
Expand All @@ -28,6 +30,7 @@ def __init__(
literals.
:param flytekit.models.core.execution.ExecutionError error: If task has failed and in terminal state, this will
be set to the error encountered.
:param flytekit.models.event.TaskExecutionMetadata metadata: Metadata associated with the task execution.
"""
self._phase = phase
self._logs = logs
Expand All @@ -37,6 +40,7 @@ def __init__(
self._updated_at = updated_at
self._output_uri = output_uri
self._error = error
self._metadata = metadata

@property
def phase(self):
Expand Down Expand Up @@ -95,6 +99,13 @@ def error(self):
"""
return self._error

@property
def metadata(self):
"""
:rtype: flytekit.models.event.TaskExecutionMetadata
"""
return self._metadata

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.task_execution_pb2.TaskExecutionClosure
Expand All @@ -104,6 +115,7 @@ def to_flyte_idl(self):
logs=[l.to_flyte_idl() for l in self.logs],
output_uri=self.output_uri,
error=self.error.to_flyte_idl() if self.error is not None else None,
metadata=self.metadata.to_flyte_idl() if self.metadata is not None else None,
)
p.started_at.FromDatetime(self.started_at)
p.created_at.FromDatetime(self.created_at)
Expand All @@ -126,6 +138,7 @@ def from_flyte_idl(cls, p):
created_at=p.created_at.ToDatetime(),
updated_at=p.updated_at.ToDatetime(),
duration=p.duration.ToTimedelta(),
metadata=_event.TaskExecutionMetadata.from_flyte_idl(p.metadata) if p.HasField("metadata") else None,
)


Expand Down
37 changes: 37 additions & 0 deletions flytekit/models/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from flyteidl.event import event_pb2 as _event_pb2

from flytekit.models import common as _common


class TaskExecutionMetadata(_common.FlyteIdlEntity):
"""
:param google.protobuf.internal.containers.RepeatedCompositeFieldContainer external_resources:
"""

def __init__(self, external_resources=None):
self._external_resources = external_resources

@property
def external_resources(self):
"""
:rtype: google.protobuf.internal.containers.RepeatedCompositeFieldContainer
"""
return self._external_resources

def to_flyte_idl(self):
"""
:rtype: flyteidl.event.TaskExecutionMetadata
"""
return _event_pb2.TaskExecutionMetadata(
external_resources=self._external_resources,
)

@classmethod
def from_flyte_idl(cls, proto):
"""
:param flyteidl.event.event_pb2.TaskExecutionMetadata proto:
:rtype: TaskExecutionMetadata
"""
return cls(
external_resources=proto.external_resources,
)
6 changes: 6 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ def test_execute_workflow_with_maptask(register):
)
assert execution.outputs["o0"] == [4, 5, 6]
assert len(execution.node_executions["n0"].task_executions) == 1
assert len(execution.node_executions["n0"].task_executions[0].closure.metadata.external_resources) == len(d)
for i in range(len(d)):
assert execution.node_executions["n0"].task_executions[0].closure.metadata.external_resources[i].phase == 3 # SUCCEEDED

def test_execution_workflow_with_maptask_in_dynamic(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
Expand All @@ -680,6 +683,9 @@ def test_execution_workflow_with_maptask_in_dynamic(register):
assert execution.node_executions["n0"].subworkflow_node_executions is not None
assert "n0-0-dn0" in execution.node_executions["n0"].subworkflow_node_executions
assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions) == 1
assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions[0].closure.metadata.external_resources) == len(d)
for i in range(len(d)):
assert execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions[0].closure.metadata.external_resources[i].phase == 3 # SUCCEEDED


def test_executes_nested_workflow_dictating_interruptible(register):
Expand Down

0 comments on commit 9ddc8d7

Please sign in to comment.