diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml
index c147272..c61ebc7 100644
--- a/.github/workflows/CI.yml
+++ b/.github/workflows/CI.yml
@@ -19,7 +19,7 @@ env:
 
 jobs:
   test:
-    name: Test Python ${{ matrix.py }} - KubeRay ${{ matrix.kuberay }}
+    name: Test Python ${{ matrix.py }} - Ray ${{ matrix.ray }} - Dagster ${{ matrix.dagster }} - KubeRay ${{ matrix.kuberay }}
     runs-on: ${{ matrix.os }}-latest
     strategy:
       fail-fast: false
@@ -31,20 +31,16 @@ jobs:
           - "3.11"
           - "3.10"
           - "3.9"
+        ray:
+          - "2.37.0"
+          - "2.24.0"
+        dagster:
+          - "1.8.12"
         kuberay:
           - "1.1.0"
           - "1.2.2"
     steps:
       - uses: actions/checkout@v4
-      - name: Install uv
-        uses: astral-sh/setup-uv@v3
-        with:
-          version: 0.4.18
-          enable-cache: true
-      - name: Set up Python ${{ matrix.py }}
-        run: uv python install ${{ matrix.py }}
-      - name: Install dependencies
-        run: uv sync --all-extras --dev
       - uses: azure/setup-helm@v4.2.0
       - name: Set up Docker Buildx
         uses: docker/setup-buildx-action@v3
@@ -53,14 +49,27 @@
         with:
          start: false
          driver: docker
-      #- uses: mxschmitt/action-tmate@v3
+      - name: Install uv
+        uses: astral-sh/setup-uv@v3
+        with:
+          version: 0.4.25
+          enable-cache: true
+      - name: Set up Python ${{ matrix.py }}
+        run: uv python install ${{ matrix.py }} && uv venv --python ${{ matrix.py }}
+      - name: Override ray==${{ matrix.ray }} dagster==${{ matrix.dagster }}
+        id: override
+        run: uv add --no-sync "ray[all]==${{ matrix.ray }}" "dagster==${{ matrix.dagster }}" || echo SKIP=1 >> $GITHUB_OUTPUT
+      - name: Install dependencies
+        run: uv sync --all-extras --dev
+        if: ${{ steps.override.outputs.SKIP != '1' }}
       - name: Run tests
         env:
           PYTEST_KUBERAY_VERSIONS: "${{ matrix.kuberay }}"
         run: uv run pytest -v .
+        if: ${{ steps.override.outputs.SKIP != '1' }}
 
   lint:
-    name: lint ${{ matrix.py }} - ${{ matrix.os }}
+    name: Lint ${{ matrix.py }}
     runs-on: ${{ matrix.os }}-latest
     strategy:
       fail-fast: false
diff --git a/Dockerfile b/Dockerfile
index 8134f99..7b3a839 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -14,7 +14,7 @@ COPY --from=bitnami/kubectl:1.30.3 /opt/bitnami/kubectl/bin/kubectl /usr/local/b
 
 # install uv (https://github.com/astral-sh/uv)
 # docs for using uv with Docker: https://docs.astral.sh/uv/guides/integration/docker/
-COPY --from=ghcr.io/astral-sh/uv:0.4.18 /uv /bin/uv
+COPY --from=ghcr.io/astral-sh/uv:0.4.25 /uv /bin/uv
 ENV UV_PROJECT_ENVIRONMENT=/usr/local/
 ENV DAGSTER_HOME=/opt/dagster/dagster_home
 
@@ -27,12 +27,12 @@ WORKDIR /src
 COPY pyproject.toml uv.lock ./
 
 RUN --mount=type=cache,target=/root/.cache/uv \
-    uv sync --frozen --all-extras --no-dev --no-install-project
+    uv sync --frozen --all-extras --no-dev --no-install-project --inexact
 
 FROM base-prod AS base-dev
 
 # Node.js is needed for pyright in CI
