Skip to content

Commit

Permalink
⭐ 💥 support RayJob via Dagster Pipes; rework of everything
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Sep 16, 2024
1 parent 740087f commit 88ad80b
Show file tree
Hide file tree
Showing 25 changed files with 1,229 additions and 589 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ jobs:
os:
- Ubuntu
py:
# - "3.12"
- "3.12"
- "3.11"
- "3.10"
- "3.9"
# - "3.8"
kubernetes:
- "1.31.0"
- "1.30.0"
- "1.29.0"
steps:
- name: Setup python for test ${{ matrix.py }}
uses: actions/setup-python@v4
Expand All @@ -54,7 +57,8 @@ jobs:
run: poetry install --all-extras --sync
- name: Run tests
env:
PYTEST_KUBERAY_VERSIONS: "1.0.0,1.1.0" # will run tests for all these KubeRay versions
PYTEST_KUBERAY_VERSIONS: "1.0.0,1.1.0,1.2.0" # will run tests for all these KubeRay versions
PYTEST_KUBERNETES_VERSION: ${{ matrix.kubernetes }}
run: pytest -v .

lint:
Expand Down
70 changes: 65 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,70 @@ definitions = Definitions(

This backend requires a Kubernetes cluster with the `KubeRay Operator` installed.

Integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and `RayCluster` labels.
Integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and Kubernetes labels.

To run `ray` code in client mode (from the Dagster Python process directly), use the `KubeRayClient` resource (see the [KubeRayCluster](#KubeRayCluster) section).
To run `ray` code in job mode, use the `PipesRayJobClient` with Dagster Pipes (see the [Pipes](#pipes) section).

The public objects can be imported from `dagster_ray.kuberay` module.

### Pipes

`dagster-ray` provides the `PipesRayJobClient` which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.
[RayJob](https://docs.ray.io/en/latest/cluster/kubernetes/getting-started/rayjob-quick-start.html) will manage the lifecycle of the underlying `RayCluster`, which will be cleaned up after the specified entrypoint exits.

Examples:

On the orchestration side, import the `PipesRayJobClient` and invoke it inside an `@op` or an `@asset`:

```python
from dagster import asset, Definitions

from dagster_ray.kuberay import PipesRayJobClient


@asset
def my_asset(pipes_ray_job_client: PipesRayJobClient):
pipes_ray_job_client.run(
ray_job={
# RayJob manifest goes here, only .metadata.name is not required and will be generated if not provided
# full reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
...
},
extra={"foo": "bar"},
)


definitions = Definitions(
resources={"pipes_ray_job_client": PipesRayJobClient()}, assets=[my_asset]
)
```

In the Ray job, import `dagster_pipes` (must be provided as a dependency) and emit regular Dagster events such as logs or asset materializations:

```python
from dagster_pipes import open_dagster_pipes


with open_dagster_pipes() as pipes:
pipes.log("Hello from Ray!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)
```

A convenient way to provide `dagster-pipes` to the Ray job is with `runtimeEnvYaml` field:

```python
import yaml

ray_job = {"spec": {"runtimeEnvYaml": yaml.safe_dump({"pip": ["dagster-pipes"]})}}
```

The logs and events emitted by the Ray job will be captured by the `PipesRayJobClient` and will become available in the Dagster event log.
Normal stdout & stderr will be forwarded to stdout.

### Resources

#### `KubeRayCluster`
Expand Down Expand Up @@ -175,7 +235,7 @@ ray_cluster = KubeRayCluster(
)
)
```
#### `KubeRayAPI`
#### `KubeRayClient`

This resource can be used to interact with the Kubernetes API Server.

Expand All @@ -185,14 +245,14 @@ Listing currently running `RayClusters`:

```python
from dagster import op, Definitions
from dagster_ray.kuberay import KubeRayAPI
from dagster_ray.kuberay import KubeRayClient


@op
def list_ray_clusters(
kube_ray_api: KubeRayAPI,
kube_ray_client: KubeRayClient,
):
return kube_ray_api.kuberay.list_ray_clusters(k8s_namespace="kuberay")
return kube_ray_client.client.list(namespace="kuberay")
```

### Jobs
Expand Down
9 changes: 9 additions & 0 deletions dagster_ray/_base/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os

DEFAULT_DEPLOYMENT_NAME = (
os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME")
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "0"
else os.getenv("DAGSTER_CLOUD_GIT_BRANCH")
) or "dev"

IS_PROD = DEFAULT_DEPLOYMENT_NAME == "prod"
14 changes: 6 additions & 8 deletions dagster_ray/_base/resources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import sys
import uuid
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional, Union, cast
from typing import TYPE_CHECKING, Dict, Optional, Union, cast

from dagster import ConfigurableResource, InitResourceContext, OpExecutionContext
from pydantic import Field, PrivateAttr
Expand All @@ -11,14 +10,9 @@
from requests.exceptions import ConnectionError
from tenacity import retry, retry_if_exception_type, stop_after_delay

from dagster_ray._base.utils import get_dagster_tags
from dagster_ray.config import RayDataExecutionOptions

if sys.version_info >= (3, 11):
pass
else:
pass


if TYPE_CHECKING:
from ray._private.worker import BaseContext as RayBaseContext # noqa

Expand Down Expand Up @@ -86,6 +80,10 @@ def init_ray(self, context: Union[OpExecutionContext, InitResourceContext]) -> "
context.log.info("Initialized Ray!")
return cast("RayBaseContext", self._context)

def get_dagster_tags(self, context: InitResourceContext) -> Dict[str, str]:
tags = get_dagster_tags(context)
return tags

def _get_step_key(self, context: InitResourceContext) -> str:
# just return a random string
# since we want a fresh cluster every time
Expand Down
30 changes: 30 additions & 0 deletions dagster_ray/_base/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
from typing import Dict, Union, cast

from dagster import InitResourceContext, OpExecutionContext

from dagster_ray._base.constants import DEFAULT_DEPLOYMENT_NAME


def get_dagster_tags(context: Union[InitResourceContext, OpExecutionContext]) -> Dict[str, str]:
"""
Returns a dictionary with common Dagster tags.
"""
assert context.dagster_run is not None

labels = {
"dagster.io/run_id": cast(str, context.run_id),
"dagster.io/deployment": DEFAULT_DEPLOYMENT_NAME,
# TODO: add more labels
}

if context.dagster_run.tags.get("user"):
labels["dagster.io/user"] = context.dagster_run.tags["user"]

if os.getenv("DAGSTER_CLOUD_GIT_BRANCH"):
labels["dagster.io/git-branch"] = os.environ["DAGSTER_CLOUD_GIT_BRANCH"]

if os.getenv("DAGSTER_CLOUD_GIT_SHA"):
labels["dagster.io/git-sha"] = os.environ["DAGSTER_CLOUD_GIT_SHA"]

return labels
7 changes: 5 additions & 2 deletions dagster_ray/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

from typing import Optional

import ray
from dagster import Config
from pydantic import Field
from ray.data import ExecutionResources


class ExecutionOptionsConfig(Config):
Expand All @@ -23,6 +21,9 @@ class RayDataExecutionOptions(Config):
use_polars: bool = True

def apply(self):
import ray
from ray.data import ExecutionResources

ctx = ray.data.DatasetContext.get_current()

ctx.execution_options.resource_limits = ExecutionResources.for_limits(
Expand All @@ -35,6 +36,8 @@ def apply(self):
ctx.use_polars = self.use_polars

def apply_remote(self):
import ray

@ray.remote
def apply():
self.apply()
Expand Down
4 changes: 2 additions & 2 deletions dagster_ray/kuberay/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from dagster_ray.kuberay.configs import RayClusterConfig
from dagster_ray.kuberay.jobs import cleanup_kuberay_clusters, delete_kuberay_clusters
from dagster_ray.kuberay.ops import cleanup_kuberay_clusters_op, delete_kuberay_clusters_op
from dagster_ray.kuberay.resources import KubeRayAPI, KubeRayCluster
from dagster_ray.kuberay.resources import KubeRayCluster, RayClusterClientResource
from dagster_ray.kuberay.schedules import cleanup_kuberay_clusters_daily

__all__ = [
"KubeRayCluster",
"RayClusterConfig",
"KubeRayAPI",
"RayClusterClientResource",
"cleanup_kuberay_clusters",
"delete_kuberay_clusters",
"cleanup_kuberay_clusters_op",
Expand Down
4 changes: 4 additions & 0 deletions dagster_ray/kuberay/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from dagster_ray.kuberay.client.raycluster import RayClusterClient
from dagster_ray.kuberay.client.rayjob import RayJobClient

__all__ = ["RayClusterClient", "RayJobClient"]
Loading

0 comments on commit 88ad80b

Please sign in to comment.