:rocket: add RayRunLauncher
danielgafni committed Oct 8, 2024
1 parent 8696cff commit c3240f0
Showing 21 changed files with 654 additions and 47 deletions.
79 changes: 68 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

`dagster-ray` allows creating Ray clusters and running distributed computations from Dagster code. Features include:

- `ray_executor` - an Executor which runs Dagster steps as jobs submitted to a Ray cluster.
- `RayRunLauncher` - a RunLauncher which runs Dagster runs as Ray jobs (cluster mode) submitted to an existing Ray cluster.

- `PipesRayJobClient`, a [Dagster Pipes](https://docs.dagster.io/concepts/dagster-pipes) client for launching and monitoring `RayJob` resources in Kubernetes via [KubeRay](https://github.com/ray-project/kuberay). Most suitable for submitting long-running jobs (via external Python scripts) with no direct Ray access from Dagster code. Allows receiving rich logs, events and metadata from the job. Implemented for the `KubeRay` backend.
- `ray_executor` - an Executor which runs individual Dagster steps as separate Ray jobs (cluster mode) submitted to an existing Ray cluster.

- `PipesKubeRayJobClient`, a [Dagster Pipes](https://docs.dagster.io/concepts/dagster-pipes) client for launching and monitoring `RayJob` resources in Kubernetes via [KubeRay](https://github.com/ray-project/kuberay). Executes external Python scripts. Allows receiving rich logs, events and metadata from the job.

- `RayResource`, a resource representing a Ray cluster. Interactions are performed in client mode (requires a stable persistent connection), so it's most suitable for relatively short jobs. Provides direct Ray access from the Dagster Python process. It has implementations for `KubeRay` and local (mostly for testing) backends. `dagster_ray.RayResource` defines the common interface shared by all backends and can be used for backend-agnostic type annotations.

Expand All @@ -29,6 +31,23 @@ Documentation can be found below.
> [!NOTE]
> This project is in early development. APIs are unstable and can change at any time. Contributions are very welcome! See the [Development](#development) section below.
# Feature Matrix

There are different options available for running Dagster code on Ray. The following table summarizes the features of each option:

| Feature | `RayRunLauncher` | `ray_executor` | `PipesKubeRayJobClient` | `KubeRayCluster` |
| --- | --- | --- | --- | --- |
| Creates Ray cluster |||||
| Executes in cluster mode |||||
| For long-running jobs |||||
| Enabled per-asset |||||
| Configurable per-asset |||||
| Automatically runs asset/op body |||||

# Examples

See the [examples](examples) directory.

# Installation

```shell
Expand All @@ -41,6 +60,42 @@ To install with extra dependencies for a particular backend (like `kuberay`), ru
pip install 'dagster-ray[kuberay]'
```

# RunLauncher

> [!WARNING]
> The `RayRunLauncher` is a work in progress

The `RayRunLauncher` can be configured via `dagster.yaml`:

```yaml
run_launcher:
  module: dagster_ray
  class: RayRunLauncher
  config:
    num_cpus: 1
    num_gpus: 0
```

Individual Runs can override Ray configuration:

```python
from dagster import job


@job(
    tags={
        "dagster-ray/config": {
            "num_cpus": 16,
            "num_gpus": 1,
        }
    }
)
def my_job():
    return my_op()
```
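
The override behaviour can be pictured roughly as follows. This is a minimal sketch rather than the launcher's actual code: `resolve_ray_config` is a hypothetical helper, the value of `USER_DEFINED_RAY_KEY` is assumed to match the `dagster-ray/config` tag key shown above, and the tag value is assumed to reach the launcher as a JSON string (which is what the `parse_raw` call in `RayExecutionConfig.from_tags` suggests).

```python
import json
from typing import Any, Dict, Mapping

# Assumption: this mirrors the tag key used in the README example above.
USER_DEFINED_RAY_KEY = "dagster-ray/config"


def resolve_ray_config(
    defaults: Mapping[str, Any], tags: Mapping[str, str]
) -> Dict[str, Any]:
    """Hypothetical helper: overlay per-run tag overrides on launcher defaults."""
    resolved = dict(defaults)
    raw = tags.get(USER_DEFINED_RAY_KEY)
    if raw is not None:
        # Assumption: the tag value is a JSON-encoded mapping.
        resolved.update(json.loads(raw))
    return resolved


# Defaults as configured in dagster.yaml, overrides as set on the job.
launcher_defaults = {"num_cpus": 1, "num_gpus": 0}
run_tags = {USER_DEFINED_RAY_KEY: json.dumps({"num_cpus": 16, "num_gpus": 1})}

resolved = resolve_ray_config(launcher_defaults, run_tags)
untagged = resolve_ray_config(launcher_defaults, {})
```

A run without the tag simply keeps the launcher-level values, so only jobs that need more resources have to declare them.
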
# Executor
> [!WARNING]
Expand Down Expand Up @@ -86,27 +141,29 @@ This backend requires a Kubernetes cluster with `KubeRay Operator` installed.
Integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and Kubernetes labels.

To run `ray` code in client mode (from the Dagster Python process directly), use the `KubeRayCluster` resource (see the [KubeRayCluster](#KubeRayCluster) section).
To run `ray` code in job mode, use the `PipesRayJobClient` with Dagster Pipes (see the [Pipes](#pipes) section).
To run `ray` code in job mode, use the `PipesKubeRayJobClient` with Dagster Pipes (see the [Pipes](#pipes) section).

The public objects can be imported from `dagster_ray.kuberay` module.

### Pipes

`dagster-ray` provides the `PipesRayJobClient` which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.
`dagster-ray` provides the `PipesKubeRayJobClient` which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.
[RayJob](https://docs.ray.io/en/latest/cluster/kubernetes/getting-started/rayjob-quick-start.html) will manage the lifecycle of the underlying `RayCluster`, which will be cleaned up after the specified entrypoint exits. Doesn't require a persistent connection to the Ray cluster.

Examples:

In Dagster code, import `PipesRayJobClient` and invoke it inside an `@op` or an `@asset`:
In Dagster code, import `PipesKubeRayJobClient` and invoke it inside an `@op` or an `@asset`:

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_ray.kuberay import PipesRayJobClient
from dagster_ray.kuberay import PipesKubeRayJobClient
@asset
def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobClient):
def my_asset(
    context: AssetExecutionContext, pipes_rayjob_client: PipesKubeRayJobClient
):
    pipes_rayjob_client.run(
        context=context,
        ray_job={
Expand All @@ -130,7 +187,7 @@ def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobCli


definitions = Definitions(
    resources={"pipes_rayjob_client": PipesRayJobClient()}, assets=[my_asset]
    resources={"pipes_rayjob_client": PipesKubeRayJobClient()}, assets=[my_asset]
)
```

Expand All @@ -157,16 +214,16 @@ import yaml
ray_job = {"spec": {"runtimeEnvYaml": yaml.safe_dump({"pip": ["dagster-pipes"]})}}
```

Events emitted by the Ray job will be captured by `PipesRayJobClient` and will become available in the Dagster event log. Standard output and standard error streams will be forwarded to the standard output of the Dagster process.
Events emitted by the Ray job will be captured by `PipesKubeRayJobClient` and will become available in the Dagster event log. Standard output and standard error streams will be forwarded to the standard output of the Dagster process.

**Running locally**

When running locally, the `port_forward` option has to be set to `True` in the `PipesRayJobClient` resource in order to interact with the Ray job. For convenience, it can be set automatically with:
When running locally, the `port_forward` option has to be set to `True` in the `PipesKubeRayJobClient` resource in order to interact with the Ray job. For convenience, it can be set automatically with:

```python
from dagster_ray.kuberay.configs import in_k8s

pipes_rayjob_client = PipesRayJobClient(..., port_forward=not in_k8s)
pipes_rayjob_client = PipesKubeRayJobClient(..., port_forward=not in_k8s)
```
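
For intuition, an in-cluster check of this kind could look like the sketch below. It is an illustration only: `detect_in_k8s` is a hypothetical function, and the assumption is that the check relies on the `KUBERNETES_SERVICE_HOST` environment variable, which Kubernetes injects into every pod. The actual `in_k8s` value in `dagster_ray.kuberay.configs` may be computed differently.

```python
import os
from typing import Mapping


def detect_in_k8s(environ: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical check: True when the process appears to run inside a pod."""
    # Kubernetes sets KUBERNETES_SERVICE_HOST for containers it starts.
    return "KUBERNETES_SERVICE_HOST" in environ


# Inside a pod, no port forwarding is needed; on a laptop it is.
port_forward_in_pod = not detect_in_k8s({"KUBERNETES_SERVICE_HOST": "10.0.0.1"})
port_forward_on_laptop = not detect_in_k8s({})
```

Passing the environment as an argument keeps the check easy to exercise in tests without touching the real process environment.
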

### Resources
Expand Down
3 changes: 2 additions & 1 deletion dagster_ray/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dagster_ray._base.resources import BaseRayResource
from dagster_ray.executor import ray_executor
from dagster_ray.run_launcher import RayRunLauncher

RayResource = BaseRayResource


__all__ = ["RayResource", "ray_executor"]
__all__ = ["RayResource", "RayRunLauncher", "ray_executor"]
10 changes: 5 additions & 5 deletions dagster_ray/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any, Dict, Optional
from typing import Any, Dict, Mapping, Optional

from dagster import Config
from pydantic import Field
Expand All @@ -16,27 +16,27 @@ class RayExecutionConfig(Config):
    resources: Optional[Dict[str, float]] = Field(default=None, description="Custom resources to allocate.")

    @classmethod
    def from_tags(cls, tags: Dict[str, str]) -> RayExecutionConfig:
    def from_tags(cls, tags: Mapping[str, str]) -> RayExecutionConfig:
        if USER_DEFINED_RAY_KEY in tags:
            return cls.parse_raw(tags[USER_DEFINED_RAY_KEY])
        else:
            return cls()


class RayJobSubmissionClientConfig(Config):
    address: str = Field(..., description="The address of the Ray cluster to connect to.")
    address: str = Field(default=None, description="The address of the Ray cluster to connect to.")
    metadata: Optional[Dict[str, Any]] = Field(
        default=None,
        description="""Arbitrary metadata to store along with all jobs. New metadata
        specified per job will be merged with the global metadata provided here
        via a simple dict update.""",
    )
    headers: Optional[Dict[str, str]] = Field(
    headers: Optional[Dict[str, Any]] = Field(
        default=None,
        description="""Headers to use when sending requests to the HTTP job server, used
        for cases like authentication to a remote cluster.""",
    )
    cookies: Optional[Dict[str, str]] = Field(
    cookies: Optional[Dict[str, Any]] = Field(
        default=None, description="Cookies to use when sending requests to the HTTP job server."
    )

Expand Down
62 changes: 37 additions & 25 deletions dagster_ray/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@
    StepHandlerContext,
)
from dagster._utils.merger import merge_dicts
from dagster_k8s.job import (
    get_k8s_job_name,
)
from dagster_k8s.launcher import K8sRunLauncher
from pydantic import Field

from dagster_ray.config import RayExecutionConfig, RayJobSubmissionClientConfig
from dagster_ray.kuberay.resources import get_k8s_object_name
from dagster_ray.run_launcher import RayRunLauncher

if TYPE_CHECKING:
    pass
    from ray.job_submission import JobSubmissionClient


class RayExecutorConfig(RayExecutionConfig, RayJobSubmissionClientConfig): ...
class RayExecutorConfig(RayExecutionConfig, RayJobSubmissionClientConfig):
    address: Optional[str] = Field(default=None, description="The address of the Ray cluster to connect to.")  # type: ignore
    # sorry for the long name, but it has to be very clear what this is doing
    inherit_job_submission_client_from_ray_run_launcher: bool = True


_RAY_CONFIG_SCHEMA = RayExecutorConfig.to_config_schema().as_field()
Expand All @@ -51,18 +53,33 @@ def ray_executor(init_context: InitExecutorContext) -> Executor:
"""Executes steps by submitting them as Ray jobs.
The steps are started inside the Ray cluster directly.
When used together with the `RayRunLauncher`, the executor can inherit the job submission client configuration.
This behavior can be disabled by setting `inherit_job_submission_client_from_ray_run_launcher` to `False`.
"""
    # TODO: some RunLauncher config values can be automatically passed to the executor
    run_launcher = (  # noqa
        init_context.instance.run_launcher if isinstance(init_context.instance.run_launcher, K8sRunLauncher) else None
    )
    from ray.job_submission import JobSubmissionClient

    exc_cfg = init_context.executor_config

    ray_cfg = RayExecutorConfig(**exc_cfg["ray"])  # type: ignore

    if ray_cfg.inherit_job_submission_client_from_ray_run_launcher and isinstance(
        init_context.instance.run_launcher, RayRunLauncher
    ):
        # TODO: some RunLauncher config values can be automatically passed to the executor
        client = init_context.instance.run_launcher.client
    else:
        client = JobSubmissionClient(
            ray_cfg.address, metadata=ray_cfg.metadata, headers=ray_cfg.headers, cookies=ray_cfg.cookies
        )

    return StepDelegatingExecutor(
        RayStepHandler(address=ray_cfg.address, runtime_env=ray_cfg.runtime_env),
        RayStepHandler(
            client=client,
            runtime_env=ray_cfg.runtime_env,
            num_cpus=ray_cfg.num_cpus,
            num_gpus=ray_cfg.num_gpus,
            memory=ray_cfg.memory,
            resources=ray_cfg.resources,
        ),
        retries=RetryMode.from_config(exc_cfg["retries"]),  # type: ignore
        max_concurrent=check.opt_int_elem(exc_cfg, "max_concurrent"),
        tag_concurrency_limits=check.opt_list_elem(exc_cfg, "tag_concurrency_limits"),
Expand All @@ -77,21 +94,16 @@ def name(self):

    def __init__(
        self,
        address: str,
        runtime_env: Optional[Dict[str, Any]] = None,
        num_cpus: Optional[int] = None,
        num_gpus: Optional[int] = None,
        memory: Optional[int] = None,
        resources: Optional[Dict[str, float]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        cookies: Optional[Dict[str, str]] = None,
        client: "JobSubmissionClient",
        runtime_env: Optional[Dict[str, Any]],
        num_cpus: Optional[int],
        num_gpus: Optional[int],
        memory: Optional[int],
        resources: Optional[Dict[str, float]],
    ):
        super().__init__()

        from ray.job_submission import JobSubmissionClient

        self.client = JobSubmissionClient(address, metadata=metadata, headers=headers, cookies=cookies)
        self.client = client
        self.runtime_env = runtime_env or {}
        self.num_cpus = num_cpus
        self.num_gpus = num_gpus
Expand All @@ -106,7 +118,7 @@ def _get_step_key(self, step_handler_context: StepHandlerContext) -> str:
    def _get_ray_job_submission_id(self, step_handler_context: StepHandlerContext):
        step_key = self._get_step_key(step_handler_context)

        name_key = get_k8s_job_name(
        name_key = get_k8s_object_name(
            step_handler_context.execute_step_args.run_id,
            step_key,
        )
Expand Down
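
The client-selection rule introduced in `ray_executor` above can be exercised in isolation. The sketch below uses stand-in classes so that it runs without Ray or Dagster installed: `FakeClient`, `FakeRayRunLauncher` and `select_client` are hypothetical names for illustration, not part of `dagster-ray`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeClient:
    """Stand-in for ray.job_submission.JobSubmissionClient (illustration only)."""

    address: Optional[str]


@dataclass
class FakeRayRunLauncher:
    """Stand-in for RayRunLauncher; only the `client` attribute is modelled."""

    client: FakeClient


def select_client(run_launcher: object, inherit: bool, address: Optional[str]) -> FakeClient:
    # Reuse the launcher's client only when inheritance is enabled AND the
    # instance's run launcher is the Ray one; otherwise build a fresh client.
    if inherit and isinstance(run_launcher, FakeRayRunLauncher):
        return run_launcher.client
    return FakeClient(address)


launcher = FakeRayRunLauncher(client=FakeClient("http://launcher:8265"))

inherited = select_client(launcher, inherit=True, address=None)
standalone = select_client(launcher, inherit=False, address="http://other:8265")
fallback = select_client(object(), inherit=True, address="http://other:8265")
```

Under this rule the executor's own `address` only matters when inheritance is disabled or a different run launcher is configured, which is consistent with `address` becoming optional on `RayExecutorConfig`.
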
2 changes: 1 addition & 1 deletion dagster_ray/kuberay/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


@experimental
class PipesRayJobClient(PipesClient, TreatAsResourceParam):
class PipesKubeRayJobClient(PipesClient, TreatAsResourceParam):
"""A pipes client for running ``RayJob`` on Kubernetes.
Args:
Expand Down