Skip to content

Commit

Permalink
Add data_modeling_job mechanism.
Browse files Browse the repository at this point in the history
  • Loading branch information
phixMe committed Feb 25, 2025
1 parent 4426606 commit feaaba4
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 4 deletions.
57 changes: 57 additions & 0 deletions posthog/migrations/0676_datamodelingjob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Generated by Django 4.2.18 on 2025-02-25 21:55

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
import posthog.models.utils


class Migration(migrations.Migration):
    """Auto-generated migration creating the DataModelingJob table.

    Tracks individual materialization runs of a DataWarehouseSavedQuery
    (status, row counts, errors, and the Temporal workflow that ran them).
    """

    dependencies = [
        ("posthog", "0675_add_playlist_viewed"),
    ]

    operations = [
        migrations.CreateModel(
            name="DataModelingJob",
            fields=[
                # created_at / updated_at come from CreatedMetaFields / UpdatedMetaFields mixins.
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("updated_at", models.DateTimeField(auto_now=True, null=True)),
                (
                    "id",
                    # UUID primary key generated by posthog's UUIDT (time-sortable UUID).
                    models.UUIDField(
                        default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False
                    ),
                ),
                (
                    "status",
                    # Mirrors DataModelingJob.Status choices; new jobs start as "Running".
                    models.CharField(
                        choices=[("Running", "Running"), ("Completed", "Completed"), ("Failed", "Failed")],
                        default="Running",
                        max_length=400,
                    ),
                ),
                # Number of rows written by the materialization; 0 until the run completes.
                ("rows_materialized", models.IntegerField(default=0)),
                # Populated only when a run fails.
                ("error", models.TextField(blank=True, null=True)),
                # Temporal workflow ID that executed the materialization (optional).
                ("workflow_id", models.CharField(blank=True, max_length=400, null=True)),
                ("last_run_at", models.DateTimeField(default=django.utils.timezone.now)),
                (
                    "created_by",
                    # SET_NULL: keep the job record even if the creating user is deleted.
                    models.ForeignKey(
                        blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL
                    ),
                ),
                (
                    "saved_query",
                    # CASCADE: jobs are meaningless without their saved query.
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE, to="posthog.datawarehousesavedquery"
                    ),
                ),
                ("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team")),
            ],
            options={
                "abstract": False,
            },
        ),
    ]
2 changes: 1 addition & 1 deletion posthog/migrations/max_migration.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0675_add_playlist_viewed
0676_datamodelingjob
36 changes: 33 additions & 3 deletions posthog/temporal/data_modeling/run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from posthog.warehouse.util import database_sync_to_async
from posthog.warehouse.data_load.create_table import create_table_from_saved_query
from posthog.temporal.data_imports.util import prepare_s3_files_for_querying
from posthog.warehouse.models.data_modeling_job import DataModelingJob

logger = structlog.get_logger()

Expand Down Expand Up @@ -262,9 +263,18 @@ async def handle_model_ready(model: ModelNode, team_id: int, queue: asyncio.Queu
try:
if model.selected is True:
team = await database_sync_to_async(Team.objects.get)(id=team_id)
await materialize_model(model.label, team)
# Get the workflow ID from the activity info
workflow_id = temporalio.activity.info().workflow_id
key, delta_table, job_id = await materialize_model(model.label, team, workflow_id)
except Exception as err:
await logger.aexception("Failed to materialize model %s due to error: %s", model.label, str(err))

if job_id:
job = await database_sync_to_async(DataModelingJob.objects.get)(id=job_id)
job.status = DataModelingJob.Status.FAILED
job.error = str(err)
await database_sync_to_async(job.save)()

await queue.put(QueueMessage(status=ModelStatus.FAILED, label=model.label))
else:
await logger.ainfo("Materialized model %s", model.label)
Expand All @@ -273,14 +283,15 @@ async def handle_model_ready(model: ModelNode, team_id: int, queue: asyncio.Queu
queue.task_done()


async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTable]:
async def materialize_model(model_label: str, team: Team, workflow_id: str) -> tuple[str, DeltaTable, uuid.UUID]:
"""Materialize a given model by running its query in a dlt pipeline.
Arguments:
model_label: A label representing the ID or the name of the model to materialize.
If it's a valid UUID, then we will assume it's the ID, otherwise we'll assume
it is the model's name.
team: The team the model belongs to.
workflow_id: The ID of the workflow running the materialization.
"""
filter_params: dict[str, str | uuid.UUID] = {}
try:
Expand All @@ -294,6 +305,18 @@ async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTab
DataWarehouseSavedQuery.objects.prefetch_related("team").filter(team=team, **filter_params).get
)()

