Skip to content

Commit

Permalink
[dagster-aws] fix log location for non-yarn EMR (#27100)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Resolve #27050
  • Loading branch information
danielgafni authored Jan 22, 2025
1 parent 4fe3c17 commit a02e4fa
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 9 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,25 @@ class PipesEMRClient(PipesClient, TreatAsResourceParam):
context into AWS EMR job. Defaults to :py:class:`PipesEnvContextInjector`.
forward_termination (bool): Whether to cancel the EMR job if the Dagster process receives a termination signal.
wait_for_s3_logs_seconds (int): The number of seconds to wait for S3 logs to be written after execution completes.
s3_application_logs_prefix (str): The prefix to use when looking for application logs in S3.
Defaults to `containers`. Another common value is `steps` (for non-yarn clusters).
"""

def __init__(
self,
message_reader: PipesMessageReader,
client=None,
client: Optional["EMRClient"] = None,
context_injector: Optional[PipesContextInjector] = None,
forward_termination: bool = True,
wait_for_s3_logs_seconds: int = 10,
s3_application_logs_prefix: str = "containers",
):
self._client = client or boto3.client("emr")
self._client: EMRClient = cast("EMRClient", client or boto3.client("emr"))
self._message_reader = message_reader
self._context_injector = context_injector or PipesEnvContextInjector()
self.forward_termination = check.bool_param(forward_termination, "forward_termination")
self.wait_for_s3_logs_seconds = wait_for_s3_logs_seconds
self.s3_application_logs_prefix = s3_application_logs_prefix

@property
def client(self) -> "EMRClient":
Expand Down Expand Up @@ -174,7 +178,7 @@ def _wait_for_completion(

cluster = self._client.describe_cluster(ClusterId=cluster_id)

state: ClusterStateType = cluster["Cluster"]["Status"]["State"]
state: ClusterStateType = cluster["Cluster"]["Status"]["State"] # type: ignore

context.log.info(f"[pipes] EMR cluster {cluster_id} completed with state: {state}")

Expand Down Expand Up @@ -250,9 +254,12 @@ def _read_application_logs(
bucket = logs_uri.split("/")[2]
prefix = "/".join(logs_uri.split("/")[3:])

# discover container (application) logs (e.g. Python logs) and forward all of them
# ex. /containers/application_1727881613116_0001/container_1727881613116_0001_01_000001/stdout.gz
containers_prefix = os.path.join(prefix, f"{cluster_id}/containers/")
# discover application logs (e.g. Python logs) and forward all of them
# ex. /containers/application_1727881613116_0001/container_1727881613116_0001_01_000001/stdout.gz for Yarn
# or /steps/... for EMR steps
application_logs_prefix = os.path.join(
prefix, f"{cluster_id}/{self.s3_application_logs_prefix}/"
)

context.log.debug(
f"[pipes] Waiting for {self.wait_for_s3_logs_seconds} seconds to allow EMR to dump all logs to S3. "
Expand All @@ -262,13 +269,13 @@ def _read_application_logs(
time.sleep(self.wait_for_s3_logs_seconds) # give EMR a chance to dump all logs to S3

context.log.debug(
f"[pipes] Looking for application logs in s3://{os.path.join(bucket, containers_prefix)}"
f"[pipes] Looking for application logs in s3://{os.path.join(bucket, application_logs_prefix)}"
)

all_keys = [
obj["Key"]
for obj in self.message_reader.client.list_objects_v2(
Bucket=bucket, Prefix=containers_prefix
Bucket=bucket, Prefix=application_logs_prefix
)["Contents"]
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def __init__(
forward_termination: bool = True,
poll_interval: float = 5.0,
):
self._client = client or boto3.client("emr-serverless")
self._client: EMRServerlessClient = cast(
"EMRServerlessClient", client or boto3.client("emr-serverless")
)
self._context_injector = context_injector or PipesEnvContextInjector()
self._message_reader = message_reader or PipesCloudWatchMessageReader()
self.forward_termination = check.bool_param(forward_termination, "forward_termination")
Expand Down

1 comment on commit a02e4fa

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs-legacy ready!

✅ Preview
https://dagster-docs-legacy-bvho8xo6x-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit a02e4fa.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.