Commit 589af20: wait until RayCluster is ready

danielgafni committed Sep 16, 2024 (1 parent: 5c09f36)

Showing 9 changed files with 52 additions and 30 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
@@ -10,6 +10,8 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN --mount=type=cache,target=/var/cache/apt \
apt-get update && apt-get install -y git jq curl gcc python3-dev libpq-dev wget

+ COPY --from=bitnami/kubectl:1.30.3 /opt/bitnami/kubectl/bin/kubectl /usr/local/bin/
+
# install poetry
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
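Note: `kubectl` is copied out of the `bitnami/kubectl` image rather than installed via `apt`, presumably so that `RayClusterClient.port_forward` (which, per the client changes below, builds a command taking `--context`/`--kubeconfig` flags) can shell out to `kubectl` inside the test image.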
10 changes: 6 additions & 4 deletions README.md
@@ -24,7 +24,7 @@ The following backends are implemented:
Documentation can be found below.

> [!NOTE]
- > This project is in early development. Contributions are very welcome! See the [Development](#development) section below.
+ > This project is in early development. APIs are unstable and can change at any time. Contributions are very welcome! See the [Development](#development) section below.
# Backends

@@ -142,7 +142,9 @@ def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobCli
pipes_rayjob_client.run(
context=context,
ray_job={
- # RayJob manifest goes here, only .metadata.name is not required and will be generated if not provided
+ # RayJob manifest goes here
+ # .metadata.name is not required and will be generated if not provided
+ # *.container.image is not required and will be set to the current `dagster/image` tag if not provided
# full reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
...
},
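For illustration, a minimal manifest in the spirit of this comment might look like the sketch below. The field values are assumptions modeled on the test manifest later in this commit, not defaults shipped by the library, and the `apiVersion` assumes a recent KubeRay release:

```python
# Illustrative only. name and image are intentionally omitted: per the README
# note above, dagster-ray generates the name and injects the `dagster/image` tag.
ray_job = {
    "apiVersion": "ray.io/v1",  # older KubeRay releases used ray.io/v1alpha1
    "kind": "RayJob",
    "metadata": {"namespace": "ray"},  # hypothetical namespace
    "spec": {
        "entrypoint": "python my_script.py",  # hypothetical entrypoint
        "rayClusterSpec": {
            "headGroupSpec": {
                "rayStartParams": {"dashboard-host": "0.0.0.0"},
                "template": {"spec": {"containers": [{"name": "ray-head"}]}},
            },
        },
    },
}
```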
@@ -300,13 +302,13 @@ Running `pytest` will **automatically**:
- build an image with the local `dagster-ray` code
- start a `minikube` Kubernetes cluster
- load the built `dagster-ray` and loaded `kuberay-operator` images into the cluster
- - install the `KubeRay Operator` in the cluster with `helm`
+ - install `KubeRay Operator` into the cluster with `helm`
- run the tests

Thus, no manual setup is required, just the presence of the tools listed above. This makes testing a breeze!

> [!NOTE]
- > Specifying a comma-separated list of `KubeRay Operator` versions in the `KUBE_RAY_OPERATOR_VERSIONS` environment variable will spawn a new test for each version.
+ > Specifying a comma-separated list of `KubeRay Operator` versions in the `PYTEST_KUBERAY_VERSIONS` environment variable will spawn a new test for each version.
> [!NOTE]
> It may take a while to download the `minikube` and `kuberay-operator` images and build the local `dagster-ray` image during the first test invocation
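For example, running `PYTEST_KUBERAY_VERSIONS="1.1.0,1.2.2" pytest` would run the KubeRay test suite once per listed operator version (the version numbers here are illustrative).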
2 changes: 0 additions & 2 deletions dagster_ray/kuberay/client/base.py
@@ -37,8 +37,6 @@ def __init__(
self.config_file = config_file
self.context = context

- self.kube_config: Optional[Any] = load_kubeconfig(config_file=config_file, context=context)
-
self._api = client.CustomObjectsApi()
self._core_v1_api = client.CoreV1Api()

10 changes: 7 additions & 3 deletions dagster_ray/kuberay/client/raycluster/client.py
@@ -113,6 +113,7 @@ def wait_until_ready(
# TODO: use get_namespaced_custom_object instead
# once https://github.com/kubernetes-client/python/issues/1679
# is solved
+
for event in w.stream(
self._api.list_namespaced_custom_object,
self.group,
@@ -129,7 +130,7 @@

if status.get("state") == "failed":
raise Exception(
f"RayCluster {namespace}/{name} failed to start. More details: `kubectl -n {namespace} describe RayCluster {name}`"
f"RayCluster {namespace}/{name} failed to start. Reason:\n{status.get('reason')}\nMore details: `kubectl -n {namespace} describe RayCluster {name}`"
)

if (
@@ -196,6 +197,9 @@ def port_forward(
if self.context:
cmd.extend(["--context", self.context])

+ if self.kube_config:
+     cmd.extend(["--kubeconfig", self.kube_config])
+

process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

queue = Queue()
@@ -239,7 +243,7 @@ def should_stop():

@contextmanager
def job_submission_client(
- self, name: str, namespace: str, port_forward: bool = False
+ self, name: str, namespace: str, port_forward: bool = False, timeout: int = 60
) -> Iterator["JobSubmissionClient"]:
"""
Returns a JobSubmissionClient object that can be used to interact with Ray jobs running in the KubeRay cluster.
@@ -259,7 +263,7 @@ def job_submission_client(

yield JobSubmissionClient(address=f"http://{host}:{dashboard_port}")
else:
- self.wait_for_service_endpoints(service_name=f"{name}-head-svc", namespace=namespace)
+ self.wait_for_service_endpoints(service_name=f"{name}-head-svc", namespace=namespace, timeout=timeout)
with self.port_forward(name=name, namespace=namespace, local_dashboard_port=0, local_gcs_port=0) as (
local_dashboard_port,
_,
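Together, these changes let callers bound how long to wait for the head service and reuse the client's kubeconfig for the tunnel. A hypothetical usage sketch; the import path, cluster name, namespace, and kubeconfig path are assumptions, while `submit_job`/`get_job_status` are Ray's job-submission API:

```python
from dagster_ray.kuberay.client import RayClusterClient  # assumed import path

client = RayClusterClient(config_file="/path/to/kubeconfig")  # hypothetical path

# port_forward=True tunnels to the head service; the new `timeout` bounds
# wait_for_service_endpoints() before the tunnel is opened.
with client.job_submission_client(
    name="my-raycluster", namespace="ray", port_forward=True, timeout=120
) as jsc:
    job_id = jsc.submit_job(entrypoint="python -c 'print(42)'")
    print(jsc.get_job_status(job_id))
```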
12 changes: 6 additions & 6 deletions dagster_ray/kuberay/client/rayjob/client.py
@@ -53,13 +53,13 @@ def get_job_sumission_id(self, name: str, namespace: str) -> str:

@property
def ray_cluster_client(self) -> RayClusterClient:
- return RayClusterClient(config_file=self.kube_config, context=self.context)
+ return RayClusterClient(config_file=self.config_file, context=self.context)

def wait_until_running(
self,
name: str,
namespace: str,
- timeout: int = 60 * 60,
+ timeout: int = 300,
poll_interval: int = 5,
) -> bool:
start_time = time.time()
@@ -70,11 +70,11 @@ def wait_until_running(
if status in ["Running", "Complete"]:
break
elif status == "Failed":
- return False
+ raise RuntimeError(f"RayJob {namespace}/{name} deployment failed. Status:\n{status}")

if time.time() - start_time > timeout:
raise TimeoutError(
f"Timed out waiting for RayJob {name} deployment to become available." f"Status: {status}"
f"Timed out waiting for RayJob {namespace}/{name} deployment to become available. Status:\n{status}"
)

time.sleep(poll_interval)
@@ -86,7 +86,7 @@ def wait_until_running(
break

if time.time() - start_time > timeout:
raise TimeoutError(f"Timed out waiting for RayJob {name} to start. " f"Status: {status}")
raise TimeoutError(f"Timed out waiting for RayJob {namespace}/{name} to start. Status:\n{status}")

time.sleep(poll_interval)

@@ -96,7 +96,7 @@ def _wait_for_job_submission(
self,
name: str,
namespace: str,
- timeout: int = 600,
+ timeout: int = 300,
poll_interval: int = 10,
):
start_time = time.time()
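The failure path now raises instead of returning `False`, so a caller can tell a failed deployment apart from a slow one. A sketch of the resulting calling convention (job name, namespace, and kubeconfig path are made up; the `RayJobClient` import path appears in the test changes below):

```python
from dagster_ray.kuberay.client import RayJobClient

client = RayJobClient(config_file="/path/to/kubeconfig")  # hypothetical path

try:
    client.wait_until_running("my-rayjob", namespace="ray", timeout=300, poll_interval=5)
except RuntimeError:
    raise  # the RayJob deployment reached the Failed status
except TimeoutError:
    raise  # neither Running nor Failed within `timeout` seconds
```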
4 changes: 2 additions & 2 deletions dagster_ray/kuberay/configs.py
@@ -13,8 +13,8 @@
"env": [],
"envFrom": [],
"resources": {
"limits": {"cpu": "1000m", "memory": "1Gi"},
"requests": {"cpu": "1000m", "memory": "1Gi"},
"limits": {"cpu": "50m", "memory": "0.1Gi"},
"requests": {"cpu": "50m", "memory": "0.1Gi"},
},
}
DEFAULT_HEAD_GROUP_SPEC = {
1 change: 1 addition & 0 deletions dagster_ray/pipes.py
@@ -55,6 +55,7 @@ def read_messages(self, handler: "PipesMessageHandler") -> Iterator[PipesParams]
self._handler = None
if self._thread is not None:
self.terminate()
+ self._terminate_reading.wait()
self.thread.join()

def tail_job_logs(self, client: "JobSubmissionClient", job_id: str, blocking: bool = False) -> None:
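A guess at the intent here, assuming `_terminate_reading` is a `threading.Event` set by `terminate()`: waiting on the event before `join()` guarantees the stop signal was actually raised, so the join cannot hang. A generic, self-contained illustration of the pattern (not dagster-ray's actual reader):

```python
import threading

stop = threading.Event()

def reader() -> None:
    # Event.wait(timeout) returns True once the event is set
    while not stop.wait(timeout=0.1):
        pass  # ... tail logs here ...

t = threading.Thread(target=reader, daemon=True)
t.start()

stop.set()   # terminate(): tell the reader to exit
stop.wait()  # returns immediately once set, mirroring the added line
t.join()     # the reader loop is guaranteed to observe the event and finish
```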
7 changes: 6 additions & 1 deletion tests/kuberay/conftest.py
@@ -112,6 +112,11 @@ def k8s_with_kuberay(

k8s.wait("deployment/kuberay-operator", "condition=Available=True", namespace="kuberay-operator")
# namespace to create RayClusters in
k8s.kubectl(["create", "namespace", NAMESPACE])
try:
k8s.kubectl(["create", "namespace", NAMESPACE])
except RuntimeError as e:
if "AlreadyExists" not in str(e):
raise

yield k8s
k8s.delete()
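The try/except makes namespace creation idempotent when a test session reuses an existing cluster. An equivalent CLI-level approach would be `kubectl create namespace ... --dry-run=client -o yaml | kubectl apply -f -`, but catching `AlreadyExists` keeps the fixture self-contained.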
34 changes: 22 additions & 12 deletions tests/kuberay/test_rayjob.py
@@ -1,5 +1,6 @@
import re

+ import pytest
import ray # noqa: TID253
from dagster import AssetExecutionContext, asset, materialize
from dagster._core.definitions.data_version import (
@@ -9,6 +10,7 @@
from dagster._core.instance_for_test import instance_for_test
from pytest_kubernetes.providers import AClusterManager

+ from dagster_ray.kuberay.client import RayJobClient
from dagster_ray.kuberay.pipes import PipesRayJobClient

RAY_JOB = {
@@ -21,35 +23,28 @@
"spec": {
"activeDeadlineSeconds": 10800,
"entrypoint": "python /src/tests/kuberay/scripts/remote_job.py",
"entrypointNumCpus": 0.05,
"entrypointNumCpus": 0.1,
"rayClusterSpec": {
"autoscalerOptions": {
"idleTimeoutSeconds": 60,
"imagePullPolicy": "IfNotPresent",
"resources": {"limits": {"cpu": "1", "memory": "1Gi"}, "requests": {"cpu": "1", "memory": "1Gi"}},
"securityContext": {"runAsUser": 0},
},
"enableInTreeAutoscaling": False,
"headGroupSpec": {
"rayStartParams": {"dashboard-host": "0.0.0.0", "num-cpus": "0", "num-gpus": "0"},
"rayStartParams": {"dashboard-host": "0.0.0.0", "num-cpus": "1", "num-gpus": "0"},
"serviceType": "ClusterIP",
"template": {
"metadata": {"annotations": {}},
"spec": {
"affinity": {},
"containers": [
{
"env": [],
"imagePullPolicy": "IfNotPresent",
"name": "ray-head",
"resources": {
"limits": {"cpu": "500m", "memory": "800M"},
"requests": {"cpu": "500m", "memory": "800M"},
},
"securityContext": {"runAsUser": 0},
}
],
"serviceAccountName": "ray",
# "serviceAccountName": "ray",
"volumes": [],
},
},
@@ -64,7 +59,17 @@
}


- def test_rayjob_pipes(k8s_with_kuberay: AClusterManager, capsys):
+ @pytest.fixture(scope="session")
+ def pipes_rayjob_client(k8s_with_kuberay: AClusterManager):
+     return PipesRayJobClient(
+         client=RayJobClient(
+             config_file=str(k8s_with_kuberay.kubeconfig),
+         ),
+         port_forward=True,
+     )
+
+
+ def test_rayjob_pipes(pipes_rayjob_client: PipesRayJobClient, dagster_ray_image: str, capsys):
@asset
def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobClient) -> None:
pipes_rayjob_client.run(
@@ -74,7 +79,12 @@ def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobCli
)

with instance_for_test() as instance:
- result = materialize([my_asset], resources={"pipes_rayjob_client": PipesRayJobClient()}, instance=instance)
+ result = materialize(
+     [my_asset],
+     resources={"pipes_rayjob_client": pipes_rayjob_client},
+     instance=instance,
+     tags={"dagster/image": dagster_ray_image},
+ )

assert result.success
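Note how the test now relies on the behavior documented in the README change above: instead of hardcoding an image in `RAY_JOB`, the `dagster/image` run tag supplies it, and `PipesRayJobClient` injects it into the manifest.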
