Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sync for map over lp in dynamic #3155

Conversation

troychiu
Copy link
Member

@troychiu troychiu commented Feb 25, 2025

Why are the changes needed?

Currently if we call sync_node_execution on an execution contains map over lp in dynamic workflow, we'll get error like

KeyError:
LAUNCH_PLAN:flytesnacks:development:yt_dbg.scratchpad.array_node_lp_source.hello_world_w
f_for_array:hOMsbgLXveLF2C15vDjQQA

because we didn't inspect launch plan inside an array node in dynamic case.

What changes were proposed in this pull request?

  • Fix sync_node_execution for map over lp in dynamic.
  • Refactor a bit to remove duplicate codes.

How was this patch tested?

tested with a dynamic workflow which contains map over launch plan and I was able to call sync_node_execution and get the correct data.

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Summary by Bito

This PR addresses a bug in sync_node_execution where launch plans within array nodes of dynamic workflows were causing KeyError exceptions. The implementation has been refactored to properly inspect and handle launch plans inside array nodes, with improved code organization through method separation and enhanced support for map operations over launch plans.

Unit tests added: False

Estimated effort to review (1-5, lower is better): 1

@flyte-bot
Copy link
Contributor

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].

Copy link

codecov bot commented Feb 25, 2025

Codecov Report

Attention: Patch coverage is 52.63158% with 18 lines in your changes missing coverage. Please review.

Project coverage is 77.45%. Comparing base (9ddc8d7) to head (70f979b).
Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/remote/remote.py 52.63% 11 Missing and 7 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3155      +/-   ##
==========================================
- Coverage   78.58%   77.45%   -1.14%     
==========================================
  Files         284      212      -72     
  Lines       25356    22027    -3329     
  Branches     2865     2864       -1     
==========================================
- Hits        19926    17060    -2866     
+ Misses       4650     4134     -516     
- Partials      780      833      +53     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@eapolinario eapolinario merged commit 87fb3c6 into master Feb 25, 2025
113 of 114 checks passed
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 25, 2025

Code Review Agent Run #df4fb4

Actionable Suggestions - 2
  • flytekit/remote/remote.py - 2
Additional Suggestions - 1
  • flytekit/remote/remote.py - 1
Review Details
  • Files reviewed - 1 · Commit Range: 70f979b..70f979b
    • flytekit/remote/remote.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Contributor

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Bug Fix - Fix Launch Plan Synchronization in Dynamic Workflows

remote.py - Refactored launch plan discovery logic and fixed sync_node_execution for map over launch plans in dynamic workflows

node_launch_plans[node.workflow_node.launchplan_ref] = self.client.get_launch_plan(
node.workflow_node.launchplan_ref
).spec
self.find_launch_plan_for_node(node, node_launch_plans)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for launch plans

Consider adding error handling for find_launch_plan_for_node to handle potential exceptions when fetching launch plans. The method may fail silently if there are issues accessing the launch plans.

Code suggestion
Check the AI-generated fix before applying
Suggested change
self.find_launch_plan_for_node(node, node_launch_plans)
try:
self.find_launch_plan_for_node(node, node_launch_plans)
except FlyteEntityNotExistException as e:
logger.warning(f"Failed to fetch launch plan for node {node.id}: {str(e)}")

Code Review Run #df4fb4


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Comment on lines +513 to +515
def find_launch_plan_for_node(
self, node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider simplifying complex nested logic

Consider breaking down the find_launch_plan_for_node method into smaller, more focused methods. The current implementation has multiple nested functions and complex branching logic which could be simplified for better maintainability. Consider extracting the branch node handling logic into a separate method.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def find_launch_plan_for_node(
self, node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
):
def _handle_then_node(
self, child_then_node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
if child_then_node.branch_node:
self._handle_branch_node(child_then_node.branch_node, node_launch_plans)
elif child_then_node.workflow_node and child_then_node.workflow_node.launchplan_ref:
lp_ref = child_then_node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)
def _handle_branch_node(
self, branch_node: BranchNode, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
if branch_node and branch_node.if_else:
branch = branch_node.if_else
if branch.case and branch.case.then_node:
self._handle_then_node(branch.case.then_node, node_launch_plans)
if branch.other:
for o in branch.other:
if o.then_node:
self._handle_then_node(o.then_node, node_launch_plans)
if branch.else_node:
if branch.else_node.branch_node:
self._handle_branch_node(branch.else_node.branch_node, node_launch_plans)
elif branch.else_node.workflow_node and branch.else_node.workflow_node.launchplan_ref:
lp_ref = branch.else_node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)
def find_launch_plan_for_node(
self, node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
):

Code Review Run #df4fb4


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants