-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Slurm agent #3005
base: master
Are you sure you want to change the base?
Slurm agent #3005
Conversation
Signed-off-by: jiangjiawei1103 <[email protected]>
Signed-off-by: jiangjiawei1103 <[email protected]>
Signed-off-by: jiangjiawei1103 <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3005 +/- ##
==========================================
+ Coverage 76.85% 78.31% +1.45%
==========================================
Files 206 249 +43
Lines 21851 24062 +2211
Branches 2837 2839 +2
==========================================
+ Hits 16794 18844 +2050
- Misses 4269 4422 +153
- Partials 788 796 +8 ☔ View full report in Codecov by Sentry. |
Successfully submit and run the user-defined task as a normal python function on a remote Slurm cluster. 1. Inherit from PythonFunctionTask instead of PythonTask 2. Transfer the task module through sftp 3. Interact with amazon s3 bucket on both localhost and Slurm cluster Signed-off-by: JiaWei Jiang <[email protected]>
Specifying `--raw-output-data-prefix` option handles task_module download. Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run Status
|
Add `ssh_conf` filed to let users specify connection secret Note that reconnection is done in both `get` and `delete`. This is just a temporary workaround. Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run Status
|
For data scientists and MLEs developing flyte wf with Slurm agent, they don't actually need to know ssh connection details. We assume they only need to specify which Slurm cluster to use by hostname. Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run Status
|
1. Write user-defined batch script to a tmp file 2. Transfer the batch script through sftp 3. Construct sbatch command to run on Slurm cluster Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run Status
|
1. Remove SFTP for batch script transfer * Assume Slurm batch script is present on Slurm cluster 2. Support directly specifying a remote batch script path Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: pryce-turner <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Future-Outlier <[email protected]>
follow-up: provide output prefix in from flytekit.core import context_manager
from flytekit.core.data_persistence import FileAccessProvider
# output_prefix from task template
# This is for shell task write output to the output_prefix
ctx = FlyteContextManager.current_context()
builder = ctx.with_file_access(
FileAccessProvider(
local_sandbox_dir=ctx.file_access.local_sandbox_dir,
raw_output_prefix=request.output_prefix,
data_config=ctx.file_access.data_config,
)
)
with context_manager.FlyteContextManager.with_context(builder):
logger.info(f"{agent.name} start checking the status of the job")
res = await mirror_async_methods(agent.get, resource_meta=agent.metadata_type.decode(request.resource_meta))
resource = await res.to_flyte_idl()
return GetTaskResponse(resource=resource) |
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #469409Actionable Suggestions - 3
Review Details
|
# SSH connection pool for multi-host environment | ||
_conn: Optional[SSHClientConnection] = None | ||
|
||
ssh_config_to_ssh_conn: Dict[SSHConfig, SSHClientConnection] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a more thread-safe data structure like asyncio.Lock()
to protect the shared ssh_config_to_ssh_conn
dictionary in concurrent scenarios. The current implementation might lead to race conditions if multiple tasks try to modify the connection pool simultaneously.
Code suggestion
Check the AI-generated fix before applying
from typing import Any, Dict, Optional
+import asyncio
@@ -33,3 +33,4 @@
_conn: Optional[SSHClientConnection] = None
- ssh_config_to_ssh_conn: Dict[SSHConfig, SSHClientConnection] = {}
+ _lock: asyncio.Lock = asyncio.Lock()
+ ssh_config_to_ssh_conn: Dict[SSHConfig, SSHClientConnection] = {}
Code Review Run #469409
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
unique_script_name = f"/tmp/task_{uuid.uuid4().hex}.slurm" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a more secure temporary file location by leveraging Python's tempfile.mkdtemp()
instead of hardcoding /tmp/
. This would help avoid potential security risks in shared environments.
Code suggestion
Check the AI-generated fix before applying
unique_script_name = f"/tmp/task_{uuid.uuid4().hex}.slurm" | |
temp_dir = tempfile.mkdtemp(prefix="flyte-slurm-") | |
unique_script_name = os.path.join(temp_dir, f"task_{uuid.uuid4().hex}.slurm") | |
Code Review Run #469409
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
function will be executed directly. | ||
""" | ||
|
||
ssh_config: Dict[str, Union[str, list[str]]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding validation for the ssh_config
dictionary values since they now have a more specific type annotation of Union[str, list[str]]
. This could help catch type mismatches early.
Code suggestion
Check the AI-generated fix before applying
@@ -33,4 +33,8 @@
def __post_init__(self):
# TODO: assert ssh_config["host"] is not None
+ for key, value in self.ssh_config.items():
+ if not isinstance(value, (str, list)) or (isinstance(value, list) and not all(isinstance(x, str) for x in value)):
+ raise ValueError(f"ssh_config value for {key} must be either str or list[str]")
+
if self.sbatch_conf is None:
self.sbatch_conf = {}
Code Review Run #469409
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #7a5ec3Actionable Suggestions - 0Review Details
|
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #ab6b57Actionable Suggestions - 4
Review Details
|
@dataclass(frozen=True) | ||
class SSHConfig: | ||
"""A customized version of SSHClientConnectionOptions, tailored to specific needs. | ||
|
||
This config is based on the official SSHClientConnectionOptions but includes | ||
only a subset of options, with some fields adjusted to be optional or required. | ||
For the official options, please refer to: | ||
https://asyncssh.readthedocs.io/en/latest/api.html#asyncssh.SSHClientConnectionOptions | ||
|
||
Args: | ||
host: The hostname or address to connect to. | ||
username: The username to authenticate as on the server. | ||
client_keys: File paths to private keys which will be used to authenticate the | ||
client via public key authentication. The default value is not None since | ||
client public key authentication is mandatory. | ||
known_hosts: The list of keys which will be used to validate the server host key | ||
presented during the SSH handshake. If this is not specified, the keys will | ||
be looked up in the file .ssh/known_hosts. If this is explicitly set to None, | ||
server host key validation will be disabled. | ||
""" | ||
|
||
host: str | ||
username: str | ||
client_keys: Union[str, List[str], Tuple[str, ...]] = () | ||
known_hosts: Optional[KnownHostsArg] = None | ||
|
||
@classmethod | ||
def from_dict(cls: Type[T], ssh_config: Dict[str, Any]) -> T: | ||
return cls(**ssh_config) | ||
|
||
def to_dict(self) -> Dict[str, Any]: | ||
return asdict(self) | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, SSHConfig): | ||
return False | ||
return ( | ||
self.host == other.host and | ||
self.username == other.username and | ||
self.client_keys == other.client_keys and | ||
self.known_hosts == other.known_hosts | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should remove this part, right?
cc @JiangJiaWei1103
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we can use post_init
to assert the value
host: str | ||
username: Optional[str] = None | ||
|
||
def __hash__(self): | ||
return hash((self.host, self.username)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider implementing __eq__
method along with __hash__
since SlurmCluster
is used as a dictionary key in ssh_config_to_ssh_conn
. Objects that are equal should have the same hash value for dictionary operations to work correctly.
Code suggestion
Check the AI-generated fix before applying
host: str | |
username: Optional[str] = None | |
def __hash__(self): | |
return hash((self.host, self.username)) | |
host: str | |
username: Optional[str] = None | |
def __eq__(self, other): | |
return isinstance(other, SlurmCluster) and (self.host, self.username) == (other.host, other.username) | |
def __hash__(self): | |
return hash((self.host, self.username)) |
Code Review Run #ab6b57
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
with tempfile.NamedTemporaryFile("w") as f: | ||
f.write(script) | ||
f.flush() | ||
async with conn.start_sftp_client() as sftp: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider catching specific exceptions instead of using a broad except Exception
clause.
Code suggestion
Check the AI-generated fix before applying
async with conn.start_sftp_client() as sftp: | |
except (ConnectionError, TimeoutError, OSError) as e: |
Code Review Run #ab6b57
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
return Resource(phase=cur_phase, message=msg) | ||
|
||
async def delete(self, resource_meta: SlurmJobMetadata, **kwargs) -> None: | ||
conn = await self._get_or_create_ssh_connection(resource_meta.ssh_config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling around the SSH connection management. The _get_or_create_ssh_connection
method could fail if the SSH connection cannot be established or if there are network issues. Consider wrapping the call with a try-catch block and implementing appropriate error handling.
Code suggestion
Check the AI-generated fix before applying
- conn = await self._get_or_create_ssh_connection(resource_meta.ssh_config)
- _ = await conn.run(f"scancel {resource_meta.job_id}", check=True)
+ try:
+ conn = await self._get_or_create_ssh_connection(resource_meta.ssh_config)
+ _ = await conn.run(f"scancel {resource_meta.job_id}", check=True)
+ except Exception as e:
+ logger.error(f"Failed to establish SSH connection: {e}")
+ raise
Code Review Run #ab6b57
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
def __eq__(self, other): | ||
if not isinstance(other, SSHConfig): | ||
return False | ||
return ( | ||
self.host == other.host and | ||
self.username == other.username and | ||
self.client_keys == other.client_keys and | ||
self.known_hosts == other.known_hosts | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a __hash__
method when implementing __eq__
to maintain the object's hash contract. Objects that are equal should have the same hash value.
Code suggestion
Check the AI-generated fix before applying
def __eq__(self, other): | |
if not isinstance(other, SSHConfig): | |
return False | |
return ( | |
self.host == other.host and | |
self.username == other.username and | |
self.client_keys == other.client_keys and | |
self.known_hosts == other.known_hosts | |
) | |
def __eq__(self, other): | |
if not isinstance(other, SSHConfig): | |
return False | |
return ( | |
self.host == other.host and | |
self.username == other.username and | |
self.client_keys == other.client_keys and | |
self.known_hosts == other.known_hosts | |
) | |
def __hash__(self): | |
return hash((self.host, self.username, self.client_keys, self.known_hosts)) |
Code Review Run #ab6b57
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Signed-off-by: Future-Outlier <[email protected]>
1. Ensure `"host"` must be provided in `__post_init__` 2. Explicitly set `known_hosts` to `None` 3. Make `username` optional 4. Remove legacy code snippets 5. Make docstring clear Signed-off-by: JiangJiaWei1103 <[email protected]>
Code Review Agent Run #25f976Actionable Suggestions - 6
Review Details
|
else: | ||
conn = self.ssh_config_to_ssh_conn[ssh_cluster_config] | ||
try: | ||
await conn.run("echo [TEST] SSH connection", check=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a timeout parameter to the SSH connection test command to prevent potential hanging. The echo
test command could be enhanced with a timeout like timeout 5s echo [TEST] SSH connection
.
Code suggestion
Check the AI-generated fix before applying
await conn.run("echo [TEST] SSH connection", check=True) | |
await conn.run("timeout 5s echo [TEST] SSH connection", check=True) |
Code Review Run #25f976
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
try: | ||
await conn.run("echo [TEST] SSH connection", check=True) | ||
logger.info("re-using new connection") | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider catching specific exceptions instead of using a broad Exception
catch. This helps in better error handling and debugging.
Code suggestion
Check the AI-generated fix before applying
except Exception as e: | |
except (ConnectionError, asyncssh.SSHError) as e: |
Code Review Run #25f976
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
""" | ||
|
||
host: str | ||
username: Optional[str] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding validation for username
when it's None
. The username
parameter is now optional but there's no clear indication of how None
values are handled in the SSH connection logic.
Code suggestion
Check the AI-generated fix before applying
@@ -63,2 +63,6 @@
# Validate ssh_config
ssh_config = SSHConfig.from_dict(ssh_config).to_dict()
+ # Ensure username is set
+ if ssh_config['username'] is None:
+ ssh_config['username'] = os.getenv('USER')
+
Code Review Run #25f976
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
def __eq__(self, other): | ||
if not isinstance(other, SSHConfig): | ||
return False | ||
return self.host == other.host and self.username == other.username and self.client_keys == other.client_keys |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider including known_hosts
comparison in the __eq__
method for complete equality check between SSHConfig
objects. The current implementation may lead to unexpected behavior when two objects with different known_hosts
values are considered equal.
Code suggestion
Check the AI-generated fix before applying
return self.host == other.host and self.username == other.username and self.client_keys == other.client_keys | |
return self.host == other.host and self.username == other.username and self.client_keys == other.client_keys and self.known_hosts == other.known_hosts |
Code Review Run #25f976
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
""" | ||
# Validate ssh_config | ||
ssh_config = SSHConfig.from_dict(ssh_config).to_dict() | ||
ssh_config["known_hosts"] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting known_hosts
to None
disables SSH host key verification, which could expose the connection to man-in-the-middle attacks. Consider using proper host key verification by either providing a known_hosts file or documenting why this security measure is disabled.
Code suggestion
Check the AI-generated fix before applying
ssh_config["known_hosts"] = None | |
ssh_config["known_hosts"] = os.path.expanduser("~/.ssh/known_hosts") |
Code Review Run #25f976
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
script: Optional[str] = None | ||
|
||
def __post_init__(self): | ||
assert self.ssh_config["host"] is not None, "'host' must be specified in ssh_config." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider replacing assert
statement with a proper validation check. Using assert
statements in production code is not recommended as they can be disabled with Python's -O flag.
Code suggestion
Check the AI-generated fix before applying
assert self.ssh_config["host"] is not None, "'host' must be specified in ssh_config." | |
if self.ssh_config["host"] is None: | |
raise ValueError("'host' must be specified in ssh_config.") |
Code Review Run #25f976
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Tracking issue
flyteorg/flyte#5634
Why are the changes needed?
What changes were proposed in this pull request?
Implement the Slurm agent, which submits the user-defined flytekit task to a remote Slurm cluster to run. Following describe three core methods:
create
: Submit a Slurm job withsbatch
to run a batch script on Slurm clusterget
: Check the Slurm job statedelete
(haven't been tested): Cancel the Slurm jobHow was this patch tested?
We test
create
andget
in the development environment described as follows:flytekit
installedslurmctld
andslurmd
runningasyncssh
Suppose we have a batch script to run on Slurm cluster:
We use the following python script to test Slurm agent on the client side:
The test result is shown as follows:

Setup process
As stated above
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
Implementation of a Slurm agent for Flytekit with enhanced SSH connection handling, featuring connection pooling, caching, and auto-reconnection. The agent implements robust authentication and shared memory support through asyncssh, with improved thread safety and SSH configuration validation. The changes introduce proper error handling, interactive bash script execution, and a SlurmCluster dataclass with centralized connection management. Enhanced logging system replaces print statements with proper logger calls.Unit tests added: True
Estimated effort to review (1-5, lower is better): 5