Commit

chore(data-warehouse): Manually handle garbage collection of large objects (#27203)
Gilbert09 authored Dec 31, 2024
1 parent 1a4c624 commit 8d12645
Showing 2 changed files with 51 additions and 37 deletions.
posthog/temporal/data_imports/pipelines/pipeline/pipeline.py: 84 changes (48 additions, 36 deletions)
@@ -1,3 +1,4 @@
+import gc
 import time
 from typing import Any
 import pyarrow as pa
@@ -50,51 +51,62 @@ def __init__(self, source: DltSource, logger: FilteringBoundLogger, job_id: str,
         self._internal_schema = HogQLSchema()

     def run(self):
-        buffer: list[Any] = []
-        chunk_size = 5000
-        row_count = 0
-        chunk_index = 0
-
-        for item in self._resource:
-            py_table = None
-
-            if isinstance(item, list):
-                if len(buffer) > 0:
-                    buffer.extend(item)
-                    if len(buffer) >= chunk_size:
-                        py_table = table_from_py_list(buffer)
-                        buffer = []
-                else:
-                    if len(item) >= chunk_size:
-                        py_table = table_from_py_list(item)
-                    else:
-                        buffer.extend(item)
-                        continue
-            elif isinstance(item, dict):
-                buffer.append(item)
-                if len(buffer) < chunk_size:
-                    continue
-
-                py_table = table_from_py_list(buffer)
-                buffer = []
-            elif isinstance(item, pa.Table):
-                py_table = item
-            else:
-                raise Exception(f"Unhandled item type: {item.__class__.__name__}")
-
-            assert py_table is not None
-
-            self._process_pa_table(pa_table=py_table, index=chunk_index)
-
-            row_count += py_table.num_rows
-            chunk_index += 1
-
-        if len(buffer) > 0:
-            py_table = table_from_py_list(buffer)
-            self._process_pa_table(pa_table=py_table, index=chunk_index)
-            row_count += py_table.num_rows
-
-        self._post_run_operations(row_count=row_count)
+        try:
+            buffer: list[Any] = []
+            py_table = None
+            chunk_size = 5000
+            row_count = 0
+            chunk_index = 0
+
+            for item in self._resource:
+                py_table = None
+
+                if isinstance(item, list):
+                    if len(buffer) > 0:
+                        buffer.extend(item)
+                        if len(buffer) >= chunk_size:
+                            py_table = table_from_py_list(buffer)
+                            buffer = []
+                    else:
+                        if len(item) >= chunk_size:
+                            py_table = table_from_py_list(item)
+                        else:
+                            buffer.extend(item)
+                            continue
+                elif isinstance(item, dict):
+                    buffer.append(item)
+                    if len(buffer) < chunk_size:
+                        continue
+
+                    py_table = table_from_py_list(buffer)
+                    buffer = []
+                elif isinstance(item, pa.Table):
+                    py_table = item
+                else:
+                    raise Exception(f"Unhandled item type: {item.__class__.__name__}")
+
+                assert py_table is not None
+
+                self._process_pa_table(pa_table=py_table, index=chunk_index)
+
+                row_count += py_table.num_rows
+                chunk_index += 1
+
+            if len(buffer) > 0:
+                py_table = table_from_py_list(buffer)
+                self._process_pa_table(pa_table=py_table, index=chunk_index)
+                row_count += py_table.num_rows
+
+            self._post_run_operations(row_count=row_count)
+        finally:
+            # Help reduce the memory footprint of each job
+            del self._resource
+            del self._delta_table_helper
+            if "buffer" in locals() and buffer is not None:
+                del buffer
+            if "py_table" in locals() and py_table is not None:
+                del py_table
+            gc.collect()

     def _process_pa_table(self, pa_table: pa.Table, index: int):
         delta_table = self._delta_table_helper.get_delta_table()
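
The try/finally above drops the references that keep large objects alive (the resource iterator, the Delta table helper, the row buffer and the current Arrow table) and then forces a collection, so memory is returned as soon as a job ends, even when it fails. A minimal, self-contained sketch of that pattern follows; it is not the PostHog pipeline itself, and fetch_batches, process_table and the use of pa.Table.from_pylist in place of table_from_py_list are hypothetical stand-ins.

    import gc
    from typing import Any, Iterator

    import pyarrow as pa


    def fetch_batches() -> Iterator[list[dict[str, Any]]]:
        # Stand-in for self._resource: yields lists of row dicts.
        for start in range(0, 10_000, 1_000):
            yield [{"id": i, "value": i * 2} for i in range(start, start + 1_000)]


    def process_table(table: pa.Table, index: int) -> None:
        # Stand-in for _process_pa_table (e.g. a Delta Lake write).
        print(f"chunk {index}: {table.num_rows} rows")


    def run() -> None:
        buffer: list[dict[str, Any]] = []
        table: pa.Table | None = None
        chunk_size = 5000
        chunk_index = 0
        try:
            for rows in fetch_batches():
                buffer.extend(rows)
                if len(buffer) < chunk_size:
                    continue
                table = pa.Table.from_pylist(buffer)
                buffer = []
                process_table(table, chunk_index)
                chunk_index += 1
            if buffer:
                table = pa.Table.from_pylist(buffer)
                process_table(table, chunk_index)
        finally:
            # Drop the names that pin the row list and the last Arrow table,
            # then collect promptly instead of waiting for the next automatic
            # collection in a long-lived worker process.
            del buffer
            del table
            gc.collect()


    if __name__ == "__main__":
        run()
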
@@ -507,7 +507,9 @@ def _run(
     reset_pipeline: bool,
 ):
     if settings.TEMPORAL_TASK_QUEUE == DATA_WAREHOUSE_TASK_QUEUE_V2:
-        PipelineNonDLT(source, logger, job_inputs.run_id, schema.is_incremental).run()
+        pipeline = PipelineNonDLT(source, logger, job_inputs.run_id, schema.is_incremental)
+        pipeline.run()
+        del pipeline
     else:
         table_row_counts = DataImportPipelineSync(
             job_inputs, source, logger, reset_pipeline, schema.is_incremental
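
On the caller side, the pipeline object is now bound to a name and deleted explicitly after run() returns, rather than constructed and invoked in a single expression. The hypothetical snippet below (BigJob and handler are stand-ins, not part of the commit) illustrates what the explicit del achieves: the local reference is gone, so the instance and any large buffers it still holds become collectable before the enclosing function does anything else.

    import gc
    import weakref


    class BigJob:
        def __init__(self) -> None:
            self.buffer = bytearray(50 * 1024 * 1024)  # ~50 MB held by the instance

        def run(self) -> None:
            pass  # stand-in for the real pipeline run


    def handler() -> None:
        job = BigJob()
        job.run()
        observer = weakref.ref(job)  # only used to show when the object is gone
        del job        # drop the last reference to the instance...
        gc.collect()   # ...and sweep now rather than at a later gc threshold
        assert observer() is None  # the ~50 MB instance is already reclaimed
        # ... any further work in the handler no longer holds the job in memory


    handler()
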
