fix sync for map task in dynamic #3141

Merged: 8 commits, Feb 21, 2025

Member

@troychiu troychiu commented Feb 19, 2025

Why are the changes needed?

Currently, if you put a map task inside a dynamic workflow and then sync the execution, you get an error.

Example

from flytekit import workflow, task, map_task, dynamic

threshold = 11

@task(retries=2)
def detect_anomalies(data_point: int) -> bool:
    return data_point > threshold


@dynamic
def detect_anomalies_wrapper(data: list[int]):
    map_task(detect_anomalies)(data_point=data)

@task
def return_data(i: int) -> list[int]:
    return [x for x in range(i)]


@workflow
def wf(i: int = 13):
    data = return_data(i=i)
    detect_anomalies_wrapper(data=data)
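
# Sync the most recent execution, including its node executions.
# endpoint, project, domain, and limit are placeholders for your own values.
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

r = FlyteRemote(config=Config.for_endpoint(endpoint=endpoint))

execs = r.recent_executions(project=project, domain=domain, limit=limit)
a = r.sync(execs[0], sync_nodes=True)
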
FlyteEntityNotExistException: USER:EntityNotExist: error=None, cause=<_InactiveRpcError of RPC that terminated with:
        status = StatusCode.NOT_FOUND
        details = "missing entity of type TASK with identifier project:"troy"  domain:"development"  name:"dynamic_map.map_detect_anomalies_6b3bd0353da5de6e84d7982921ead2b3-arraynode"  version:"Cp3YAs4ZP6F-RMF18m5ioA""
        debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-02-19T13:51:10.932195-08:00", grpc_status:5, grpc_message:"missing entity of type TASK with identifier project:\"troy\"  domain:\"development\"  name:\"dynamic_map.map_detect_anomalies_6b3bd0353da5de6e84d7982921ead2b3-arraynode\"  version:\"Cp3YAs4ZP6F-RMF18m5ioA\""}"

This happens because sync tries to fetch the task, but tasks created inside a dynamic workflow are never registered.

What changes were proposed in this pull request?

Store flyte_task_node in array_node so that we can use it later instead of fetching the task.
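
The failure and the fix can be illustrated with a small stand-alone model. This is only a sketch: `TaskRegistry`, `TaskNodeRef`, `ArrayNodeRef`, `EntityNotFound`, and `resolve_task` are hypothetical names and do not mirror flytekit's actual classes. It only shows why keeping a reference on the array node avoids a lookup that cannot succeed for an unregistered task.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


class EntityNotFound(Exception):
    """Stand-in for a NOT_FOUND error from the backend (hypothetical)."""


@dataclass
class TaskNodeRef:
    """Hypothetical stand-in for an already-resolved task node."""
    task_name: str


@dataclass
class TaskRegistry:
    """Hypothetical registry: only explicitly registered tasks can be fetched."""
    tasks: Dict[str, TaskNodeRef] = field(default_factory=dict)

    def fetch(self, name: str) -> TaskNodeRef:
        try:
            return self.tasks[name]
        except KeyError:
            raise EntityNotFound(f"missing entity of type TASK: {name}") from None


@dataclass
class ArrayNodeRef:
    """Hypothetical array node that may carry its resolved task node."""
    task_name: str
    task_node: Optional[TaskNodeRef] = None


def resolve_task(node: ArrayNodeRef, registry: TaskRegistry) -> TaskNodeRef:
    # Prefer the reference stored on the node; fall back to a registry
    # lookup only when nothing was stored.
    if node.task_node is not None:
        return node.task_node
    return registry.fetch(node.task_name)


# Nothing is registered, as is the case for tasks created inside a dynamic.
registry = TaskRegistry()

# Without a stored reference, the lookup fails.
try:
    resolve_task(ArrayNodeRef("map_detect_anomalies"), registry)
    lookup_failed = False
except EntityNotFound:
    lookup_failed = True

# With a stored reference, no lookup is needed.
stored = TaskNodeRef("map_detect_anomalies")
resolved = resolve_task(ArrayNodeRef("map_detect_anomalies", stored), registry)
```

In this toy model the stored reference is optional, so callers that never set it keep the old lookup behavior.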

How was this patch tested?

  • Ran the example above and confirmed that sync returns the correct data.
  • Integration test

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This PR fixes a critical synchronization bug in map tasks within dynamic workflows where task references were missing. The changes enhance FlyteArrayNode class by adding type hints and storage for flyte_task_node references, improving error handling, and updating execution synchronization logic to use correct node access paths. The implementation eliminates the need for fetching unregistered tasks and includes proper validation of workflow execution phases.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 2

@flyte-bot
Contributor

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].

codecov bot commented Feb 19, 2025

Codecov Report

Attention: Patch coverage is 84.61538% with 2 lines in your changes missing coverage. Please review.

Project coverage is 58.26%. Comparing base (66d4aed) to head (32e4142).
Report is 11 commits behind head on master.

Files with missing lines | Patch % | Lines
flytekit/remote/entities.py | 83.33% | 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #3141       +/-   ##
===========================================
- Coverage   76.85%   58.26%   -18.60%     
===========================================
  Files         206      206               
  Lines       21851    21788       -63     
  Branches     2837     2837               
===========================================
- Hits        16794    12695     -4099     
- Misses       4269     8310     +4041     
+ Partials      788      783        -5     

Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
@flyte-bot
Contributor

flyte-bot commented Feb 19, 2025

Code Review Agent Run #462f2a

Actionable Suggestions - 4
  • tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py - 1
    • Consider more descriptive function name · Line 9-11
  • tests/flytekit/integration/remote/test_remote.py - 1
    • Consider validating execution structure before access · Line 630-630
  • flytekit/remote/entities.py - 2
Review Details
  • Files reviewed - 4 · Commit Range: 8ba253c..19b1562
    • flytekit/remote/entities.py
    • flytekit/remote/remote.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

@flyte-bot
Contributor

flyte-bot commented Feb 19, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change: Bug Fix - Fix Map Task Synchronization in Dynamic Workflows
Files Impacted:
  • entities.py - Enhanced FlyteArrayNode to store and handle flyte_node references properly
  • remote.py - Updated task fetching logic to use stored flyte_task reference instead of fetching

Key Change: Testing - Integration Tests for Map Task in Dynamic Workflows
Files Impacted:
  • test_remote.py - Added test case for map task execution within dynamic workflow
  • dynamic_array_map.py - Created test workflow demonstrating map task usage in dynamic context

Signed-off-by: Troy Chiu <[email protected]>
pingsutw
pingsutw previously approved these changes Feb 19, 2025
@flyte-bot
Contributor

flyte-bot commented Feb 20, 2025

Code Review Agent Run #944657

Actionable Suggestions - 1
  • tests/flytekit/integration/remote/test_remote.py - 1
    • Consider validating execution status before assertions · Line 630-632
Review Details
  • Files reviewed - 2 · Commit Range: 19b1562..f395792
    • flytekit/remote/entities.py
    • tests/flytekit/integration/remote/test_remote.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

parallelism: int,
min_successes: int,
min_success_ratio: float,
flyte_task_node: Optional[FlyteTaskNode] = None,
Contributor

my only question is, how is this flyte_task_node different from the task node that's inside the inner node object on line 353? I think @pvditt should take a look btw, he definitely knows more about the inner workings of map and where it's going than i do.

Member Author

The types are different. The node object on line 353 (array_node.node.task_node) is TaskNode type, while this flyte_task_node is FlyteTaskNode type. Or are they fundamentally identical?

Contributor

yeah FlyteTaskNode inherits from TaskNode - if it's possible i'd put it there instead, just to avoid confusion? what do you think?

Member Author

Good callout. I used FlyteTaskNode to replace the original node object. Mind taking a look? TY
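
Because FlyteTaskNode inherits from TaskNode, as noted in this thread, an instance of the subclass can be passed wherever the parent type is expected, which is what makes a single field sufficient. The sketch below uses hypothetical stand-in classes (`BaseTaskNode`, `RichTaskNode`, `ArrayWrapper`) and is not flytekit's actual definition of these types.

```python
class BaseTaskNode:
    """Hypothetical parent type holding only a reference id."""

    def __init__(self, reference_id: str) -> None:
        self.reference_id = reference_id


class RichTaskNode(BaseTaskNode):
    """Hypothetical subclass that also carries the resolved task."""

    def __init__(self, reference_id: str, task: object) -> None:
        super().__init__(reference_id)
        self.task = task


class ArrayWrapper:
    """Hypothetical container typed against the parent class."""

    def __init__(self, node: BaseTaskNode) -> None:
        self._node = node

    @property
    def node(self) -> BaseTaskNode:
        return self._node


rich = RichTaskNode("detect_anomalies", task={"retries": 2})
wrapper = ArrayWrapper(rich)  # subclass instance accepted as the parent type

# The richer data stays reachable through the single stored field.
is_base = isinstance(wrapper.node, BaseTaskNode)
task = getattr(wrapper.node, "task", None)
```

Storing one object instead of two also removes the question of which of the two fields is authoritative.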

Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
@flyte-bot
Contributor

flyte-bot commented Feb 20, 2025

Code Review Agent Run #ff09ed

Actionable Suggestions - 3
  • flytekit/remote/entities.py - 3
    • Consider removing redundant instance variable assignment · Line 358-359
    • Consider impact of method name change · Line 362-363
    • Consider keeping parameter optional for compatibility · Line 369-369
Review Details
  • Files reviewed - 2 · Commit Range: f395792..32e4142
    • flytekit/remote/entities.py
    • flytekit/remote/remote.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

Comment on lines +358 to +359
super().__init__(flyte_node, parallelism, min_successes, min_success_ratio)
self._flyte_node = flyte_node
Contributor

Consider removing redundant instance variable assignment

Consider removing the redundant assignment of _flyte_node in line 359 since it's already being set in the parent class through super().__init__() call.

Suggested change
- super().__init__(flyte_node, parallelism, min_successes, min_success_ratio)
- self._flyte_node = flyte_node
+ super().__init__(flyte_node, parallelism, min_successes, min_success_ratio)

Code Review Run #ff09ed


Comment on lines +362 to +363
def flyte_node(self) -> FlyteNode:
    return self._flyte_node
Contributor

Consider impact of method name change

Consider keeping the method name consistent. The method name is being changed from flyte_task_node to flyte_node which could break backward compatibility. If this is an intentional API change, it might be worth documenting the change and ensuring all callers are updated.

Suggested change
- def flyte_node(self) -> FlyteNode:
-     return self._flyte_node
+ def flyte_task_node(self) -> FlyteNode:
+     return self._flyte_task_node

Code Review Run #ff09ed


def promote_from_model(
    cls,
    model: _workflow_model.ArrayNode,
    flyte_node: FlyteNode,
Contributor

Consider keeping parameter optional for compatibility

Consider making the flyte_node parameter optional with a default value of None. The parameter type change from Optional[FlyteTaskNode] to FlyteNode may break existing code that passes None.

Suggested change
- flyte_node: FlyteNode,
+ flyte_node: Optional[FlyteNode] = None,

Code Review Run #ff09ed


@wild-endeavor
Contributor

thank you troy

    cls,
    model: _workflow_model.ArrayNode,
    flyte_node: FlyteNode,
):
Contributor

nit: could you add -> FlyteArrayNode for the return type annotation here

Member Author

Let me add this in the following PR. Thank you!

return cls(
    node=model._node,
    flyte_node=flyte_node,
    parallelism=model._parallelism,
Contributor

this is outside the scope of this change, but I noticed we aren't setting a few fields here (

execution_mode=None,
is_original_sub_node_interface=False,
data_mode=None,
)

I wonder if there's a way we could error in places like here if the fields aren't updated when ArrayNode/any node's IDL changes.

Member Author

I don't think it's easy to do since there are lots of models that don't match the IDL.
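
One possible direction for the idea raised in this thread, subject to the caveat above that many models do not match the IDL, is a helper that refuses to construct an object unless every constructor parameter is passed explicitly. This is a sketch only: `ArrayNodeModel` and `build_strict` are hypothetical names, and the field list is a simplified assumption rather than the real ArrayNode signature.

```python
import inspect
from typing import Any, Optional


class ArrayNodeModel:
    """Hypothetical model whose constructor gained optional fields over time."""

    def __init__(
        self,
        node: str,
        parallelism: int,
        execution_mode: Optional[str] = None,
        data_mode: Optional[str] = None,
    ) -> None:
        self.node = node
        self.parallelism = parallelism
        self.execution_mode = execution_mode
        self.data_mode = data_mode


def build_strict(cls: type, **kwargs: Any) -> Any:
    """Construct cls, raising if any constructor parameter is left implicit."""
    params = inspect.signature(cls.__init__).parameters
    expected = {name for name in params if name != "self"}
    missing = expected - set(kwargs)
    if missing:
        raise TypeError(f"{cls.__name__} fields not set: {sorted(missing)}")
    return cls(**kwargs)


# Forgetting the newer fields is reported instead of silently defaulting.
try:
    build_strict(ArrayNodeModel, node="n0", parallelism=4)
    error_message = ""
except TypeError as exc:
    error_message = str(exc)

# Passing every field explicitly succeeds.
complete = build_strict(
    ArrayNodeModel, node="n0", parallelism=4, execution_mode=None, data_mode=None
)
```

A check like this only catches constructor parameters that already exist on the Python model; it would not notice a field added to the IDL but never added to the model.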

Contributor

@pvditt pvditt left a comment

changes look good. Thanks for fixing this

@troychiu troychiu merged commit d7a3cef into master Feb 21, 2025
111 of 112 checks passed

5 participants