✨ working_dir propagation & better docker example
danielgafni committed Oct 16, 2024
1 parent 1346beb commit f772f42
Showing 13 changed files with 4,090 additions and 125 deletions.
39 changes: 31 additions & 8 deletions dagster_ray/executor.py
@@ -1,3 +1,4 @@
import os
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, cast

from dagster import (
@@ -66,10 +67,24 @@ def ray_executor(init_context: InitExecutorContext) -> Executor:
exc_cfg = init_context.executor_config
ray_cfg = RayExecutorConfig(**exc_cfg["ray"]) # type: ignore

if ray_cfg.inherit_job_submission_client_from_ray_run_launcher and isinstance(
init_context.instance.run_launcher, RayRunLauncher
):
# TODO: some RunLauncher config values can be automatically passed to the executor
runtime_env = ray_cfg.runtime_env or {}

if isinstance(init_context.instance.run_launcher, RayRunLauncher):
runtime_env["env_vars"] = {
**runtime_env.get("env_vars", {}),
**init_context.instance.run_launcher.runtime_env.get("env_vars", {}),
}

if init_context.instance.run_launcher.runtime_env.get("working_dir") is not None:
# TODO: some RunLauncher config values can be automatically passed to the executor

if not runtime_env.get("working_dir"):
# pass the working dir from the RunLauncher to the executor
# since it was set, we are already running in this directory
# so we can simply use os.getcwd() to get it
runtime_env["working_dir"] = os.getcwd()

if ray_cfg.inherit_job_submission_client_from_ray_run_launcher:
client = init_context.instance.run_launcher.client
else:
client = JobSubmissionClient(
@@ -80,7 +95,7 @@ def ray_executor(init_context: InitExecutorContext) -> Executor:
RayStepHandler(
client=client,
env_vars=ray_cfg.env_vars,
runtime_env=ray_cfg.runtime_env,
runtime_env=runtime_env,
num_cpus=ray_cfg.num_cpus,
num_gpus=ray_cfg.num_gpus,
memory=ray_cfg.memory,
@@ -183,10 +198,18 @@ def launch_step(self, step_handler_context: StepHandlerContext) -> Iterator[Dags
},
)

args = step_handler_context.execute_step_args.get_command_args()

print(args[-1])

args[-1] = "'" + args[-1] + "'"

dagster_cmd = " ".join(args)

entrypoint = f"export PYTHONPATH=$PWD:$PYTHONPATH && echo $PYTHONPATH && python -c 'import example; print(example.__file__)' && {dagster_cmd}"

self.client.submit_job(
entrypoint=" ".join(
step_handler_context.execute_step_args.get_command_args(skip_serialized_namedtuple=True)
),
entrypoint=entrypoint,
submission_id=submission_id,
metadata=labels,
runtime_env=runtime_env,
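The merge above gives the run launcher's `env_vars` precedence over the executor's, and fills in `working_dir` from `os.getcwd()` only when the executor left it unset (the launcher has already placed the process in that directory). A minimal standalone sketch of that merge; the function name and signature are illustrative, not part of dagster_ray's API:

```python
import os
from typing import Any, Dict, Optional


def merge_runtime_env(
    executor_runtime_env: Optional[Dict[str, Any]],
    launcher_runtime_env: Dict[str, Any],
) -> Dict[str, Any]:
    """Sketch of the runtime_env merge performed in ray_executor above."""
    runtime_env = dict(executor_runtime_env or {})

    # Launcher-provided env vars take precedence over executor-level ones,
    # matching the dict-expansion order in the diff above.
    runtime_env["env_vars"] = {
        **runtime_env.get("env_vars", {}),
        **launcher_runtime_env.get("env_vars", {}),
    }

    # The launcher's working_dir was already applied to this process, so
    # os.getcwd() recovers it when the executor didn't set its own.
    if launcher_runtime_env.get("working_dir") is not None and not runtime_env.get("working_dir"):
        runtime_env["working_dir"] = os.getcwd()

    return runtime_env
```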
41 changes: 30 additions & 11 deletions dagster_ray/run_launcher.py
@@ -1,3 +1,4 @@
import json
import logging
import sys
from typing import TYPE_CHECKING, Any, Dict, List, Optional
@@ -124,12 +125,9 @@ def launch_run(self, context: LaunchRunContext) -> None:
).get_command_args()
)

# wrap the json in quotes to prevent errors with shell commands
args[-1] = "'" + args[-1] + "'"

self._launch_ray_job(submission_id, " ".join(args), run)
self._launch_ray_job(submission_id, args, run)

def _launch_ray_job(self, submission_id: str, entrypoint: str, run: DagsterRun):
def _launch_ray_job(self, submission_id: str, args: List[str], run: DagsterRun):
# note: entrypoint is a shell command
job_origin = check.not_none(run.job_code_origin)

@@ -160,8 +158,32 @@ def _launch_ray_job(self, submission_id: str, entrypoint: str, run: DagsterRun):
}
)

json_data = json.loads(args[-1])

if (
json_data.get("pipeline_origin", {})
.get("repository_origin", {})
.get("code_pointer", {})
.get("working_directory")
is not None
and self.runtime_env.get("working_dir") is not None
):
# remove dagster working_dir
# it will pick up $PWD in this case
del json_data["pipeline_origin"]["repository_origin"]["code_pointer"]["working_directory"]

args[-1] = json.dumps(json_data)

# wrap with quotes to avoid issues with shells
args[-1] = "'" + args[-1] + "'"

dagster_cmd = " ".join(args)

# FIXME: Dagster is not picking up Definitions from $PWD
entrypoint = f"export PYTHONPATH=$PWD:$PYTHONPATH && {dagster_cmd}"

self._instance.report_engine_event(
"Creating Ray run job",
"Creating Ray job for the run",
run,
EngineEventData(
{
@@ -205,10 +227,7 @@ def resume_run(self, context: ResumeRunContext) -> None:
).get_command_args()
)

# wrap the json in quotes to prevent errors with shell commands
args[-1] = "'" + args[-1] + "'"

self._launch_ray_job(submission_id, " ".join(args), run)
self._launch_ray_job(submission_id, args, run)

def terminate(self, run_id: str) -> bool:
check.str_param(run_id, "run_id")
@@ -279,7 +298,7 @@ def check_run_worker_health(self, run: DagsterRun):
# something went wrong
if run.status in (DagsterRunStatus.STARTED, DagsterRunStatus.CANCELING) and status.is_terminal():
return CheckRunHealthResult(
WorkerStatus.FAILED, f"Run has not completed but Ray job has is in status: {status}"
WorkerStatus.FAILED, f"Dagster run has not completed but Ray job has is in status: {status}"
)

elif status == JobStatus.FAILED:
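Both `launch_run` and `resume_run` now pass the raw args list into `_launch_ray_job`, which rewrites the final argument: the last element of the command produced by `ExecuteRunArgs.get_command_args()` is the serialized run-args JSON. A standalone sketch of that rewrite; the function name and the optional working-dir parameter are illustrative:

```python
import json
from typing import List, Optional


def prepare_entrypoint(args: List[str], launcher_working_dir: Optional[str]) -> str:
    """Sketch of the args rewriting done in _launch_ray_job above."""
    json_data = json.loads(args[-1])  # the last arg is the serialized run args

    code_pointer = (
        json_data.get("pipeline_origin", {})
        .get("repository_origin", {})
        .get("code_pointer", {})
    )

    # If both Dagster and the Ray runtime_env define a working directory,
    # drop Dagster's so code is resolved relative to $PWD inside the Ray job.
    if code_pointer.get("working_directory") is not None and launcher_working_dir is not None:
        del json_data["pipeline_origin"]["repository_origin"]["code_pointer"]["working_directory"]

    # wrap the JSON in single quotes so the shell treats it as one argument
    args[-1] = "'" + json.dumps(json_data) + "'"

    return "export PYTHONPATH=$PWD:$PYTHONPATH && " + " ".join(args)
```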
1 change: 1 addition & 0 deletions examples/docker/run_launcher/.python-version
@@ -0,0 +1 @@
3.10
33 changes: 16 additions & 17 deletions examples/docker/run_launcher/Dockerfile
@@ -4,31 +4,30 @@
FROM python:3.10-slim

ENV UV_SYSTEM_PYTHON=1
ENV UV_PROJECT_ENVIRONMENT=/usr/local/

# install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install \
dagster \
dagster-graphql \
dagster-webserver \
dagster-postgres \
dagster-docker \
dagster-k8s \
ray[all]
# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there
ENV DAGSTER_HOME=/dagster_home

WORKDIR /src
WORKDIR $DAGSTER_HOME

COPY examples/docker/run_launcher/pyproject.toml examples/docker/run_launcher/uv.lock ./

COPY pyproject.toml README.md ./
COPY dagster_ray ./dagster_ray

RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install -e .
uv sync --no-editable --frozen

# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there
ENV DAGSTER_HOME=/dagster_home
COPY pyproject.toml README.md /dagster-ray/
COPY dagster_ray /dagster-ray/dagster_ray

WORKDIR $DAGSTER_HOME
RUN uv pip install /dagster-ray

COPY examples/docker/run_launcher/example ./example

RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --inexact

COPY examples/docker/run_launcher/ ./
COPY examples/docker/run_launcher/dagster.yaml examples/docker/run_launcher/workspace.yaml ./
6 changes: 4 additions & 2 deletions examples/docker/run_launcher/README.md
@@ -8,6 +8,8 @@ docker compose up --build

2. In your browser, open `localhost:3000` to access the Dagster UI and `localhost:8265` to access the Ray dashboard.

3. Launch a job. Observe how the steps are executed in separate Ray jobs in parallel.
3. Launch a job. Observe how the steps are executed in separate Ray jobs in parallel. Inspect the launched Ray jobs in the Ray dashboard.

4. Make changes to `src/definitions.py`.
4. Make changes to files in the `example` package.

5. Re-run the job and observe how your changes are immediately picked up.
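The contents of the `example` package are not shown in this view. As a purely hypothetical starting point, a definitions module with two independent ops would demonstrate the parallel Ray jobs mentioned in step 3:

```python
# example/definitions.py -- hypothetical minimal contents
from dagster import Definitions, job, op


@op
def one() -> int:
    return 1


@op
def two() -> int:
    return 2


@op
def add(a: int, b: int) -> int:
    return a + b


@job
def parallel_job():
    # `one` and `two` have no dependency on each other, so with the Ray
    # executor they can run as separate, parallel Ray jobs.
    add(one(), two())


defs = Definitions(jobs=[parallel_job])
```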
31 changes: 23 additions & 8 deletions examples/docker/run_launcher/dagster.yaml
@@ -1,10 +1,3 @@
scheduler:
module: dagster.core.scheduler
class: DagsterDaemonScheduler

run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator

run_launcher:
module: dagster_ray
@@ -13,12 +6,20 @@ run_launcher:
ray:
address: http://ray-head:8265
runtime_env:
working_dir: /dagster_home
working_dir: "."
env_vars:
- DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB

scheduler:
module: dagster.core.scheduler
class: DagsterDaemonScheduler

run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator

run_storage:
module: dagster_postgres.run_storage
class: PostgresRunStorage
@@ -61,5 +62,19 @@ event_log_storage:
env: DAGSTER_POSTGRES_DB
port: 5432


local_artifact_storage:
module: dagster.core.storage.root
class: LocalArtifactStorage
config:
base_dir: /storage


compute_logs:
module: dagster.core.storage.local_compute_log_manager
class: LocalComputeLogManager
config:
base_dir: /logs

telemetry:
enabled: false
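For reference, this config roughly corresponds to the Ray job submission the launcher performs. A hedged sketch using Ray's public `JobSubmissionClient`; it assumes the launcher resolves each listed env var name from its own environment, and the entrypoint placeholder stands in for the real serialized run args built as shown in `run_launcher.py` above:

```python
import os

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://ray-head:8265")

# dagster.yaml lists env var *names*; Ray's runtime_env wants name -> value,
# so resolve them from the launcher's own environment (assumed behavior).
env_var_names = ["DAGSTER_POSTGRES_USER", "DAGSTER_POSTGRES_PASSWORD", "DAGSTER_POSTGRES_DB"]

client.submit_job(
    entrypoint="export PYTHONPATH=$PWD:$PYTHONPATH && dagster api execute_run '...'",
    runtime_env={
        # "." resolves relative to the submitting process, i.e. $DAGSTER_HOME here
        "working_dir": ".",
        "env_vars": {name: os.environ[name] for name in env_var_names},
    },
)
```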
50 changes: 0 additions & 50 deletions examples/docker/run_launcher/definitions.py

This file was deleted.