-ARG NODE_VERSION=20.7.0
+ARG NODE_VERSION=23.0.0
 ARG NODE_PACKAGE=node-v$NODE_VERSION-linux-x64
 ARG NODE_HOME=/opt/$NODE_PACKAGE
 ENV NODE_PATH $NODE_HOME/lib/node_modules
@@ -41,6 +41,16 @@ RUN --mount=type=cache,target=/cache/downloads \
     curl https://nodejs.org/dist/v$NODE_VERSION/$NODE_PACKAGE.tar.gz -o /cache/downloads/$NODE_PACKAGE.tar.gz \
     && tar -xzC /opt/ -f /cache/downloads/$NODE_PACKAGE.tar.gz
 
+
+RUN mkdir dagster_ray && touch dagster_ray/__init__.py && touch README.md
+COPY dagster_ray/_version.py dagster_ray/_version.py
+
+# Install specific Dagster and Ray versions (for integration tests)
+ARG RAY_VERSION=2.35.0
+ARG DAGSTER_VERSION=1.8.12
+RUN --mount=type=cache,target=/root/.cache/uv \
+    uv add --no-sync "ray[all]==$RAY_VERSION" "dagster==$DAGSTER_VERSION"
+
 RUN --mount=type=cache,target=/root/.cache/uv \
     uv sync --frozen --all-extras --no-install-project
 
@@ -51,4 +61,4 @@ FROM base-${BUILD_DEPENDENCIES} AS final
 COPY . .
 
 # finally install all our code
-RUN uv sync --frozen --all-extras
+RUN uv sync --frozen --all-extras --inexact
diff --git a/dagster_ray/kuberay/configs.py b/dagster_ray/kuberay/configs.py
index 95da7ea..603d0ba 100644
--- a/dagster_ray/kuberay/configs.py
+++ b/dagster_ray/kuberay/configs.py
@@ -30,14 +30,14 @@
             "containers": [
                 {
                     "volumeMounts": [
-                        # {"mountPath": "/tmp/ray", "name": "log-volume"},
+                        {"mountPath": "/tmp/ray", "name": "ray-logs"},
                     ],
                     "name": "head",
                     "imagePullPolicy": "Always",
                 },
             ],
             "volumes": [
-                {"name": "log-volume", "emptyDir": {}},
+                {"name": "ray-logs", "emptyDir": {}},
             ],
             "affinity": {},
             "tolerations": [],
@@ -58,15 +58,13 @@
                 "imagePullSecrets": [],
                 "containers": [
                     {
-                        "volumeMounts": [
-                            # {"mountPath": "/tmp/ray", "name": "log-volume"}
-                        ],
+                        "volumeMounts": [{"mountPath": "/tmp/ray", "name": "ray-logs"}],
                         "name": "worker",
                         "imagePullPolicy": "Always",
                     }
                 ],
                 "volumes": [
-                    {"name": "log-volume", "emptyDir": {}},
+                    {"name": "ray-logs", "emptyDir": {}},
                 ],
                 "affinity": {},
                 "tolerations": [],
