Skip to content

Commit

Permalink
Fallback to single refresh if estimated number of batches is greater …
Browse files Browse the repository at this point in the history
…than timescaledb.materializations_per_refresh_window
  • Loading branch information
fabriziomello committed Mar 3, 2025
1 parent 2131d6b commit ae74bdd
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1723,4 +1723,4 @@ ts_continuous_agg_bucket_width(const ContinuousAggsBucketFunction *bucket_functi
}

return bucket_width;
}
}
17 changes: 14 additions & 3 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,6 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
// {
// min_bucket_start = ts_time_get_nobegin(refresh_window.type);
// }


/* If refresh window range start is NULL then get the first bucket from the original hypertable
*/
Expand Down Expand Up @@ -911,6 +910,18 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
// log_refresh_window(DEBUG1, context->cagg, &context->materialization_range, "splitting");
// return refresh_window_list;

int64 estimated_batches =
(refresh_window.end - refresh_window.start) / (bucket_width * range_factor);
if (estimated_batches > ts_guc_cagg_max_individual_materializations)
{
// elog(INFO, "Fallback to single refresh window: %ld", estimated_batches);
refresh_window_list =
lappend(refresh_window_list, &context->internal_materialization_range);
return refresh_window_list;
}

log_refresh_window(INFO, context->cagg, &refresh_window, "before produce ranges");

const Dimension *time_dim;
// Hypertable *ht = cagg_get_hypertable_or_fail(context->cagg->data.raw_hypertable_id);
time_dim = hyperspace_get_open_dimension(ht->space, 0);
Expand All @@ -931,14 +942,14 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
) \
SELECT \
refresh_start AS start, \
LEAST($5, refresh_start + $3) AS end \
LEAST($5::numeric, refresh_start::numeric + $3::numeric)::bigint AS end \
FROM \
pg_catalog.generate_series($4, $5, $3) AS refresh_start \
WHERE \
EXISTS ( \
SELECT FROM chunk_ranges \
WHERE \
pg_catalog.int8range(refresh_start, refresh_start + $3) \
pg_catalog.int8range(refresh_start, LEAST($5::numeric, refresh_start::numeric + $3::numeric)::bigint) \
OPERATOR(pg_catalog.&&) \
pg_catalog.int8range(chunk_ranges.start, chunk_ranges.end) \
);";
Expand Down

0 comments on commit ae74bdd

Please sign in to comment.