if saved_query.created_by_id is not None:
created_by_id = saved_query.created_by_id

# Create a job record for this materialization event
job = await database_sync_to_async(DataModelingJob.objects.create)(
team=team,
saved_query=saved_query,
status=DataModelingJob.Status.RUNNING,
workflow_id=workflow_id,
created_by_id=created_by_id,
)

query_columns = saved_query.columns
if not query_columns:
query_columns = await database_sync_to_async(saved_query.get_columns)()
Expand Down Expand Up @@ -338,10 +361,17 @@ async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTab

key, delta_table = tables.popitem()

# Count rows and update both DataWarehouseTable and DataModelingJob
row_count = await asyncio.to_thread(count_delta_table_rows, delta_table)
await update_table_row_count(saved_query, row_count)

return (key, delta_table)
# Update the job record with the row count and completed status
job.rows_materialized = row_count
job.status = DataModelingJob.Status.COMPLETED
job.last_run_at = dt.datetime.now(dt.UTC)
await database_sync_to_async(job.save)()

return (key, delta_table, job.id)


def count_delta_table_rows(delta_table: DeltaTable) -> int:
Expand Down
1 change: 1 addition & 0 deletions posthog/warehouse/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
from .modeling import *
from .table import *
from .query_tab_state import *
from .data_modeling_job import *
21 changes: 21 additions & 0 deletions posthog/warehouse/models/data_modeling_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from django.db import models
from django.utils import timezone
from posthog.models.utils import CreatedMetaFields, UUIDModel, UpdatedMetaFields


class DataModelingJob(CreatedMetaFields, UpdatedMetaFields, UUIDModel):
    """One materialization run of a DataWarehouseSavedQuery.

    A row is created when a run starts (status RUNNING) and updated on
    completion or failure; `workflow_id` links the run back to the Temporal
    workflow that executed it. Timestamps and the UUID primary key come from
    the CreatedMetaFields/UpdatedMetaFields/UUIDModel mixins.
    """

    class Status(models.TextChoices):
        RUNNING = "Running", "Running"
        COMPLETED = "Completed", "Completed"
        FAILED = "Failed", "Failed"

    team = models.ForeignKey("posthog.Team", on_delete=models.CASCADE)
    saved_query = models.ForeignKey("posthog.DataWarehouseSavedQuery", on_delete=models.CASCADE)
    status = models.CharField(max_length=400, choices=Status.choices, default=Status.RUNNING)
    # Row count written by the run; stays 0 until the run completes successfully.
    rows_materialized = models.IntegerField(default=0)
    # Error message captured when a run fails; null for running/successful jobs.
    error = models.TextField(null=True, blank=True)
    # Temporal workflow ID that performed the materialization, if known.
    workflow_id = models.CharField(max_length=400, null=True, blank=True)
    # Defaults to creation time; updated to completion time on success.
    last_run_at = models.DateTimeField(default=timezone.now)

    def __str__(self) -> str:
        # Human-readable label, e.g. "my_model - Completed".
        return f"{self.saved_query.name} - {self.status}"

0 comments on commit feaaba4

Please sign in to comment.