-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix sync for map task in dynamic #3141
Conversation
Signed-off-by: Troy Chiu <[email protected]>
Code Review Agent Run Status
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3141 +/- ##
===========================================
- Coverage 76.85% 58.26% -18.60%
===========================================
Files 206 206
Lines 21851 21788 -63
Branches 2837 2837
===========================================
- Hits 16794 12695 -4099
- Misses 4269 8310 +4041
+ Partials 788 783 -5 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Troy Chiu <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Code Review Agent Run #462f2aActionable Suggestions - 4
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
Signed-off-by: Troy Chiu <[email protected]>
Code Review Agent Run #944657Actionable Suggestions - 1
Review Details
|
flytekit/remote/entities.py
Outdated
parallelism: int, | ||
min_successes: int, | ||
min_success_ratio: float, | ||
flyte_task_node: Optional[FlyteTaskNode] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my only question is, how is this flyte_task_node
different from the task node that's inside the inner node
object on line 353? I think @pvditt should take a look btw, he definitely knows more about the inner workings of map and where it's going than i do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The types are different. The node object on line 353 (array_node.node.task_node) is TaskNode type, while this flyte_task_node is FlyteTaskNode type. Or they are fundamentally identical?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah FlyteTaskNode inherits from TaskNode - if it's possible i'd put it there instead, just to avoid confusion? what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good callout. I used FlyteTaskNode to replace the original node object. Mind taking a look? TY
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Code Review Agent Run #ff09edActionable Suggestions - 3
Review Details
|
super().__init__(flyte_node, parallelism, min_successes, min_success_ratio) | ||
self._flyte_node = flyte_node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider removing the redundant assignment of _flyte_node
in line 359 since it's already being set in the parent class through super().__init__()
call.
Code suggestion
Check the AI-generated fix before applying
super().__init__(flyte_node, parallelism, min_successes, min_success_ratio) | |
self._flyte_node = flyte_node | |
super().__init__(flyte_node, parallelism, min_successes, min_success_ratio) |
Code Review Run #ff09ed
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
def flyte_node(self) -> FlyteNode: | ||
return self._flyte_node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider keeping the method name consistent. The method name is being changed from flyte_task_node
to flyte_node
which could break backward compatibility. If this is an intentional API change, it might be worth documenting the change and ensuring all callers are updated.
Code suggestion
Check the AI-generated fix before applying
def flyte_node(self) -> FlyteNode: | |
return self._flyte_node | |
def flyte_task_node(self) -> FlyteNode: | |
return self._flyte_task_node |
Code Review Run #ff09ed
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
def promote_from_model( | ||
cls, | ||
model: _workflow_model.ArrayNode, | ||
flyte_node: FlyteNode, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider making the flyte_node
parameter optional with a default value of None
. The parameter type change from Optional[FlyteTaskNode]
to FlyteNode
may break existing code that passes None
.
Code suggestion
Check the AI-generated fix before applying
flyte_node: FlyteNode, | |
flyte_node: Optional[FlyteNode] = None, |
Code Review Run #ff09ed
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
thank you troy |
cls, | ||
model: _workflow_model.ArrayNode, | ||
flyte_node: FlyteNode, | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could you add -> FlyteArrayNode for the return type annotation here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me add this in the following PR. Thank you!
return cls( | ||
node=model._node, | ||
flyte_node=flyte_node, | ||
parallelism=model._parallelism, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is outside the scope of this change, but I noticed we aren't setting a few fields here (
flytekit/flytekit/models/core/workflow.py
Lines 394 to 396 in 6d7c738
execution_mode=None, | |
is_original_sub_node_interface=False, | |
data_mode=None, |
I wonder if there's a way we could error in places like here if the fields aren't updated when ArrayNode/any node's IDL changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's easy to do since there are lots of models that don't match the IDL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes look good. Thanks for fixing this
Why are the changes needed?
Currently if you put map task inside dynamic and sync the execution, you will get an error.
Example
This is because we try to fetch task but we don't register task inside a dynamic.
What changes were proposed in this pull request?
Store flyte_task_node in array_node so that we can use later
How was this patch tested?
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This PR fixes a critical synchronization bug in map tasks within dynamic workflows where task references were missing. The changes enhance FlyteArrayNode class by adding type hints and storage for flyte_task_node references, improving error handling, and updating execution synchronization logic to use correct node access paths. The implementation eliminates the need for fetching unregistered tasks and includes proper validation of workflow execution phases.Unit tests added: True
Estimated effort to review (1-5, lower is better): 2