Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-vguttha committed Feb 18, 2025
1 parent de4582e commit e444f33
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 22 deletions.
6 changes: 3 additions & 3 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from multiprocessing.pool import ThreadPool
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

from opentelemetry import context, trace
from opentelemetry import trace

from dbt import tracking, utils
from dbt.adapters.base import BaseAdapter, BaseRelation
Expand Down Expand Up @@ -700,6 +700,7 @@ def __init__(
) -> None:
super().__init__(args, config, manifest)
self.batch_map = batch_map
self._dbt_tracer = trace.get_tracer("com.dbt.runner")

def raise_on_first_error(self) -> bool:
return False
Expand Down Expand Up @@ -891,8 +892,7 @@ def safe_run_hooks(
if num_hooks == 0:
return status

tracer = trace.get_tracer("dbt-runner")
with tracer.start_as_current_span(hook_type, context=context.get_current()) as _:
with self._dbt_tracer.start_as_current_span(hook_type) as _:
for idx, hook in enumerate(ordered_hooks, 1):
with log_contextvars(node_info=hook.node_info):
hook.index = idx
Expand Down
31 changes: 13 additions & 18 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Type, Union

from opentelemetry import context, trace
from opentelemetry.trace import SpanContext, StatusCode
from opentelemetry.trace import Link, SpanContext, StatusCode

import dbt.exceptions
import dbt.tracking
Expand Down Expand Up @@ -95,6 +95,7 @@ def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> No
self.run_count: int = 0
self.started_at: float = 0
self._node_span_context_mapping: Dict[str, SpanContext] = {}
self._dbt_tracer = trace.get_tracer("com.dbt.runner")

if self.args.state:
self.previous_state = PreviousState(
Expand Down Expand Up @@ -227,25 +228,21 @@ def get_runner(self, node) -> BaseRunner:
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner: BaseRunner, parent_context=None) -> RunResult:
tracer = trace.get_tracer("dbt-runner")
node_info = runner.node.node_info
if parent_context is None:
parent_context = context.get_current()
model_span = tracer.start_span(node_info["unique_id"], context=parent_context)
ctx = trace.set_span_in_context(model_span)
token = context.attach(ctx)
self._node_span_context_mapping[node_info["unique_id"]] = model_span.get_span_context()
links = []
if hasattr(runner.node.depends_on, "nodes"):
for parent_node in runner.node.depends_on.nodes:
if parent_node in self._node_span_context_mapping:
try:
model_span.add_link(
links.append(
Link(
self._node_span_context_mapping[parent_node],
{"model_name": parent_node},
)
except Exception:
pass
with log_contextvars(node_info=node_info):
{"parent_model_fqn": parent_node},
),
)
with log_contextvars(node_info=node_info), self._dbt_tracer.start_as_current_span(
node_info["unique_id"], context=parent_context, links=links
) as node_span:
self._node_span_context_mapping[node_info["unique_id"]] = node_span.get_span_context()
runner.node.update_event_status(
started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started
)
Expand All @@ -265,9 +262,7 @@ def call_runner(self, runner: BaseRunner, parent_context=None) -> RunResult:
thread_exception = e
finally:
if result.status in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.PartialSuccess):
model_span.set_status(StatusCode.ERROR)
context.detach(token)
model_span.end()
node_span.set_status(StatusCode.ERROR)
if result is not None:
fire_event(
NodeFinished(
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ types-pytz
types-requests
types-setuptools
mocker
opentelemetry-api>=1.23.0
opentelemetry-api

0 comments on commit e444f33

Please sign in to comment.