From f2f532e62bdc0c843d21876cf3486d9bc5480a88 Mon Sep 17 00:00:00 2001 From: Daniel Gafni Date: Tue, 22 Oct 2024 00:37:13 +0200 Subject: [PATCH] matrix for ray and dagster versions --- .github/workflows/CI.yml | 13 +++++++-- Dockerfile | 18 ++++++++++--- dagster_ray/kuberay/configs.py | 10 +++---- dagster_ray/pipes.py | 11 ++++---- tests/kuberay/conftest.py | 46 +++++++++++++++++++++----------- tests/kuberay/test_pipes.py | 13 ++++----- tests/kuberay/test_raycluster.py | 2 +- 7 files changed, 72 insertions(+), 41 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index c147272..878aa84 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -19,7 +19,7 @@ env: jobs: test: - name: Test Python ${{ matrix.py }} - KubeRay ${{ matrix.kuberay }} + name: Test Python ${{ matrix.py }} - Ray ${{ matrix.kuberay }} - Dagster ${{ matrix.dagster }} - KubeRay ${{ matrix.kuberay }} runs-on: ${{ matrix.os }}-latest strategy: fail-fast: false @@ -31,6 +31,13 @@ jobs: - "3.11" - "3.10" - "3.9" + ray: + - "2.37.0" + - "2.24.0" + - "2.12.0" + dagster: + - "1.8.12" + - "1.7.16" kuberay: - "1.1.0" - "1.2.2" @@ -57,10 +64,12 @@ jobs: - name: Run tests env: PYTEST_KUBERAY_VERSIONS: "${{ matrix.kuberay }}" + PYTEST_RAY_VERSION: "${{ matrix.ray }}" + PYTEST_DAGSTER_VERSION: "${{ matrix.dagster }}" run: uv run pytest -v . lint: - name: lint ${{ matrix.py }} - ${{ matrix.os }} + name: Lint ${{ matrix.py }} runs-on: ${{ matrix.os }}-latest strategy: fail-fast: false diff --git a/Dockerfile b/Dockerfile index 8134f99..0009721 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ COPY --from=bitnami/kubectl:1.30.3 /opt/bitnami/kubectl/bin/kubectl /usr/local/b # install uv (https://github.com/astral-sh/uv) # docs for using uv with Docker: https://docs.astral.sh/uv/guides/integration/docker/ -COPY --from=ghcr.io/astral-sh/uv:0.4.18 /uv /bin/uv +COPY --from=ghcr.io/astral-sh/uv:0.4.25 /uv /bin/uv ENV UV_PROJECT_ENVIRONMENT=/usr/local/ ENV DAGSTER_HOME=/opt/dagster/dagster_home @@ -27,7 +27,7 @@ WORKDIR /src COPY pyproject.toml uv.lock ./ RUN --mount=type=cache,target=/root/.cache/uv \ - uv sync --frozen --all-extras --no-dev --no-install-project + uv sync --frozen --all-extras --no-dev --no-install-project --inexact FROM base-prod AS base-dev @@ -41,8 +41,18 @@ RUN --mount=type=cache,target=/cache/downloads \ curl https://nodejs.org/dist/v$NODE_VERSION/$NODE_PACKAGE.tar.gz -o /cache/downloads/$NODE_PACKAGE.tar.gz \ && tar -xzC /opt/ -f /cache/downloads/$NODE_PACKAGE.tar.gz + +RUN mkdir dagster_ray && touch dagster_ray/__init__.py && touch README.md +COPY dagster_ray/_version.py dagster_ray/_version.py + +# Install specific Dagster and Ray versions (for integration tests) +ARG RAY_VERSION=2.35.0 +ARG DAGSTER_VERSION=1.8.12 +RUN --mount=type=cache,target=/root/.cache/uv \ + uv add --no-sync "ray[all]==$RAY_VERSION" "dagster==$DAGSTER_VERSION" + RUN --mount=type=cache,target=/root/.cache/uv \ - uv sync --frozen --all-extras --no-install-project + uv sync --frozen --all-extras --no-install-project --inexact # ------------------------------------------------------------- FROM base-${BUILD_DEPENDENCIES} AS final @@ -51,4 +61,4 @@ FROM base-${BUILD_DEPENDENCIES} AS final COPY . . # finally install all our code -RUN uv sync --frozen --all-extras +RUN uv sync --frozen --all-extras --inexact diff --git a/dagster_ray/kuberay/configs.py b/dagster_ray/kuberay/configs.py index 95da7ea..603d0ba 100644 --- a/dagster_ray/kuberay/configs.py +++ b/dagster_ray/kuberay/configs.py @@ -30,14 +30,14 @@ "containers": [ { "volumeMounts": [ - # {"mountPath": "/tmp/ray", "name": "log-volume"}, + {"mountPath": "/tmp/ray", "name": "ray-logs"}, ], "name": "head", "imagePullPolicy": "Always", }, ], "volumes": [ - {"name": "log-volume", "emptyDir": {}}, + {"name": "ray-logs", "emptyDir": {}}, ], "affinity": {}, "tolerations": [], @@ -58,15 +58,13 @@ "imagePullSecrets": [], "containers": [ { - "volumeMounts": [ - # {"mountPath": "/tmp/ray", "name": "log-volume"} - ], + "volumeMounts": [{"mountPath": "/tmp/ray", "name": "ray-logs"}], "name": "worker", "imagePullPolicy": "Always", } ], "volumes": [ - {"name": "log-volume", "emptyDir": {}}, + {"name": "ray-logs", "emptyDir": {}}, ], "affinity": {}, "tolerations": [], diff --git a/dagster_ray/pipes.py b/dagster_ray/pipes.py index 62a3c26..d36d7e6 100644 --- a/dagster_ray/pipes.py +++ b/dagster_ray/pipes.py @@ -210,7 +210,6 @@ def run( # type: ignore ray_job (Dict[str, Any]): RayJob specification. `API reference `_. extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session. """ - from ray.job_submission import JobStatus with open_pipes_session( context=context, @@ -224,11 +223,7 @@ def run( # type: ignore try: self._read_messages(context, job_id) - status = self._wait_for_completion(context, job_id) - - if status in {JobStatus.FAILED, JobStatus.STOPPED}: - raise RuntimeError(f"RayJob {job_id} failed with status {status}") - + self._wait_for_completion(context, job_id) return PipesClientCompletedInvocation(session) except DagsterExecutionInterruptedError: @@ -287,6 +282,10 @@ def _wait_for_completion(self, context: OpExecutionContext, job_id: str) -> "Job status = self.client.get_job_status(job_id) if status.is_terminal(): + if status in {JobStatus.FAILED, JobStatus.STOPPED}: + job_details = self.client.get_job_info(job_id) + raise RuntimeError(f"RayJob {job_id} failed with status {status}. Message:\n{job_details.message}") + return status time.sleep(self.poll_interval) diff --git a/tests/kuberay/conftest.py b/tests/kuberay/conftest.py index 9715de9..7b7b5f9 100644 --- a/tests/kuberay/conftest.py +++ b/tests/kuberay/conftest.py @@ -14,7 +14,7 @@ from dagster_ray.kuberay.client import RayClusterClient from dagster_ray.kuberay.configs import DEFAULT_HEAD_GROUP_SPEC, DEFAULT_WORKER_GROUP_SPECS from tests import ROOT_DIR -from tests.kuberay.utils import NAMESPACE, get_random_free_port +from tests.kuberay.utils import NAMESPACE @pytest.fixture(scope="session") @@ -28,6 +28,9 @@ def kuberay_helm_repo(): @pytest.fixture(scope="session") def dagster_ray_image(): + import dagster + import ray + """ Either returns the image name from the environment variable PYTEST_DAGSTER_RAY_IMAGE or builds the image and returns it @@ -36,7 +39,11 @@ def dagster_ray_image(): if PYTEST_DAGSTER_RAY_IMAGE is None: # build the local image python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" - image = f"local/dagster-ray:py-{python_version}" + ray_version = os.getenv("PYTEST_RAY_VERSION") or ray.__version__ + dagster_version = os.getenv("PYTEST_DAGSTER_VERSION") or dagster.__version__ + + image = f"local/dagster-ray:py-{python_version}-{ray_version}-{dagster_version}" + subprocess.run( [ "docker", @@ -47,6 +54,10 @@ def dagster_ray_image(): "BUILD_DEPENDENCIES=dev", "--build-arg", f"PYTHON_VERSION={python_version}", + "--build-arg", + f"RAY_VERSION={ray_version}", + "--build-arg", + f"DAGSTER_VERSION={dagster_version}", "-t", image, str(ROOT_DIR), @@ -145,7 +156,7 @@ def k8s_with_raycluster( k8s_with_kuberay: AClusterManager, head_group_spec: Dict[str, Any], worker_group_specs: List[Dict[str, Any]], -) -> Iterator[Tuple[dict[str, int], AClusterManager]]: +) -> Iterator[Tuple[dict[str, str], AClusterManager]]: # create a RayCluster config.load_kube_config(str(k8s_with_kuberay.kubeconfig)) @@ -155,6 +166,8 @@ def k8s_with_raycluster( client.create( body={ + "kind": "RayCluster", + "apiVersion": "ray.io/v1", "metadata": {"name": PERSISTENT_RAY_CLUSTER_NAME}, "spec": { "headGroupSpec": head_group_spec, @@ -164,21 +177,22 @@ def k8s_with_raycluster( namespace=NAMESPACE, ) - redis_port = get_random_free_port() - dashboard_port = get_random_free_port() - - with k8s_with_kuberay.port_forwarding( - target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc", - source_port=redis_port, - target_port=10001, + client.wait_until_ready( + name=PERSISTENT_RAY_CLUSTER_NAME, namespace=NAMESPACE, - ), k8s_with_kuberay.port_forwarding( - target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc", - source_port=dashboard_port, - target_port=8265, + timeout=600, + ) + + with client.port_forward( + name=PERSISTENT_RAY_CLUSTER_NAME, namespace=NAMESPACE, - ): - yield {"redis": redis_port, "dashboard": dashboard_port}, k8s_with_kuberay + local_dashboard_port=0, + local_gcs_port=0, + ) as (dashboard_port, redis_port): + yield ( + {"gcs": f"ray://localhost:{redis_port}", "dashboard": f"http://localhost:{dashboard_port}"}, + k8s_with_kuberay, + ) client.delete( name=PERSISTENT_RAY_CLUSTER_NAME, diff --git a/tests/kuberay/test_pipes.py b/tests/kuberay/test_pipes.py index 7837074..57693cf 100644 --- a/tests/kuberay/test_pipes.py +++ b/tests/kuberay/test_pipes.py @@ -15,7 +15,8 @@ from dagster_ray import PipesRayJobClient from dagster_ray.kuberay.client import RayJobClient from dagster_ray.kuberay.pipes import PipesKubeRayJobClient -from tests.test_pipes import LOCAL_SCRIPT_PATH + +ENTRYPOINT = "python /src/tests/scripts/remote_job.py" RAY_JOB = { "apiVersion": "ray.io/v1", @@ -26,7 +27,7 @@ }, "spec": { "activeDeadlineSeconds": 10800, - "entrypoint": "python /src/tests/scripts/remote_job.py", + "entrypoint": ENTRYPOINT, "entrypointNumCpus": 0.1, "rayClusterSpec": { "autoscalerOptions": { @@ -118,11 +119,11 @@ def my_asset(context: AssetExecutionContext, pipes_kube_rayjob_client: PipesKube @pytest.fixture(scope="session") -def pipes_ray_job_client(k8s_with_raycluster: Tuple[dict[str, int], AClusterManager]): - ports, k8s = k8s_with_raycluster +def pipes_ray_job_client(k8s_with_raycluster: Tuple[dict[str, str], AClusterManager]): + hosts, k8s = k8s_with_raycluster return PipesRayJobClient( client=JobSubmissionClient( - address=f"https://localhost:{ports['dashboard']}", + address=hosts["dashboard"], ) ) @@ -132,7 +133,7 @@ def test_ray_job_pipes(pipes_ray_job_client: PipesRayJobClient, capsys): def my_asset(context: AssetExecutionContext, pipes_ray_job_client: PipesRayJobClient): result = pipes_ray_job_client.run( context=context, - submit_job_params={"entrypoint": f"{sys.executable} {LOCAL_SCRIPT_PATH}"}, + submit_job_params={"entrypoint": ENTRYPOINT, "entrypoint_num_cpus": 0.1}, extras={"foo": "bar"}, ).get_materialize_result() diff --git a/tests/kuberay/test_raycluster.py b/tests/kuberay/test_raycluster.py index c6a379c..be1713f 100644 --- a/tests/kuberay/test_raycluster.py +++ b/tests/kuberay/test_raycluster.py @@ -10,7 +10,7 @@ from dagster_ray.kuberay import KubeRayCluster, RayClusterClientResource, RayClusterConfig, cleanup_kuberay_clusters from dagster_ray.kuberay.client import RayClusterClient from dagster_ray.kuberay.ops import CleanupKuberayClustersConfig -from tests.kuberay.conftest import NAMESPACE, get_random_free_port +from tests.kuberay.utils import NAMESPACE, get_random_free_port @pytest.fixture(scope="session")