Skip to content

Commit

Permalink
✨ add PipesRayJobClient
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Oct 18, 2024
1 parent 1346beb commit 213e944
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 104 deletions.
228 changes: 140 additions & 88 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

- `RayIOManager` - an `IOManager` which allows storing and retrieving intermediate values in Ray's object store. Ideal in conjunction with `RayRunLauncher` and `ray_executor`.

- `PipesKubeRayJobClient`, a [Dagster Pipes](https://docs.dagster.io/concepts/dagster-pipes) client for launching and monitoring [KubeRay](https://github.com/ray-project/kuberay)'s `RayJob` CR in Kubernetes. Typically used with external Pythons scripts. Allows receiving rich logs, events and metadata from the job.
- `PipesRayjobClient`, a [Dagster Pipes](https://docs.dagster.io/concepts/dagster-pipes) client for launching and monitoring Ray jobs on remote clusters. Typically used with external Pythons scripts. Allows receiving rich logs, events and metadata from the job. Doesn't handle cluster management, can be used with any Ray cluster.

- `PipesKubeRayJobClient`, a [Dagster Pipes](https://docs.dagster.io/concepts/dagster-pipes) client for launching and monitoring [KubeRay](https://github.com/ray-project/kuberay)'s `RayJob` CR on Kubernetes. Typically used with external Pythons scripts. Allows receiving rich logs, events and metadata from the job.

- `RayResource`, a resource representing a Ray cluster. Interactions with Ray are performed in **client mode** (requires stable persistent connection), so it's most suitable for relatively short-lived jobs. It has implementations for `KubeRay` and local (mostly for testing) backends. `dagster_ray.RayResource` defines the common interface shared by all backends and can be used for backend-agnostic type annotations.

Expand All @@ -37,15 +39,15 @@ Documentation can be found below.

There are different options available for running Dagster code on Ray. The following table summarizes the features of each option:

| Feature | `RayRunLauncher` | `ray_executor` | `PipesKubeRayJobClient` | `KubeRayCluster` |
| Feature | `RayRunLauncher` | `ray_executor` | `PipesRayJobClient` | `PipesKubeRayJobClient` | `KubeRayCluster` |
| --- | --- | --- | --- | --- |
| Creates Ray cluster |||||
| Cluster mode |||||
| For long-running jobs |||||
| Enabled per-asset |||||
| Configurable per-asset |||||
| Runs asset/op body on Ray |||||
| Requires configuring Dagster in the Ray cluster |||||
| Creates Ray cluster ||| | ||
| Cluster mode |||| | |
| For long-running jobs |||| | |
| Enabled per-asset ||||||
| Configurable per-asset ||||||
| Doesn't need an external script |||| | |
| Requires configuring Dagster in the Ray cluster ||||||

# Examples

Expand All @@ -63,7 +65,9 @@ To install with extra dependencies for a particular backend (like `kuberay`), ru
pip install 'dagster-ray[kuberay]'
```

# RunLauncher
# Features

## RunLauncher

> [!WARNING]
> The `RayRunLauncher` is a work in progress
Expand Down Expand Up @@ -103,7 +107,7 @@ def my_job():
return my_op()
```
# Executor
## Executor
> [!WARNING]
> The `ray_executor` is a work in progress
Expand Down Expand Up @@ -150,13 +154,11 @@ Fields in the `dagster-ray/config` tag **override** corresponding fields in the

## IOManager

`RayIOManager` allows storing and retrieving intermediate values in Ray's object store. It can be used in conjunction with `RayRunLauncher` and `ray_executor` to store and retrieve intermediate values in a Ray cluster.
`RayIOManager` allows storing and retrieving intermediate values in Ray's object store. Most useful in conjunction with `RayRunLauncher` and `ray_executor`.

It works by storing Dagster step keys in a global Ray actor. This actor contains a mapping between step keys and Ray `ObjectRef`s. It can be used with any pickable Python objects.




```python
from dagster import asset, Definitions
from dagster_ray import RayIOManager
Expand Down Expand Up @@ -230,9 +232,120 @@ def downstream_unpartitioned(upstream: Dict[str, str]) -> None:
assert upstream == {"A": "a", "B": "b", "C": "c"}
```

# Backends
## Pipes

### `PipesRayJobClient`

A general-purpose Ray job client that can be used to submit Ray jobs and receive logs and Dagster events from them. It doesn't manage the cluser lifecycle and can be used with any Ray cluster.

Examples:

In Dagster code, import `PipesRayJobClient` and invoke it inside an `@op` or an `@asset`:

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_ray import PipesRayJobClient
from ray.job_submission import JobSubmissionClient
@asset
def my_asset(context: AssetExecutionContext, pipes_ray_job_client: PipesRayJobClient):
pipes_ray_job_client.run(
context=context,
submit_job_params={
"entrypoint": "python /app/my_script.py",
},
extra={"param": "value"},
)
definitions = Definitions(
resources={"pipes_ray_job_client": PipesRayJobClient(client=JobSubmissionClient())},
assets=[my_asset],
)
```

In the Ray job, import `dagster_pipes` (must be provided as a dependency) and emit regular Dagster events such as logs or asset materializations:

```python
from dagster_pipes import open_dagster_pipes
with open_dagster_pipes() as context:
assert context.get_extra("param") == "value"
context.log.info("Hello from Ray Pipes!")
context.report_asset_materialization(
metadata={"some_metric": {"raw_value": 57, "type": "int"}},
data_version="alpha",
)
```

A convenient way to provide `dagster-pipes` to the Ray job is [runtime_env](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html):

```python
submit_job_params = {
"entrypoint": "python /app/my_script.py",
"runtime_env": {"pip": ["dagster-pipes"]},
}
```

Events emitted by the Ray job will be captured by `PipesRayJobClient` and will become available in the Dagster event log. Standard output and standard error streams will be forwarded to the standard output of the Dagster process.

## Resources

### `LocalRay`

A dummy resource which is useful for testing and development.
It doesn't do anything, but provides the same interface as the other `*Ray` resources.

Examples:


Using the `LocalRay` resource

```python
from dagster import asset, Definitions
from dagster_ray import LocalRay, RayResource
import ray
@asset
def my_asset(
ray_cluster: RayResource, # RayResource is only used as a type annotation
): # this type annotation only defines the interface
return ray.get(ray.put(42))
## KubeRay
definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```

Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:

```python
from dagster import asset, Definitions
from dagster_ray import LocalRay, RayResource
from dagster_ray.kuberay import KubeRayCluster
import ray
@asset
def my_asset(
ray_cluster: RayResource, # RayResource is only used as a type annotation
): # this type annotation only defines the interface
return ray.get(ray.put(42))
IN_K8s = ...
definitions = Definitions(
resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
assets=[my_asset],
)
```

# KubeRay

```shell
pip install dagster-ray[kuberay]
Expand All @@ -247,7 +360,9 @@ To run `ray` code in job mode, use the `PipesKubeRayJobClient` with Dagster Pipe

The public objects can be imported from `dagster_ray.kuberay` module.

### Pipes
## Pipes

### `PipesKubeRayJobClient`

`dagster-ray` provides the `PipesKubeRayJobClient` which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.
[RayJob](https://docs.ray.io/en/latest/cluster/kubernetes/getting-started/rayjob-quick-start.html) will manage the lifecycle of the underlying `RayCluster`, which will be cleaned up after the specified entrypoint exits. Doesn't require a persistent connection to the Ray cluster.
Expand Down Expand Up @@ -328,9 +443,9 @@ from dagster_ray.kuberay.configs import in_k8s
pipes_rayjob_client = PipesKubeRayJobClient(..., port_forward=not in_k8s)
```

### Resources
## Resources

#### `KubeRayCluster`
### `KubeRayCluster`

`KubeRayCluster` can be used for running Ray computations on Kubernetes in client (interactive) mode. Requires stable persistent connection through the duration of the Dagster step.

Expand Down Expand Up @@ -384,7 +499,7 @@ ray_cluster = KubeRayCluster(
)
)
```
#### `KubeRayClient`
### `KubeRayClient`

This resource can be used to interact with the Kubernetes API Server.

Expand All @@ -404,87 +519,24 @@ def list_ray_clusters(
return kube_ray_client.client.list(namespace="kuberay")
```

### Jobs
## Jobs

#### `delete_kuberay_clusters`
### `delete_kuberay_clusters`

This `job` can be used to delete `RayClusters` from a given list of names.

#### `cleanup_old_ray_clusters`
### `cleanup_old_ray_clusters`

This `job` can be used to delete old `RayClusters` which no longer correspond to any active Dagster Runs.
They may be left behind if the automatic cluster cleanup was disabled or failed.

### Schedules
## Schedules

Cleanup schedules can be trivially created using the `cleanup_old_ray_clusters` or `delete_kuberay_clusters` jobs.

#### `cleanup_old_ray_clusters`
### `cleanup_old_ray_clusters`
`dagster-ray` provides an example daily cleanup schedule.


## Local

These resources can be used for development and testing purposes.
They provide the same interface as the other `*Ray` resources, but don't require any external infrastructure.

The public objects can be imported from `dagster_ray.local` module.

### Resources

#### `LocalRay`

A dummy resource which is useful for testing and development.
It doesn't do anything, but provides the same interface as the other `*Ray` resources.

Examples:


Using the `LocalRay` resource

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
import ray


@asset
def my_asset(
ray_cluster: RayResource, # RayResource is only used as a type annotation
): # this type annotation only defines the interface
return ray.get(ray.put(42))


definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```

Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
ray_cluster: RayResource, # RayResource is only used as a type annotation
): # this type annotation only defines the interface
return ray.get(ray.put(42))


IN_K8s = ...


definitions = Definitions(
resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
assets=[my_asset],
)
```

# Development

```shell
Expand Down
10 changes: 9 additions & 1 deletion dagster_ray/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
from dagster_ray._base.resources import BaseRayResource
from dagster_ray.executor import ray_executor
from dagster_ray.io_manager import RayIOManager
from dagster_ray.pipes import PipesRayJobClient, PipesRayJobMessageReader
from dagster_ray.run_launcher import RayRunLauncher

RayResource = BaseRayResource


__all__ = ["RayResource", "RayRunLauncher", "RayIOManager", "ray_executor"]
__all__ = [
"RayResource",
"RayRunLauncher",
"RayIOManager",
"ray_executor",
"PipesRayJobMessageReader",
"PipesRayJobClient",
]
10 changes: 5 additions & 5 deletions dagster_ray/kuberay/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from dagster_ray._base.utils import get_dagster_tags
from dagster_ray.kuberay.client import RayJobClient
from dagster_ray.kuberay.client.rayjob.client import RayJobStatus
from dagster_ray.pipes import PipesRayJobSubmissionClientMessageReader
from dagster_ray.pipes import PipesRayJobMessageReader, generate_job_id

if TYPE_CHECKING:
from ray.job_submission import JobSubmissionClient
Expand All @@ -33,7 +33,7 @@ class PipesKubeRayJobClient(PipesClient, TreatAsResourceParam):
context_injector (Optional[PipesContextInjector]): A context injector to use to inject
context into the ``RayJob``. Defaults to :py:class:`PipesEnvContextInjector`.
message_reader (Optional[PipesMessageReader]): A message reader to use to read messages
from the glue job run. Defaults to :py:class:`PipesRayJobSubmissionClientMessageReader`.
from the glue job run. Defaults to :py:class:`PipesRayJobMessageReader`.
client (Optional[boto3.client]): The Kubernetes API client.
forward_termination (bool): Whether to cancel the `RayJob` job run when the Dagster process receives a termination signal.
timeout (int): Timeout for various internal interactions with the Kubernetes RayJob.
Expand All @@ -55,7 +55,7 @@ def __init__(
self.client: RayJobClient = client or RayJobClient()

self._context_injector = context_injector or PipesEnvContextInjector()
self._message_reader = message_reader or PipesRayJobSubmissionClientMessageReader()
self._message_reader = message_reader or PipesRayJobMessageReader()

self.forward_termination = check.bool_param(forward_termination, "forward_termination")
self.timeout = check.int_param(timeout, "timeout")
Expand Down Expand Up @@ -130,7 +130,7 @@ def _enrich_ray_job(
ray_job["metadata"] = ray_job.get("metadata", {})
ray_job["metadata"]["labels"] = ray_job["metadata"].get("labels", {})

ray_job["metadata"]["name"] = ray_job["metadata"].get("name", f"dg-{context.run.run_id[:8]}")
ray_job["metadata"]["name"] = ray_job["metadata"].get("name", f"pipes-{generate_job_id()}")
ray_job["metadata"]["labels"].update(self.get_dagster_tags(context))

# update env vars in runtimeEnv
Expand Down Expand Up @@ -179,7 +179,7 @@ def _start(self, context: OpExecutionContext, ray_job: Dict[str, Any]) -> Dict[s
def _read_messages(self, context: OpExecutionContext, start_response: Dict[str, Any]) -> None:
status = cast(RayJobStatus, start_response["status"])

if isinstance(self._message_reader, PipesRayJobSubmissionClientMessageReader):
if isinstance(self._message_reader, PipesRayJobMessageReader):
# starts a thread
self._message_reader.consume_job_logs(
# TODO: investigate why some messages aren't being handled with blocking=False
Expand Down
3 changes: 0 additions & 3 deletions dagster_ray/local/__init__.py

This file was deleted.

Loading

0 comments on commit 213e944

Please sign in to comment.