-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Slurm agent fn task #3150
Slurm agent fn task #3150
Conversation
Signed-off-by: jiangjiawei1103 <[email protected]>
Signed-off-by: jiangjiawei1103 <[email protected]>
Signed-off-by: jiangjiawei1103 <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Successfully submit and run the user-defined task as a normal python function on a remote Slurm cluster. 1. Inherit from PythonFunctionTask instead of PythonTask 2. Transfer the task module through sftp 3. Interact with amazon s3 bucket on both localhost and Slurm cluster Signed-off-by: JiaWei Jiang <[email protected]>
Specifying `--raw-output-data-prefix` option handles task_module download. Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Add `ssh_conf` filed to let users specify connection secret Note that reconnection is done in both `get` and `delete`. This is just a temporary workaround. Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
For data scientists and MLEs developing flyte wf with Slurm agent, they don't actually need to know ssh connection details. We assume they only need to specify which Slurm cluster to use by hostname. Signed-off-by: JiaWei Jiang <[email protected]>
1. Write user-defined batch script to a tmp file 2. Transfer the batch script through sftp 3. Construct sbatch command to run on Slurm cluster Signed-off-by: JiaWei Jiang <[email protected]>
1. Remove SFTP for batch script transfer * Assume Slurm batch script is present on Slurm cluster 2. Support directly specifying a remote batch script path Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: pryce-turner <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
`SlurmTask` and `SlurmShellTask` now share the same agent. Signed-off-by: JiaWei Jiang <[email protected]>
job_state = "running" | ||
for o in job_res.stdout.split(" "): | ||
if "JobState" in o: | ||
job_state = o.split("=")[1].strip().lower() | ||
elif "StdOut" in o: | ||
stdout_path = o.split("=")[1].strip() | ||
msg_res = await conn.run(f"cat {stdout_path}", check=True) | ||
msg = msg_res.stdout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling for the case when stdout_path
is not found in the job output. Currently, the code assumes msg
will always be initialized which could lead to potential UnboundLocalError
if StdOut
is not found in the job output.
Code suggestion
Check the AI-generated fix before applying
job_state = "running" | |
for o in job_res.stdout.split(" "): | |
if "JobState" in o: | |
job_state = o.split("=")[1].strip().lower() | |
elif "StdOut" in o: | |
stdout_path = o.split("=")[1].strip() | |
msg_res = await conn.run(f"cat {stdout_path}", check=True) | |
msg = msg_res.stdout | |
job_state = "running" | |
msg = "No stdout available" | |
for o in job_res.stdout.split(" "): | |
if "JobState" in o: | |
job_state = o.split("=")[1].strip().lower() | |
elif "StdOut" in o: | |
stdout_path = o.split("=")[1].strip() | |
msg_res = await conn.run(f"cat {stdout_path}", check=True) | |
msg = msg_res.stdout |
Code Review Run #2afb6d
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
script = f"""#!/bin/bash -i | ||
{entrypoint} | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a more robust shebang line #!/usr/bin/env bash
instead of #!/bin/bash -i
as it provides better portability across different systems where bash may not be in the standard location. Additionally, the -i
flag for interactive mode may not be necessary for a batch script.
Code suggestion
Check the AI-generated fix before applying
script = f"""#!/bin/bash -i | |
{entrypoint} | |
""" | |
script = f"""#!/usr/bin/env bash | |
{entrypoint} | |
""" |
Code Review Run #2afb6d
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
private_key_path = os.path.abspath("./slurm_private_key") | ||
with open(private_key_path, "w") as f: | ||
f.write(default_client_key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Path
from pathlib
instead of os.path
and avoid blocking open()
in async functions. Consider using aiofiles
.
Code suggestion
Check the AI-generated fix before applying
private_key_path = os.path.abspath("./slurm_private_key") | |
with open(private_key_path, "w") as f: | |
f.write(default_client_key) | |
from pathlib import Path | |
import aiofiles | |
private_key_path = Path("./slurm_private_key").resolve() | |
async with aiofiles.open(private_key_path, "w") as f: | |
await f.write(default_client_key) |
Code Review Run #2afb6d
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
plugins/flytekit-slurm/setup.py
Outdated
|
||
microlib_name = f"flytekitplugins-{PLUGIN_NAME}" | ||
|
||
plugin_requires = ["flytekit>1.13.8", "flyteidl>=1.11.0b1", "asyncssh"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding version constraints for asyncssh
dependency to ensure compatibility and prevent potential breaking changes. For example: asyncssh>=2.0.0,<3.0.0
Code suggestion
Check the AI-generated fix before applying
plugin_requires = ["flytekit>1.13.8", "flyteidl>=1.11.0b1", "asyncssh"] | |
plugin_requires = ["flytekit>1.13.8", "flyteidl>=1.11.0b1", "asyncssh>=2.0.0,<3.0.0"] |
Code Review Run #2afb6d
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Signed-off-by: JiangJiaWei1103 <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: JiangJiaWei1103 <[email protected]>
Signed-off-by: JiangJiaWei1103 <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #0c867bActionable Suggestions - 3
Review Details
|
"""Get an existing SSH connection or create a new one if needed. | ||
|
||
Args: | ||
ssh_config (Dict[str, Union[str, List[str], Tuple[str, ...]]]): SSH configuration dictionary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding more specific type hints for the ssh_config
dictionary keys. The current type hint Dict[str, Union[str, List[str], Tuple[str, ...]]]
could be more precise by specifying the expected keys like host
and username
that are used in the code.
Code suggestion
Check the AI-generated fix before applying
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Dict, List, Optional, Tuple, Union, TypedDict
+
+class SshConfig(TypedDict, total=False):
+ host: str
+ username: Optional[str]
+ known_hosts: Optional[List[str]]
+ client_keys: Optional[List[str]]
@@ -112,2 +118,2 @@
- self, ssh_config: Dict[str, Union[str, List[str], Tuple[str, ...]]]
+ self, ssh_config: SshConfig
Code Review Run #0c867b
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
entrypoint: str, | ||
script: Optional[str] = None, | ||
batch_script_path: str = "/tmp/task.slurm", | ||
) -> Tuple[str, str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding type hints for the returned tuple elements in the function signature to improve code clarity and type safety. The return type annotation could be more specific like Tuple[str, str]
with descriptive aliases.
Code suggestion
Check the AI-generated fix before applying
) -> Tuple[str, str]: | |
SbatchCommand = str | |
ScriptContent = str | |
) -> Tuple[SbatchCommand, ScriptContent]: |
Code Review Run #0c867b
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
flytekit/extras/tasks/shell.py
Outdated
@@ -367,7 +367,7 @@ | |||
return None | |||
|
|||
def post_execute(self, user_params: ExecutionParameters, rval: typing.Any) -> typing.Any: | |||
return self._config_task_instance.post_execute(user_params, rval) | |||
return self._config_task_instance.pre_execute(user_params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The post_execute
method appears to be incorrectly calling pre_execute
instead of post_execute
on the config task instance. This could lead to incorrect execution flow since pre and post execute serve different purposes.
Code suggestion
Check the AI-generated fix before applying
return self._config_task_instance.pre_execute(user_params) | |
return self._config_task_instance.post_execute(user_params, rval) |
Code Review Run #0c867b
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Signed-off-by: JiangJiaWei1103 <[email protected]>
6227801
to
46b7b62
Compare
Code Review Agent Run Status
|
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #57d709Actionable Suggestions - 0Additional Suggestions - 10
Review Details
|
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #179ed1Actionable Suggestions - 0Review Details
|
Tracking issue
Why are the changes needed?
What changes were proposed in this pull request?
How was this patch tested?
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This PR implements multiple enhancements including Slurm function task plugin for cluster execution and WebhookTask feature for HTTP requests within workflows. The implementation includes improved SSH connection handling to resolve host key verification issues and prevent 'asyncssh.misc.HostKeyNotVerifiable' errors. Changes involve streamlined shell task execution logic, enhanced base agent framework, and improved robustness when connecting to new cluster nodes. The updates include configuration handling improvements and compatibility updates for newer versions of flytekit, flyteidl, and Python 3.12.Unit tests added: True
Estimated effort to review (1-5, lower is better): 5