Eager cleanup #3148

Open
wants to merge 17 commits into
base: master

wild-endeavor
Contributor

@wild-endeavor wild-endeavor commented Feb 20, 2025

Why are the changes needed?

Eager tasks operate outside of the fundamental assumptions made when Flyte was designed. This can leave orphaned executions: executions launched by an eager task that are not terminated when the parent eager task fails.

What changes were proposed in this pull request?

This PR adds an imperative workflow for each eager task, similar to what Admin does for single task executions. However, it also attaches a failure node handler that runs when the eager task fails.

  • Added a new task type EagerFailureHandlerTask that's similar to the EchoTask. It is used to mimic the input interface of an eager task since the inputs to the failure node have to match that of the parent workflow/eager task. This failure task leverages the execution tags functionality as well as the tags that eager tasks add to child executions to make sure they're all cleaned up.
  • Added a new task resolver for that task type that always returns an EagerFailureHandlerTask instance with no input interface at runtime. Since we don't need to transform the inputs at failure handling runtime, we don't need to store a path to the original eager task to retrieve the Python types.
  • Added failure node for imperative workflows. (This mostly takes care of flyte#5913, "add on failure parameter to imperative workflow".)
  • Noticed that a corresponding ValueNotIn filter object was missing and added it, but ended up not using it.
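
The tag-based cleanup described in the list above can be sketched in plain Python. This is only an illustration under stated assumptions: `FakeBackend`, `Execution`, `failure_handler`, the phase names, and the tag strings are all hypothetical stand-ins and are not the flytekit or Admin API. The idea shown is that child executions carry a tag identifying their eager parent, and the failure handler aborts every child with that tag that has not yet reached a terminal phase.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set

# Hypothetical phase names, used only for this sketch.
TERMINAL_PHASES = {"SUCCEEDED", "FAILED", "ABORTED"}


@dataclass
class Execution:
    name: str
    phase: str
    tags: Set[str] = field(default_factory=set)


class FakeBackend:
    """In-memory stand-in for the control plane (hypothetical, not a real API)."""

    def __init__(self) -> None:
        self.executions: Dict[str, Execution] = {}

    def launch(self, name: str, tag: str) -> Execution:
        # Every child execution is tagged with an identifier of its parent.
        ex = Execution(name=name, phase="RUNNING", tags={tag})
        self.executions[name] = ex
        return ex

    def list_by_tag(self, tag: str) -> List[Execution]:
        return [e for e in self.executions.values() if tag in e.tags]

    def terminate(self, name: str) -> None:
        self.executions[name].phase = "ABORTED"


def failure_handler(backend: FakeBackend, parent_tag: str) -> List[str]:
    """Abort every non-terminal execution that carries the parent's tag."""
    aborted = []
    for ex in backend.list_by_tag(parent_tag):
        if ex.phase not in TERMINAL_PHASES:
            backend.terminate(ex.name)
            aborted.append(ex.name)
    return aborted


backend = FakeBackend()
backend.launch("child-1", tag="eager-parent-abc")
backend.launch("child-2", tag="eager-parent-abc")
backend.launch("other", tag="eager-parent-xyz")
backend.executions["child-1"].phase = "SUCCEEDED"  # already finished

aborted = failure_handler(backend, "eager-parent-abc")
print(aborted)
```

Because the handler only needs the parent's tag, it does not need the original task's inputs or Python types, which matches the reasoning given for the resolver above.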

Todo: Before merging, two changes need to be reverted: requesting a default secret, and setting the failure task's container image.

How was this patch tested?

Setup process

This was tested on Union internal clusters. (Testing on Flyte clusters is not yet possible due to missing single-task-execution labels.) I wrote an eager task that creates several downstream executions that take varying amounts of time. One of the downstream tasks is set to fail, which triggers a cleanup of the remaining ones.

import asyncio
import os
import typing

from flytekit import eager, task

# `image` (the container image) and `print_env` (a debugging helper)
# are not shown in this snippet.


@task(container_image=image)
def add_one(x: int) -> int:
    print(os.environ)
    return x + 1


@task(container_image=image)
async def double(x: int) -> int:
    print_env()
    if x == 2:
        return 42  # finishes immediately
    if x == 5:
        raise AssertionError("x is 5")  # the deliberate failure
    await asyncio.sleep(20 + x)  # the remaining executions are still running when x == 5 fails
    return x * 2


@eager(container_image=image)
async def level_1() -> typing.Tuple[int, int]:
    a1 = asyncio.create_task(double(x=1))
    a2 = asyncio.create_task(double(x=2))
    a3 = asyncio.create_task(double(x=3))
    a4 = asyncio.create_task(double(x=4))
    a5 = asyncio.create_task(double(x=5))

    (i1, i2, i3, i4, i5) = await asyncio.gather(a1, a2, a3, a4, a5)
    x, y = i1 + i2, i3 + i4 + i5
    print(f"Final result: {x}, {y}")
    return x, y
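
As a local analogy for why the cleanup is needed, the sketch below uses only the standard library's asyncio, with no Flyte involved. The names `child` and `parent` are made up for illustration. When one awaited task raises, `asyncio.gather` propagates the exception to the caller but does not cancel the sibling tasks, so they keep running unless something cancels them explicitly. Remote executions launched by an eager task are in a similar position, except that they live on the cluster and outlive the parent process.

```python
import asyncio
from typing import List


async def child(x: int, finished: List[int]) -> int:
    if x == 5:
        raise AssertionError("x is 5")
    await asyncio.sleep(0.05)
    finished.append(x)  # record that this sibling ran to completion
    return x * 2


async def parent(cleanup: bool) -> List[int]:
    finished: List[int] = []
    tasks = [asyncio.create_task(child(x, finished)) for x in (1, 3, 5)]
    try:
        await asyncio.gather(*tasks)
    except AssertionError:
        if cleanup:
            # Explicit cleanup: cancel whatever is still running.
            for t in tasks:
                t.cancel()
    # Wait long enough for any surviving siblings to finish.
    await asyncio.sleep(0.2)
    return sorted(finished)


without_cleanup = asyncio.run(parent(cleanup=False))
with_cleanup = asyncio.run(parent(cleanup=True))
print(without_cleanup, with_cleanup)
```

Without cleanup the two siblings finish even though the parent has already seen the failure; with cleanup they are cancelled before completing.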

Screenshots

image

Failure node tasks do not run if everything succeeds.
image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

Implementation of eager task cleanup mechanism with EagerFailureHandlerTask, introducing configurable delay settings and failure handler system. The changes include updating failure node IDs from 'nfail' to 'efn', adding task resolver and failure node support in imperative workflows. These modifications ensure proper cleanup of downstream executions and orphaned tasks when parent eager tasks fail.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 3

Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
@flyte-bot
Contributor

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].


codecov bot commented Feb 20, 2025

Codecov Report

Attention: Patch coverage is 25.33333% with 56 lines in your changes missing coverage. Please review.

Project coverage is 41.23%. Comparing base (66d4aed) to head (be37cd5).
Report is 11 commits behind head on master.

Files with missing lines                 Patch %   Lines
flytekit/core/python_function_task.py    42.50%    23 Missing ⚠️
flytekit/core/workflow.py                 5.00%    19 Missing ⚠️
flytekit/tools/serialize_helpers.py       0.00%     7 Missing ⚠️
flytekit/core/node_creation.py            0.00%     5 Missing ⚠️
flytekit/tools/translator.py              0.00%     2 Missing ⚠️

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #3148       +/-   ##
===========================================
- Coverage   76.85%   41.23%   -35.63%     
===========================================
  Files         206      211        +5     
  Lines       21851    22060      +209     
  Branches     2837     2868       +31     
===========================================
- Hits        16794     9096     -7698     
- Misses       4269    12815     +8546     
+ Partials      788      149      -639     


Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
@flyte-bot
Contributor

flyte-bot commented Feb 22, 2025

Code Review Agent Run #ce446d

Actionable Suggestions - 3
  • flytekit/tools/serialize_helpers.py - 1
    • Consider extracting task workflow creation logic · Line 64-67
  • tests/flytekit/unit/core/test_imperative.py - 1
    • Consider assigning default DC object creation · Line 159-160
  • flytekit/core/node_creation.py - 1
    • Consider adding error handling for async execution · Line 82-85
Review Details
  • Files reviewed - 10 · Commit Range: f31e823..cdb5189
    • flytekit/core/constants.py
    • flytekit/core/node_creation.py
    • flytekit/core/python_function_task.py
    • flytekit/core/workflow.py
    • flytekit/models/filters.py
    • flytekit/tools/serialize_helpers.py
    • flytekit/tools/translator.py
    • tests/flytekit/unit/core/test_eager_cleanup.py
    • tests/flytekit/unit/core/test_imperative.py
    • tests/flytekit/unit/core/test_serialization.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


@flyte-bot
Contributor

flyte-bot commented Feb 22, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change: New Feature - Eager Task Cleanup System Implementation
Files Impacted:
  • constants.py - Added default failure node ID constant
  • node_creation.py - Added async support for entity execution
  • python_function_task.py - Implemented EagerFailureHandlerTask and task resolver
  • workflow.py - Added failure handler support in ImperativeWorkflow
  • filters.py - Added ValueNotIn filter class
  • serialize_helpers.py - Added serialization support for eager async tasks
  • translator.py - Added validation for failure node ID

Key Change: Testing - Test Coverage for Eager Cleanup System
Files Impacted:
  • test_eager_cleanup.py - Added tests for EagerFailureHandlerTask and resolver
  • test_imperative.py - Added tests for failure handling in imperative workflows
  • test_serialization.py - Added tests for failure node naming validation

Comment on lines +82 to +85
if inspect.iscoroutinefunction(entity.__call__):
    outputs = run_sync(entity, **kwargs)
else:
    outputs = entity(**kwargs)

Consider adding error handling for async execution

Consider handling potential exceptions from run_sync when executing coroutine functions. The current implementation may silently fail if the async execution encounters issues.

Code suggestion
Check the AI-generated fix before applying
Suggested change
- if inspect.iscoroutinefunction(entity.__call__):
-     outputs = run_sync(entity, **kwargs)
- else:
-     outputs = entity(**kwargs)
+ if inspect.iscoroutinefunction(entity.__call__):
+     try:
+         outputs = run_sync(entity, **kwargs)
+     except Exception as e:
+         raise RuntimeError(f"Async execution failed for {entity.name}: {str(e)}") from e
+ else:
+     outputs = entity(**kwargs)

Code Review Run #ce446d
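
For readers who want to try the dispatch pattern discussed in this suggestion outside of flytekit, here is a minimal, self-contained sketch. `run_sync_stub` is a hypothetical stand-in that simply uses `asyncio.run`; it is not flytekit's `run_sync`, whose behavior may differ. `AsyncEntity`, `SyncEntity`, and `call_entity` are likewise made up for illustration. The sketch shows `inspect.iscoroutinefunction` detecting an `async def __call__` on an instance, and the suggested wrapping that attaches the entity name to any failure.

```python
import asyncio
import inspect
from typing import Any


def run_sync_stub(coro_fn: Any, **kwargs: Any) -> Any:
    """Hypothetical stand-in for a run_sync helper: run the coroutine to completion."""
    return asyncio.run(coro_fn(**kwargs))


class AsyncEntity:
    name = "async_entity"

    async def __call__(self, x: int) -> int:
        if x < 0:
            raise ValueError("negative input")
        return x + 1


class SyncEntity:
    name = "sync_entity"

    def __call__(self, x: int) -> int:
        return x + 1


def call_entity(entity: Any, **kwargs: Any) -> Any:
    # A bound `async def __call__` is reported as a coroutine function.
    if inspect.iscoroutinefunction(entity.__call__):
        try:
            return run_sync_stub(entity, **kwargs)
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise RuntimeError(f"Async execution failed for {entity.name}: {e}") from e
    return entity(**kwargs)


print(call_entity(AsyncEntity(), x=1))
print(call_entity(SyncEntity(), x=1))
```

In this stub the exception would propagate even without the `try`/`except`, so the wrapper mainly adds context (the entity name) rather than preventing a silent failure. Whether that trade-off is worthwhile in the real code path is a judgment call for the reviewers.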

Collaborator

@eapolinario eapolinario left a comment

just a few nits.

Comment on lines +650 to +652
# todo: remove this before merging
# this is actually bad, but useful for developing
cleanup._container_image = self._container_image
Collaborator

+1

Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 27, 2025

Code Review Agent Run #e76f06

Actionable Suggestions - 0
Review Details
  • Files reviewed - 4 · Commit Range: cdb5189..3db4249
    • flytekit/core/constants.py
    • flytekit/core/python_function_task.py
    • tests/flytekit/unit/core/test_imperative.py
    • tests/flytekit/unit/core/test_serialization.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


4 participants