diff --git a/dagster_ray/pipes.py b/dagster_ray/pipes.py
index 62a3c26..c8c7daf 100644
--- a/dagster_ray/pipes.py
+++ b/dagster_ray/pipes.py
@@ -210,7 +210,6 @@ def run(  # type: ignore
             ray_job (Dict[str, Any]): RayJob specification. `API reference `_.
             extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session.
         """
-        from ray.job_submission import JobStatus
 
         with open_pipes_session(
             context=context,
@@ -224,11 +223,7 @@
 
         try:
             self._read_messages(context, job_id)
-            status = self._wait_for_completion(context, job_id)
-
-            if status in {JobStatus.FAILED, JobStatus.STOPPED}:
-                raise RuntimeError(f"RayJob {job_id} failed with status {status}")
-
+            self._wait_for_completion(context, job_id)
             return PipesClientCompletedInvocation(session)
 
         except DagsterExecutionInterruptedError:
@@ -281,12 +276,20 @@ def _read_messages(self, context: OpExecutionContext, job_id: str) -> None:
         )
 
     def _wait_for_completion(self, context: OpExecutionContext, job_id: str) -> "JobStatus":
+        from ray.job_submission import JobStatus
+
         context.log.info(f"[pipes] Waiting for RayJob {job_id} to complete...")
 
         while True:
             status = self.client.get_job_status(job_id)
 
             if status.is_terminal():
+                if status in {JobStatus.FAILED, JobStatus.STOPPED}:
+                    job_details = self.client.get_job_info(job_id)
+                    raise RuntimeError(
+                        f"[pipes] RayJob {job_id} failed with status {status}. Message:\n{job_details.message}"
+                    )
+
                 return status
 
             time.sleep(self.poll_interval)
diff --git a/tests/kuberay/conftest.py b/tests/kuberay/conftest.py
index 9715de9..f0f0b94 100644
--- a/tests/kuberay/conftest.py
+++ b/tests/kuberay/conftest.py
@@ -14,7 +14,7 @@
 from dagster_ray.kuberay.client import RayClusterClient
 from dagster_ray.kuberay.configs import DEFAULT_HEAD_GROUP_SPEC, DEFAULT_WORKER_GROUP_SPECS
 from tests import ROOT_DIR
-from tests.kuberay.utils import NAMESPACE, get_random_free_port
+from tests.kuberay.utils import NAMESPACE
 
 
 @pytest.fixture(scope="session")
@@ -28,6 +28,9 @@ def kuberay_helm_repo():
 
 @pytest.fixture(scope="session")
 def dagster_ray_image():
+    import dagster
+    import ray
+
     """
     Either returns the image name from the environment variable PYTEST_DAGSTER_RAY_IMAGE
     or builds the image and returns it
@@ -36,7 +39,11 @@
     if PYTEST_DAGSTER_RAY_IMAGE is None:
         # build the local image
         python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
-        image = f"local/dagster-ray:py-{python_version}"
+        ray_version = ray.__version__
+        dagster_version = dagster.__version__
+
+        image = f"local/dagster-ray:py-{python_version}-{ray_version}-{dagster_version}"
+
         subprocess.run(
             [
                 "docker",
@@ -47,6 +54,10 @@
                 "BUILD_DEPENDENCIES=dev",
                 "--build-arg",
                 f"PYTHON_VERSION={python_version}",
+                "--build-arg",
+                f"RAY_VERSION={ray_version}",
+                "--build-arg",
+                f"DAGSTER_VERSION={dagster_version}",
                 "-t",
                 image,
                 str(ROOT_DIR),
@@ -145,7 +156,7 @@
 def k8s_with_raycluster(
     k8s_with_kuberay: AClusterManager,
     head_group_spec: Dict[str, Any],
     worker_group_specs: List[Dict[str, Any]],
-) -> Iterator[Tuple[dict[str, int], AClusterManager]]:
+) -> Iterator[Tuple[dict[str, str], AClusterManager]]:
     # create a RayCluster
     config.load_kube_config(str(k8s_with_kuberay.kubeconfig))
@@ -155,6 +166,8 @@
 
     client.create(
         body={
+            "kind": "RayCluster",
+            "apiVersion": "ray.io/v1",
             "metadata": {"name": PERSISTENT_RAY_CLUSTER_NAME},
             "spec": {
                 "headGroupSpec": head_group_spec,
@@ -164,21 +177,22 @@
         namespace=NAMESPACE,
     )
 
-    redis_port = get_random_free_port()
-    dashboard_port = get_random_free_port()
-
-    with k8s_with_kuberay.port_forwarding(
-        target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc",
-        source_port=redis_port,
-        target_port=10001,
+    client.wait_until_ready(
+        name=PERSISTENT_RAY_CLUSTER_NAME,
         namespace=NAMESPACE,
-    ), k8s_with_kuberay.port_forwarding(
-        target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc",
-        source_port=dashboard_port,
-        target_port=8265,
+        timeout=600,
+    )
+
+    with client.port_forward(
+        name=PERSISTENT_RAY_CLUSTER_NAME,
         namespace=NAMESPACE,
-    ):
-        yield {"redis": redis_port, "dashboard": dashboard_port}, k8s_with_kuberay
+        local_dashboard_port=0,
+        local_gcs_port=0,
+    ) as (dashboard_port, redis_port):
+        yield (
+            {"gcs": f"ray://localhost:{redis_port}", "dashboard": f"http://localhost:{dashboard_port}"},
+            k8s_with_kuberay,
+        )
 
     client.delete(
         name=PERSISTENT_RAY_CLUSTER_NAME,
diff --git a/tests/kuberay/test_pipes.py b/tests/kuberay/test_pipes.py
index 7837074..57693cf 100644
--- a/tests/kuberay/test_pipes.py
+++ b/tests/kuberay/test_pipes.py
@@ -15,7 +15,8 @@
 from dagster_ray import PipesRayJobClient
 from dagster_ray.kuberay.client import RayJobClient
 from dagster_ray.kuberay.pipes import PipesKubeRayJobClient
-from tests.test_pipes import LOCAL_SCRIPT_PATH
+
+ENTRYPOINT = "python /src/tests/scripts/remote_job.py"
 
 RAY_JOB = {
     "apiVersion": "ray.io/v1",
@@ -26,7 +27,7 @@
     },
     "spec": {
         "activeDeadlineSeconds": 10800,
-        "entrypoint": "python /src/tests/scripts/remote_job.py",
+        "entrypoint": ENTRYPOINT,
         "entrypointNumCpus": 0.1,
         "rayClusterSpec": {
             "autoscalerOptions": {
@@ -118,11 +119,11 @@ def my_asset(context: AssetExecutionContext, pipes_kube_rayjob_client: PipesKube
 
 
 @pytest.fixture(scope="session")
-def pipes_ray_job_client(k8s_with_raycluster: Tuple[dict[str, int], AClusterManager]):
-    ports, k8s = k8s_with_raycluster
+def pipes_ray_job_client(k8s_with_raycluster: Tuple[dict[str, str], AClusterManager]):
+    hosts, k8s = k8s_with_raycluster
     return PipesRayJobClient(
         client=JobSubmissionClient(
-            address=f"https://localhost:{ports['dashboard']}",
+            address=hosts["dashboard"],
         )
     )
 
@@ -132,7 +133,7 @@
 def test_ray_job_pipes(pipes_ray_job_client: PipesRayJobClient, capsys):
     def my_asset(context: AssetExecutionContext, pipes_ray_job_client: PipesRayJobClient):
         result = pipes_ray_job_client.run(
             context=context,
-            submit_job_params={"entrypoint": f"{sys.executable} {LOCAL_SCRIPT_PATH}"},
+            submit_job_params={"entrypoint": ENTRYPOINT, "entrypoint_num_cpus": 0.1},
             extras={"foo": "bar"},
         ).get_materialize_result()
diff --git a/tests/kuberay/test_raycluster.py b/tests/kuberay/test_raycluster.py
index c6a379c..be1713f 100644
--- a/tests/kuberay/test_raycluster.py
+++ b/tests/kuberay/test_raycluster.py
@@ -10,7 +10,7 @@
 from dagster_ray.kuberay import KubeRayCluster, RayClusterClientResource, RayClusterConfig, cleanup_kuberay_clusters
 from dagster_ray.kuberay.client import RayClusterClient
 from dagster_ray.kuberay.ops import CleanupKuberayClustersConfig
-from tests.kuberay.conftest import NAMESPACE, get_random_free_port
+from tests.kuberay.utils import NAMESPACE, get_random_free_port
 
 
 @pytest.fixture(scope="session")