
Slurm agent fn task #3150

Merged on Feb 22, 2025 (85 commits)

Conversation

JiangJiaWei1103 (Contributor) commented Feb 21, 2025

Tracking issue

Why are the changes needed?

What changes were proposed in this pull request?

How was this patch tested?

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This PR adds a Slurm function task plugin for cluster execution and a WebhookTask feature for making HTTP requests within workflows. It improves SSH connection handling to resolve host key verification issues and prevent 'asyncssh.misc.HostKeyNotVerifiable' errors, which makes connecting to new cluster nodes more robust. Other changes include streamlined shell task execution logic, an enhanced base agent framework, improved configuration handling, and compatibility updates for newer versions of flytekit, flyteidl, and Python 3.12.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

JiangJiaWei1103 and others added 30 commits December 14, 2024 19:18
Signed-off-by: jiangjiawei1103 <[email protected]>
Signed-off-by: jiangjiawei1103 <[email protected]>
Successfully submit and run the user-defined task as a normal python
function on a remote Slurm cluster.

1. Inherit from PythonFunctionTask instead of PythonTask
2. Transfer the task module through sftp
3. Interact with an Amazon S3 bucket on both localhost and the Slurm cluster

Signed-off-by: JiaWei Jiang <[email protected]>
Specifying `--raw-output-data-prefix` option handles task_module download.

Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Add `ssh_conf` field to let users specify connection secret

Note that reconnection is done in both `get` and `delete`. This is just
a temporary workaround.

Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Data scientists and MLEs developing Flyte workflows with the Slurm agent
don't need to know SSH connection details. We assume they only need to
specify which Slurm cluster to use by hostname.

Signed-off-by: JiaWei Jiang <[email protected]>
1. Write user-defined batch script to a tmp file
2. Transfer the batch script through sftp
3. Construct sbatch command to run on Slurm cluster
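
The steps in this commit message could be sketched roughly as follows. This is a minimal illustration, not the plugin's code: `write_batch_script` and `build_sbatch_command` are hypothetical names, the option names are only examples, and the SFTP transfer step is left out because it needs a live connection.

```python
import shlex
import tempfile
from pathlib import Path
from typing import Dict


def write_batch_script(script: str) -> Path:
    """Write a user-defined batch script to a temporary file and return its path."""
    with tempfile.NamedTemporaryFile("w", suffix=".slurm", delete=False) as f:
        f.write(script)
    return Path(f.name)


def build_sbatch_command(sbatch_options: Dict[str, str], remote_script_path: str) -> str:
    """Build an sbatch command line, shell-quoting every argument."""
    args = ["sbatch"]
    for name, value in sbatch_options.items():
        args.append(f"--{name}={value}")
    args.append(remote_script_path)
    return shlex.join(args)
```

Quoting with `shlex.join` keeps option values containing spaces or shell metacharacters from being reinterpreted when the command string is run on the remote host.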

Signed-off-by: JiaWei Jiang <[email protected]>
1. Remove SFTP for batch script transfer
    * Assume Slurm batch script is present on Slurm cluster
2. Support directly specifying a remote batch script path

Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
`SlurmTask` and `SlurmShellTask` now share the same agent.

Signed-off-by: JiaWei Jiang <[email protected]>
Comment on lines 98 to 105
job_state = "running"
for o in job_res.stdout.split(" "):
    if "JobState" in o:
        job_state = o.split("=")[1].strip().lower()
    elif "StdOut" in o:
        stdout_path = o.split("=")[1].strip()
        msg_res = await conn.run(f"cat {stdout_path}", check=True)
        msg = msg_res.stdout

Consider adding error handling for stdout

Consider adding error handling for the case when stdout_path is not found in the job output. Currently, the code assumes msg will always be initialized which could lead to potential UnboundLocalError if StdOut is not found in the job output.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-job_state = "running"
-for o in job_res.stdout.split(" "):
-    if "JobState" in o:
-        job_state = o.split("=")[1].strip().lower()
-    elif "StdOut" in o:
-        stdout_path = o.split("=")[1].strip()
-        msg_res = await conn.run(f"cat {stdout_path}", check=True)
-        msg = msg_res.stdout
+job_state = "running"
+msg = "No stdout available"
+for o in job_res.stdout.split(" "):
+    if "JobState" in o:
+        job_state = o.split("=")[1].strip().lower()
+    elif "StdOut" in o:
+        stdout_path = o.split("=")[1].strip()
+        msg_res = await conn.run(f"cat {stdout_path}", check=True)
+        msg = msg_res.stdout

Code Review Run #2afb6d
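
One way to make the defaults explicit is to separate the parsing from the SSH call. The sketch below is only an illustration under assumptions: the helper names are hypothetical, and the input is assumed to be whitespace-separated `KEY=VALUE` tokens, so values that themselves contain spaces would not survive this split.

```python
from typing import Dict, Optional, Tuple


def parse_job_info(job_output: str) -> Dict[str, str]:
    """Turn whitespace-separated KEY=VALUE tokens into a dictionary."""
    info: Dict[str, str] = {}
    for token in job_output.split():
        key, sep, value = token.partition("=")
        if sep:  # skip tokens that carry no "="
            info[key] = value
    return info


def job_state_and_stdout(job_output: str) -> Tuple[str, Optional[str]]:
    """Return (lowercased job state, stdout path or None), with safe defaults."""
    info = parse_job_info(job_output)
    return info.get("JobState", "running").lower(), info.get("StdOut")
```

The caller can then run `cat` only when the returned path is not `None`, so the message variable is never left unbound. Matching on exact keys also avoids picking up other fields whose names merely contain `JobState` or `StdOut`.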



Comment on lines +180 to +182
script = f"""#!/bin/bash -i
{entrypoint}
"""

Consider more portable shebang line

Consider using a more robust shebang line #!/usr/bin/env bash instead of #!/bin/bash -i as it provides better portability across different systems where bash may not be in the standard location. Additionally, the -i flag for interactive mode may not be necessary for a batch script.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-script = f"""#!/bin/bash -i
-{entrypoint}
-"""
+script = f"""#!/usr/bin/env bash
+{entrypoint}
+"""

Code Review Run #2afb6d



Comment on lines +95 to +97
private_key_path = os.path.abspath("./slurm_private_key")
with open(private_key_path, "w") as f:
    f.write(default_client_key)

Async-unsafe file operations detected

Use Path from pathlib instead of os.path and avoid blocking open() in async functions. Consider using aiofiles.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-private_key_path = os.path.abspath("./slurm_private_key")
-with open(private_key_path, "w") as f:
-    f.write(default_client_key)
+from pathlib import Path
+import aiofiles
+private_key_path = Path("./slurm_private_key").resolve()
+async with aiofiles.open(private_key_path, "w") as f:
+    await f.write(default_client_key)

Code Review Run #2afb6d
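
If adding `aiofiles` as a dependency is not wanted, a standard-library alternative is to push the blocking write onto a worker thread. The sketch below swaps in `asyncio.to_thread` (Python 3.9+) for `aiofiles`; the function names are hypothetical, and creating the file with owner-only permissions is an added assumption about what a private key file should look like, not something this PR states.

```python
import asyncio
import os
from pathlib import Path


def _write_private_key(path: Path, key: str) -> None:
    """Blocking helper: create the file with owner-only permissions, then write."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(key)


async def write_private_key(path: Path, key: str) -> Path:
    """Run the blocking write in a worker thread so the event loop stays free."""
    await asyncio.to_thread(_write_private_key, path, key)
    return path.resolve()
```

From an async agent method this would be called as `await write_private_key(Path("./slurm_private_key"), default_client_key)`.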




microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>1.13.8", "flyteidl>=1.11.0b1", "asyncssh"]

Consider adding version constraints for asyncssh

Consider adding version constraints for asyncssh dependency to ensure compatibility and prevent potential breaking changes. For example: asyncssh>=2.0.0,<3.0.0

Code suggestion
Check the AI-generated fix before applying
Suggested change
-plugin_requires = ["flytekit>1.13.8", "flyteidl>=1.11.0b1", "asyncssh"]
+plugin_requires = ["flytekit>1.13.8", "flyteidl>=1.11.0b1", "asyncssh>=2.0.0,<3.0.0"]

Code Review Run #2afb6d



Signed-off-by: JiangJiaWei1103 <[email protected]>
Future-Outlier (Member) left a comment

function_0.py (screenshot)
function_1.py (screenshot)
function_2.py (screenshot)

JiangJiaWei1103 and others added 4 commits February 21, 2025 23:21
Signed-off-by: JiangJiaWei1103 <[email protected]>
Signed-off-by: JiangJiaWei1103 <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
flyte-bot (Contributor) commented Feb 21, 2025

Code Review Agent Run #0c867b

Actionable Suggestions - 3
  • plugins/flytekit-slurm/flytekitplugins/slurm/function/agent.py - 2
  • flytekit/extras/tasks/shell.py - 1
Review Details
  • Files reviewed - 6 · Commit Range: 31fc564..acfaa76
    • flytekit/extend/backend/base_agent.py
    • flytekit/extras/tasks/shell.py
    • plugins/flytekit-slurm/flytekitplugins/slurm/function/agent.py
    • plugins/flytekit-slurm/flytekitplugins/slurm/function/task.py
    • plugins/flytekit-slurm/flytekitplugins/slurm/ssh_utils.py
    • plugins/flytekit-slurm/setup.py
  • Files skipped - 1
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

"""Get an existing SSH connection or create a new one if needed.

Args:
    ssh_config (Dict[str, Union[str, List[str], Tuple[str, ...]]]): SSH configuration dictionary.

Consider more specific ssh_config type hints

Consider adding more specific type hints for the ssh_config dictionary keys. The current type hint Dict[str, Union[str, List[str], Tuple[str, ...]]] could be more precise by specifying the expected keys like host and username that are used in the code.

Code suggestion
Check the AI-generated fix before applying
 -from typing import Dict, List, Optional, Tuple, Union
 +from typing import Dict, List, Optional, Tuple, Union, TypedDict
 +
 +class SshConfig(TypedDict, total=False):
 +    host: str
 +    username: Optional[str]
 +    known_hosts: Optional[List[str]]
 +    client_keys: Optional[List[str]]

 @@ -112,2 +118,2 @@
 -        self, ssh_config: Dict[str, Union[str, List[str], Tuple[str, ...]]]
 +        self, ssh_config: SshConfig

Code Review Run #0c867b
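
To show how such a `TypedDict` reads at a call site, here is a simplified, self-contained variant of the suggestion. The `connection_key` helper is hypothetical and only illustrates key access; the `Optional[...]` wrappers are dropped here because `total=False` already makes every key optional. Note that `TypedDict` is checked by static type checkers only and is a plain `dict` at runtime.

```python
from typing import List, TypedDict


class SshConfig(TypedDict, total=False):
    """Keys mirror the suggestion above; total=False makes every key optional."""

    host: str
    username: str
    known_hosts: List[str]
    client_keys: List[str]


def connection_key(cfg: SshConfig) -> str:
    """Hypothetical helper: derive a cache key for reusing a connection."""
    user = cfg.get("username", "")
    return f"{user}@{cfg['host']}" if user else cfg["host"]
```

With this in place, a checker such as MyPy can flag a misspelled key like `cfg["hostname"]` at analysis time.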



entrypoint: str,
script: Optional[str] = None,
batch_script_path: str = "/tmp/task.slurm",
) -> Tuple[str, str]:

Consider adding descriptive type hints

Consider adding type hints for the returned tuple elements in the function signature to improve code clarity and type safety. The return type annotation could be more specific like Tuple[str, str] with descriptive aliases.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-) -> Tuple[str, str]:
+SbatchCommand = str
+ScriptContent = str
+) -> Tuple[SbatchCommand, ScriptContent]:

Code Review Run #0c867b
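
As rendered, the suggested change places the alias assignments inside the function signature, which is not valid Python. If aliases are wanted, they would need to sit at module level before the function. The sketch below shows that placement; `build_submission` and its body are hypothetical stand-ins, since only the tail of the real signature is visible here.

```python
from typing import Optional, Tuple

# Aliases live at module level, before the function that uses them.
SbatchCommand = str
ScriptContent = str


def build_submission(
    entrypoint: str,
    script: Optional[str] = None,
    batch_script_path: str = "/tmp/task.slurm",
) -> Tuple[SbatchCommand, ScriptContent]:
    """Hypothetical stand-in: return the sbatch command and the script body."""
    if script is None:
        script = f"#!/bin/bash -i\n{entrypoint}\n"
    return f"sbatch {batch_script_path}", script
```

Plain aliases like these document intent but are interchangeable with `str` for a type checker, so they do not catch swapped tuple elements.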



@@ -367,7 +367,7 @@
         return None
 
     def post_execute(self, user_params: ExecutionParameters, rval: typing.Any) -> typing.Any:
-        return self._config_task_instance.post_execute(user_params, rval)
+        return self._config_task_instance.pre_execute(user_params)

Incorrect execution lifecycle method call

The post_execute method appears to be incorrectly calling pre_execute instead of post_execute on the config task instance. This could lead to incorrect execution flow since pre and post execute serve different purposes.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-        return self._config_task_instance.pre_execute(user_params)
+        return self._config_task_instance.post_execute(user_params, rval)

Code Review Run #0c867b
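
A small regression check for this kind of mix-up could look like the sketch below. Both classes are hypothetical stand-ins rather than flytekit types; the point is only that each lifecycle hook on the wrapper forwards to the same-named hook on the wrapped instance.

```python
from typing import Any, List


class RecordingTask:
    """Hypothetical inner task that records which hooks were called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def pre_execute(self, user_params: Any) -> Any:
        self.calls.append("pre_execute")
        return user_params

    def post_execute(self, user_params: Any, rval: Any) -> Any:
        self.calls.append("post_execute")
        return rval


class DelegatingTask:
    """Wrapper forwarding each lifecycle hook to the same-named inner hook."""

    def __init__(self, inner: RecordingTask) -> None:
        self._config_task_instance = inner

    def pre_execute(self, user_params: Any) -> Any:
        return self._config_task_instance.pre_execute(user_params)

    def post_execute(self, user_params: Any, rval: Any) -> Any:
        return self._config_task_instance.post_execute(user_params, rval)
```

Calling `post_execute` on the wrapper and asserting that the inner task recorded exactly `["post_execute"]` would have caught the swapped call.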



Signed-off-by: JiangJiaWei1103 <[email protected]>
flyte-bot (Contributor)

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].

Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
flyte-bot (Contributor) commented Feb 22, 2025

Code Review Agent Run #57d709

Actionable Suggestions - 0
Additional Suggestions - 10
  • flytekit/remote/remote.py - 1
  • flytekit/extras/webhook/task.py - 1
  • plugins/flytekit-openai/flytekitplugins/openai/batch/agent.py - 1
    • Consider direct LiteralMap creation optimization · Line 108-108
  • flytekit/remote/entities.py - 1
    • Consider removing redundant flyte_node assignment · Line 359-359
  • plugins/flytekit-ray/flytekitplugins/ray/task.py - 1
    • Consider extracting head pod template logic · Line 107-122
  • tests/flytekit/unit/extras/webhook/test_end_to_end.py - 1
    • Use standard HTTP status constants · Line 51-51
  • flytekit/clis/sdk_in_container/serve.py - 1
    • Consider using sys.path.insert for precedence · Line 74-75
  • tests/flytekit/unit/core/test_references.py - 2
  • plugins/flytekit-aws-sagemaker/setup.py - 1
    • Consider less strict version constraint · Line 8-8
Review Details
  • Files reviewed - 32 · Commit Range: acfaa76..f0f7930
    • dev-requirements.in
    • flytekit/clis/sdk_in_container/serve.py
    • flytekit/core/reference_entity.py
    • flytekit/core/type_engine.py
    • flytekit/extend/backend/base_agent.py
    • flytekit/extras/tasks/shell.py
    • flytekit/extras/webhook/__init__.py
    • flytekit/extras/webhook/agent.py
    • flytekit/extras/webhook/constants.py
    • flytekit/extras/webhook/task.py
    • flytekit/remote/entities.py
    • flytekit/remote/remote.py
    • flytekit/utils/dict_formatter.py
    • plugins/flytekit-aws-sagemaker/flytekitplugins/awssagemaker_inference/boto3_agent.py
    • plugins/flytekit-aws-sagemaker/flytekitplugins/awssagemaker_inference/boto3_mixin.py
    • plugins/flytekit-aws-sagemaker/setup.py
    • plugins/flytekit-aws-sagemaker/tests/test_boto3_agent.py
    • plugins/flytekit-aws-sagemaker/tests/test_boto3_mixin.py
    • plugins/flytekit-greatexpectations/tests/test_schema.py
    • plugins/flytekit-openai/flytekitplugins/openai/batch/agent.py
    • plugins/flytekit-openai/tests/openai_batch/test_agent.py
    • plugins/flytekit-ray/flytekitplugins/ray/task.py
    • plugins/flytekit-ray/tests/test_ray.py
    • plugins/flytekit-slurm/flytekitplugins/slurm/function/agent.py
    • plugins/flytekit-slurm/tests/test_slurm_fn_task.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/deep_child_workflow.py
    • tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py
    • tests/flytekit/unit/core/test_references.py
    • tests/flytekit/unit/extras/webhook/test_agent.py
    • tests/flytekit/unit/extras/webhook/test_end_to_end.py
    • tests/flytekit/unit/extras/webhook/test_task.py
  • Files skipped - 3
    • .github/workflows/build_image.yml - Reason: Filter setting
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
    • .github/workflows/pythonpublish.yml - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


Signed-off-by: Future-Outlier <[email protected]>
flyte-bot (Contributor) commented Feb 22, 2025

Code Review Agent Run #179ed1

Actionable Suggestions - 0
Review Details
  • Files reviewed - 1 · Commit Range: f0f7930..d0d59d3
    • plugins/flytekit-slurm/flytekitplugins/slurm/ssh_utils.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful



4 participants