Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 11332 to 1.9.latest #11349

Open
wants to merge 1 commit into
base: 1.9.latest
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250303-131440.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix microbatch execution to not block main thread nor hang
time: 2025-03-03T13:14:40.432874-06:00
custom:
Author: QMalcolm
Issue: 11243 11306
18 changes: 18 additions & 0 deletions core/dbt/graph/thread_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

from multiprocessing.pool import ThreadPool


class DbtThreadPool(ThreadPool):
"""A ThreadPool that tracks whether or not it's been closed"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.closed = False

def close(self):
self.closed = True
super().close()

def is_closed(self):
return self.closed
9 changes: 5 additions & 4 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:

return batches

def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
@staticmethod
def build_jinja_context_for_batch(model: ModelNode, incremental_batch: bool) -> Dict[str, Any]:
"""
Create context with entries that reflect microbatch model + incremental execution state
Expand All @@ -109,9 +110,9 @@ def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, An
jinja_context: Dict[str, Any] = {}

# Microbatch model properties
jinja_context["model"] = self.model.to_dict()
jinja_context["sql"] = self.model.compiled_code
jinja_context["compiled_code"] = self.model.compiled_code
jinja_context["model"] = model.to_dict()
jinja_context["sql"] = model.compiled_code
jinja_context["compiled_code"] = model.compiled_code

# Add incremental context variables for batches running incrementally
if incremental_batch:
Expand Down
12 changes: 7 additions & 5 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@
runner.do_skip(cause=cause)

if isinstance(runner, MicrobatchModelRunner):
return self.handle_microbatch_model(runner, pool)
runner.set_parent_task(self)
runner.set_pool(pool)

Check warning on line 173 in core/dbt/task/build.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/build.py#L172-L173

Added lines #L172 - L173 were not covered by tests

return self.call_runner(runner)

Expand All @@ -184,10 +185,11 @@
runner.do_skip(cause=cause)

if isinstance(runner, MicrobatchModelRunner):
callback(self.handle_microbatch_model(runner, pool))
else:
args = [runner]
self._submit(pool, args, callback)
runner.set_parent_task(self)
runner.set_pool(pool)

args = [runner]
self._submit(pool, args, callback)

# Make a map of model unique_ids to selected unit test unique_ids,
# for processing before the model.
Expand Down
Loading
Loading