Allow sampling from snapshots and of snapshots #11311

Merged
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20250214-152957.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Allow sampling of the nodes that snapshots depend on, and of snapshots that other nodes depend on
time: 2025-02-14T15:29:57.118017-06:00
custom:
Author: QMalcolm
Issue: "11301"
12 changes: 5 additions & 7 deletions core/dbt/context/providers.py
@@ -62,6 +62,7 @@
Resource,
SeedNode,
SemanticModel,
SnapshotNode,
SourceDefinition,
UnitTestNode,
)
@@ -253,18 +254,15 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:

# Only do event time filtering if the base node has the necessary event time configs
if (
(
isinstance(target.config, NodeConfig)
or isinstance(target.config, SourceConfig)
or isinstance(target.config, SeedConfig)
)
isinstance(target.config, (NodeConfig, SeedConfig, SourceConfig))
and target.config.event_time
and isinstance(self.model, ModelNode)
and isinstance(self.model, (ModelNode, SnapshotNode))
):

# Handling of microbatch models
if (
self.model.config.materialized == "incremental"
isinstance(self.model, ModelNode)
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
and self.model.batch is not None
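
The net effect of this `providers.py` change, paraphrased as a standalone sketch (construction of the actual `EventTimeFilter` is elided): event-time filtering is now considered when the resolving node is either a `ModelNode` or a `SnapshotNode`, while the microbatch branch is explicitly guarded to `ModelNode` only.

```python
# Hedged sketch of the updated gating logic in resolve_event_time_filter,
# paraphrased from the diff above; not the full dbt-core implementation.
# Import paths follow the modules used elsewhere in this PR.
from dbt.artifacts.resources import NodeConfig, SeedConfig, SourceConfig
from dbt.contracts.graph.nodes import ModelNode, SnapshotNode


def event_time_filtering_applies(target, model) -> bool:
    # The target must be a model/seed/source config that declares event_time ...
    target_ok = isinstance(
        target.config, (NodeConfig, SeedConfig, SourceConfig)
    ) and bool(target.config.event_time)
    # ... and the resolving node may now be a SnapshotNode as well as a ModelNode.
    return target_ok and isinstance(model, (ModelNode, SnapshotNode))


def microbatch_branch_applies(model) -> bool:
    # The microbatch-specific handling is still restricted to ModelNode,
    # presumably because snapshots are never microbatch incrementals.
    return (
        isinstance(model, ModelNode)
        and model.config.materialized == "incremental"
        and model.config.incremental_strategy == "microbatch"
    )
```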
127 changes: 127 additions & 0 deletions tests/functional/sample_mode/test_sample_mode.py
@@ -95,6 +95,20 @@
{% endif %}
"""

snapshot_input_model_sql = """
{% snapshot snapshot_input_model %}
{{ config(strategy='timestamp', unique_key='id', updated_at='event_time', event_time='event_time') }}

select * from {{ ref('input_model') }}
{% endsnapshot %}
"""

model_from_snapshot_sql = """
{{ config(materialized='table') }}

SELECT * FROM {{ ref('snapshot_input_model') }}
"""


class BaseSampleMode:
# TODO: This is now used in 3 test files; it might be worth turning it into a full test utility method
@@ -108,6 +122,10 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)

assert result[0] == expected_row_count

def drop_table(self, project, relation_name: str):
relation = relation_from_name(project.adapter, relation_name)
project.run_sql(f"drop table if exists {relation}")


class TestBasicSampleMode(BaseSampleMode):
@pytest.fixture(scope="class")
@@ -440,3 +458,112 @@ def test_sample_mode(
relation_name="sample_input_seed",
expected_row_count=expected_row_count,
)


class TestSamplingModelFromSnapshot(BaseSampleMode):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
}

@pytest.fixture(scope="class")
def snapshots(self):
return {
"snapshot_input_model.sql": snapshot_input_model_sql,
}

@pytest.mark.parametrize(
"sample_mode_available,run_sample_mode,expected_row_count",
[
(True, True, 2),
(True, False, 3),
(False, True, 3),
(False, False, 3),
],
)
@freezegun.freeze_time("2025-01-03T02:03:0Z")
def test_sample_mode(
self,
project,
mocker: MockerFixture,
sample_mode_available: bool,
run_sample_mode: bool,
expected_row_count: int,
):
run_args = ["build"]
if run_sample_mode:
run_args.append("--sample=1 day")

if sample_mode_available:
mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_SAMPLE_MODE": "1"})

_ = run_dbt(run_args)
self.assert_row_count(
project=project,
relation_name="snapshot_input_model",
expected_row_count=expected_row_count,
)
self.drop_table(project, "snapshot_input_model")


class TestSamplingSnapshot(BaseSampleMode):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
}

@pytest.fixture(scope="class")
def snapshots(self):
return {
"snapshot_input_model.sql": snapshot_input_model_sql,
}

@pytest.mark.parametrize(
"sample_mode_available,run_sample_mode,expected_row_count",
[
(True, True, 2),
(True, False, 3),
(False, True, 3),
(False, False, 3),
],
)
@freezegun.freeze_time("2025-01-03T02:03:0Z")
def test_sample_mode(
self,
project,
mocker: MockerFixture,
sample_mode_available: bool,
run_sample_mode: bool,
expected_row_count: int,
):
run_args = ["build"]

# create the snapshot before building a model that depends on it
_ = run_dbt(run_args)
# Snapshot should always have 3 in this test because we don't sample it
self.assert_row_count(
project=project,
relation_name="snapshot_input_model",
expected_row_count=3,
)

if run_sample_mode:
run_args.append("--sample=1 day")

if sample_mode_available:
mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_SAMPLE_MODE": "1"})

# create model that depends on the snapshot
write_file(
model_from_snapshot_sql, project.project_root, "models", "model_from_snapshot.sql"
)

_ = run_dbt(run_args)
self.assert_row_count(
project=project,
relation_name="model_from_snapshot",
expected_row_count=expected_row_count,
)
self.drop_table(project, "snapshot_input_model")
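
For intuition about the row counts asserted above: the tests freeze time at 2025-01-03 and pass `--sample=1 day`, so refs to event-time-configured nodes are limited to roughly the trailing one-day window and the sampled runs see 2 of the input model's 3 rows. A rough sketch of that window (boundary semantics are an assumption here; the exact SQL predicate dbt renders is adapter-specific and not shown in this diff):

```python
# Rough sketch of the window implied by --sample="1 day" under the frozen
# clock these tests use; the inclusive/exclusive boundaries are an assumption.
from datetime import datetime, timedelta, timezone

frozen_now = datetime(2025, 1, 3, 2, 3, 0, tzinfo=timezone.utc)
window_start = frozen_now - timedelta(days=1)

# Conceptually, a sampled ref behaves like:
#   select * from <target> where event_time >= window_start and event_time < frozen_now
print(window_start.isoformat(), frozen_now.isoformat())
```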
64 changes: 57 additions & 7 deletions tests/unit/context/test_providers.py
@@ -1,23 +1,23 @@
import os
from argparse import Namespace
from datetime import datetime
from typing import Any, Optional
from typing import Any, Optional, Type, Union
from unittest import mock

import pytest
import pytz
from pytest_mock import MockerFixture

from dbt.adapters.base import BaseRelation
from dbt.artifacts.resources import NodeConfig, Quoting, SeedConfig
from dbt.artifacts.resources import NodeConfig, Quoting, SeedConfig, SnapshotConfig
from dbt.artifacts.resources.types import BatchSize
from dbt.context.providers import (
BaseResolver,
EventTimeFilter,
RuntimeRefResolver,
RuntimeSourceResolver,
)
from dbt.contracts.graph.nodes import BatchContext, ModelNode
from dbt.contracts.graph.nodes import BatchContext, ModelNode, SnapshotNode
from dbt.event_time.sample_window import SampleWindow
from dbt.flags import set_from_args

@@ -46,7 +46,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
assert resolver.resolve_limit == expected_resolve_limit

@pytest.mark.parametrize(
"use_microbatch_batches,materialized,incremental_strategy,sample_mode_available,sample,resolver_model_node,target_type,expect_filter",
"use_microbatch_batches,materialized,incremental_strategy,sample_mode_available,sample,resolver_model_node,target_type,resolver_model_type,expect_filter",
[
# Microbatch model without sample
(
@@ -57,6 +57,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
None,
True,
NodeConfig,
ModelNode,
True,
),
# Microbatch model with sample
@@ -71,6 +72,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
),
True,
NodeConfig,
ModelNode,
True,
),
# Normal model with sample
@@ -85,6 +87,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
),
True,
NodeConfig,
ModelNode,
True,
),
# Incremental merge model with sample
@@ -99,6 +102,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
),
True,
NodeConfig,
ModelNode,
True,
),
# Normal model with sample, but sample mode not available
@@ -113,6 +117,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
),
True,
NodeConfig,
ModelNode,
False,
),
# Sample, but not model node
@@ -127,6 +132,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
),
False,
NodeConfig,
ModelNode,
False,
),
# Microbatch, but not model node
@@ -138,6 +144,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
None,
False,
NodeConfig,
ModelNode,
False,
),
# Microbatch model, but not using batches
@@ -149,6 +156,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
None,
True,
NodeConfig,
ModelNode,
False,
),
# Non microbatch model, but supposed to use batches
@@ -160,10 +168,11 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
None,
True,
NodeConfig,
ModelNode,
False,
),
# Incremental merge
(True, "incremental", "merge", False, None, True, NodeConfig, False),
(True, "incremental", "merge", False, None, True, NodeConfig, ModelNode, False),
# Target seed node, with sample
(
False,
@@ -173,6 +182,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
SampleWindow.from_relative_string("2 days"),
True,
SeedConfig,
ModelNode,
True,
),
# Target seed node, with sample, but sample mode not available
@@ -184,10 +194,49 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit):
SampleWindow.from_relative_string("2 days"),
True,
SeedConfig,
ModelNode,
False,
),
# Target seed node, without sample, but sample mode available
(False, "table", None, True, None, True, SeedConfig, False),
(False, "table", None, True, None, True, SeedConfig, ModelNode, False),
# Sample model from snapshot node
(
False,
"table",
None,
True,
SampleWindow.from_relative_string("2 days"),
True,
NodeConfig,
SnapshotNode,
True,
),
# Try to sample model from snapshot, but sample mode not available
(
False,
"table",
None,
False,
SampleWindow.from_relative_string("2 days"),
True,
NodeConfig,
SnapshotNode,
False,
),
# Target model from snapshot, without sample, but sample mode available
(False, "table", None, True, None, True, NodeConfig, SnapshotNode, False),
# Target snapshot from model, with sample
(
False,
"table",
None,
True,
SampleWindow.from_relative_string("2 days"),
True,
SnapshotConfig,
ModelNode,
True,
),
],
)
def test_resolve_event_time_filter(
@@ -201,6 +250,7 @@ def test_resolve_event_time_filter(
sample: Optional[SampleWindow],
resolver_model_node: bool,
target_type: Any,
resolver_model_type: Union[Type[ModelNode], Type[SnapshotNode]],
expect_filter: bool,
) -> None:
# Target mocking
@@ -217,7 +267,7 @@ def test_resolve_event_time_filter(
resolver.config.args.EVENT_TIME_START = None
resolver.config.args.sample = sample
if resolver_model_node:
resolver.model = mock.MagicMock(spec=ModelNode)
resolver.model = mock.MagicMock(spec=resolver_model_type)
resolver.model.batch = BatchContext(
id="1",
event_time_start=datetime(2024, 1, 1, tzinfo=pytz.UTC),