wait until RayCluster is ready
danielgafni committed Sep 16, 2024
1 parent 5c09f36 commit 3645c4d
Showing 6 changed files with 40 additions and 9 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -142,7 +142,9 @@ def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobCli
     pipes_rayjob_client.run(
         context=context,
         ray_job={
-            # RayJob manifest goes here, only .metadata.name is not required and will be generated if not provided
+            # RayJob manifest goes here
+            # .metadata.name is not required and will be generated if not provided
+            # *.container.image is not required and will be set to the current `dagster/image` tag if not provided
             # full reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
             ...
         },
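
For context, a minimal manifest accepted by `pipes_rayjob_client.run` could look like the sketch below. Only the field names follow the KubeRay RayJob reference linked above; the namespace, entrypoint, and group specs are illustrative assumptions, and `metadata.name` plus the container image are deliberately omitted to demonstrate the defaulting described in the new comments.

# Hypothetical minimal RayJob manifest (all concrete values are assumptions):
ray_job_manifest = {
    "apiVersion": "ray.io/v1",
    "kind": "RayJob",
    "metadata": {"namespace": "ray"},  # no "name": it will be generated
    "spec": {
        "entrypoint": "python -m my_package.my_script",  # assumed entrypoint
        "rayClusterSpec": {
            "headGroupSpec": {
                "rayStartParams": {},
                # no container image: it falls back to the run's `dagster/image` tag
                "template": {"spec": {"containers": [{"name": "ray-head"}]}},
            },
            "workerGroupSpecs": [],
        },
    },
}
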
2 changes: 1 addition & 1 deletion dagster_ray/kuberay/client/raycluster/client.py
@@ -129,7 +129,7 @@ def wait_until_ready(

         if status.get("state") == "failed":
             raise Exception(
-                f"RayCluster {namespace}/{name} failed to start. More details: `kubectl -n {namespace} describe RayCluster {name}`"
+                f"RayCluster {namespace}/{name} failed to start. Reason:\n{status.get('reason')}\nMore details: `kubectl -n {namespace} describe RayCluster {name}`"
             )

         if (
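
The enriched message fires inside `wait_until_ready`'s status-polling loop. A minimal standalone sketch of that pattern, assuming a `get_status` callable that returns the RayCluster's `.status` dict and a `"ready"` state value (both assumptions; the real client's conditions may differ):

import time


def wait_until_ready_sketch(get_status, namespace: str, name: str, timeout: float = 600.0) -> None:
    # Poll the custom resource's status, surfacing the failure reason early.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status(namespace=namespace, name=name) or {}
        if status.get("state") == "failed":
            raise Exception(
                f"RayCluster {namespace}/{name} failed to start. Reason:\n{status.get('reason')}\n"
                f"More details: `kubectl -n {namespace} describe RayCluster {name}`"
            )
        if status.get("state") == "ready":  # assumed readiness condition
            return
        time.sleep(10)
    raise TimeoutError(f"RayCluster {namespace}/{name} was not ready within {timeout}s")
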
4 changes: 2 additions & 2 deletions dagster_ray/kuberay/configs.py
@@ -13,8 +13,8 @@
     "env": [],
     "envFrom": [],
     "resources": {
-        "limits": {"cpu": "1000m", "memory": "1Gi"},
-        "requests": {"cpu": "1000m", "memory": "1Gi"},
+        "limits": {"cpu": "50m", "memory": "0.1Gi"},
+        "requests": {"cpu": "50m", "memory": "0.1Gi"},
     },
 }
 DEFAULT_HEAD_GROUP_SPEC = {
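
For scale: Kubernetes CPU quantities ending in `m` are millicores, and `Gi` is a binary (power-of-two) unit, so the new defaults are deliberately tiny, suiting the lightweight test clusters used elsewhere in this commit. A quick check of what they translate to:

# Converting the new default resource quantities:
cpu_cores = 50 / 1000        # "50m"   -> 0.05 of one CPU core
memory_bytes = 0.1 * 2**30   # "0.1Gi" -> ~107 MB
print(cpu_cores, int(memory_bytes))  # 0.05 107374182
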
9 changes: 9 additions & 0 deletions dagster_ray/kuberay/pipes.py
@@ -153,6 +153,6 @@ def _enrich_ray_job(
         image_from_run_tag = context.dagster_run.tags.get("dagster/image")

         for container in ray_job["spec"]["rayClusterSpec"]["headGroupSpec"]["template"]["spec"]["containers"]:
             container["image"] = container.get("image") or image_from_run_tag

         for worker_spec in ray_job["spec"]["rayClusterSpec"]["workerGroupSpecs"]:
@@ -172,6 +172,15 @@ def _start(self, context: OpExecutionContext, ray_job: Dict[str, Any]) -> Dict[s
             namespace=ray_job["metadata"]["namespace"],
         )

+        self.client.ray_cluster_client.wait_until_ready(
+            name=self.client.get_ray_cluster_name(
+                name=name,
+                namespace=namespace,
+            ),
+            namespace=namespace,
+            timeout=self.timeout,
+        )
+
         self.client.wait_until_running(
             name=name,
             namespace=namespace,
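
The added block makes startup two-phase: `_start` now waits for the RayCluster backing the job to become ready before waiting on the RayJob itself, so cluster-level failures surface through the clearer RayCluster error path instead of a job-level timeout. A condensed sketch of the ordering (the creation call and the final timeout argument are assumptions; the two wait calls mirror the diff):

def start_sketch(client, ray_job: dict, timeout: float) -> None:
    name = ray_job["metadata"]["name"]
    namespace = ray_job["metadata"]["namespace"]
    client.create(body=ray_job, namespace=namespace)  # assumed creation call
    # Phase 1: block until the underlying RayCluster is ready.
    client.ray_cluster_client.wait_until_ready(
        name=client.get_ray_cluster_name(name=name, namespace=namespace),
        namespace=namespace,
        timeout=timeout,
    )
    # Phase 2: block until the RayJob reports a running state.
    client.wait_until_running(name=name, namespace=namespace, timeout=timeout)
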
1 change: 1 addition & 0 deletions dagster_ray/pipes.py
@@ -55,6 +55,7 @@ def read_messages(self, handler: "PipesMessageHandler") -> Iterator[PipesParams]
         self._handler = None
         if self._thread is not None:
             self.terminate()
+            self._terminate_reading.wait()
             self._thread.join()

     def tail_job_logs(self, client: "JobSubmissionClient", job_id: str, blocking: bool = False) -> None:
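
The added `wait()` turns shutdown into a handshake: signal the reader, wait for an `Event` confirming the read loop stopped, then join the thread. One plausible shape of that pattern (the loop body and the ownership of the second event are assumptions):

import threading
import time


class LogReaderSketch:
    """Hypothetical reader illustrating the signal -> confirm -> join order."""

    def __init__(self) -> None:
        self._should_terminate = threading.Event()   # set by terminate()
        self._terminate_reading = threading.Event()  # set by the read loop on exit
        self._thread = threading.Thread(target=self._read_loop, daemon=True)
        self._thread.start()

    def _read_loop(self) -> None:
        while not self._should_terminate.is_set():
            time.sleep(0.1)  # poll for new messages here (assumed)
        self._terminate_reading.set()  # confirm the loop has exited

    def terminate(self) -> None:
        self._should_terminate.set()

    def close(self) -> None:
        if self._thread is not None:
            self.terminate()
            self._terminate_reading.wait()  # reader confirmed it stopped
            self._thread.join()             # join can no longer race the loop
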
29 changes: 24 additions & 5 deletions tests/kuberay/test_rayjob.py
@@ -1,5 +1,6 @@
 import re

+import pytest
 import ray  # noqa: TID253
 from dagster import AssetExecutionContext, asset, materialize
 from dagster._core.definitions.data_version import (
@@ -9,6 +10,7 @@
 from dagster._core.instance_for_test import instance_for_test
 from pytest_kubernetes.providers import AClusterManager

+from dagster_ray.kuberay.client import RayJobClient
 from dagster_ray.kuberay.pipes import PipesRayJobClient

 RAY_JOB = {
@@ -26,7 +28,10 @@
         "autoscalerOptions": {
             "idleTimeoutSeconds": 60,
             "imagePullPolicy": "IfNotPresent",
-            "resources": {"limits": {"cpu": "1", "memory": "1Gi"}, "requests": {"cpu": "1", "memory": "1Gi"}},
+            "resources": {
+                "limits": {"cpu": "0.05", "memory": "0.1Gi"},
+                "requests": {"cpu": "0.05", "memory": "0.1Gi"},
+            },
             "securityContext": {"runAsUser": 0},
         },
         "enableInTreeAutoscaling": False,
@@ -43,8 +48,8 @@
             "imagePullPolicy": "IfNotPresent",
             "name": "ray-head",
             "resources": {
-                "limits": {"cpu": "500m", "memory": "800M"},
-                "requests": {"cpu": "500m", "memory": "800M"},
+                "limits": {"cpu": "500m", "memory": "300M"},
+                "requests": {"cpu": "500m", "memory": "100M"},
             },
             "securityContext": {"runAsUser": 0},
         }
@@ -64,7 +69,16 @@
 }


-def test_rayjob_pipes(k8s_with_kuberay: AClusterManager, capsys):
+@pytest.fixture(scope="session")
+def pipes_rayjob_client(k8s_with_kuberay: AClusterManager):
+    return PipesRayJobClient(
+        client=RayJobClient(
+            config_file=str(k8s_with_kuberay.kubeconfig),
+        )
+    )
+
+
+def test_rayjob_pipes(pipes_rayjob_client: PipesRayJobClient, dagster_ray_image: str, capsys):
     @asset
     def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobClient) -> None:
         pipes_rayjob_client.run(
@@ -74,7 +88,12 @@ def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobCli
         )

     with instance_for_test() as instance:
-        result = materialize([my_asset], resources={"pipes_rayjob_client": PipesRayJobClient()}, instance=instance)
+        result = materialize(
+            [my_asset],
+            resources={"pipes_rayjob_client": pipes_rayjob_client},
+            instance=instance,
+            tags={"dagster/image": dagster_ray_image},
+        )

     assert result.success
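
The session-scoped fixture lets every test share one `PipesRayJobClient` built from the ephemeral cluster's kubeconfig instead of constructing it per test. The `dagster_ray_image` fixture is not part of this diff; a hypothetical stand-in, purely for illustration:

import os

import pytest


@pytest.fixture(scope="session")
def dagster_ray_image() -> str:
    # Assumption: the image is built before the test run and advertised via
    # an environment variable; the repository's real fixture may differ.
    return os.environ.get("DAGSTER_RAY_IMAGE", "local/dagster-ray:dev")
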
