Skip to content

Commit

Permalink
Incremental CAgg Refresh Policy
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Mar 4, 2025
1 parent 9115e12 commit e3376bb
Show file tree
Hide file tree
Showing 11 changed files with 438 additions and 46 deletions.
140 changes: 140 additions & 0 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -2328,6 +2328,146 @@ ts_hypertable_create_compressed(Oid table_relid, int32 hypertable_id)
return true;
}

/*
 * Get the min value of an open dimension for the hypertable based on the
 * dimension slice catalog info (min of dimension_slice.range_start over all
 * chunk constraints of the hypertable's chunks).
 *
 * Note: only takes non-tiered chunks into account (osm_chunk IS FALSE).
 *
 * ht              - hypertable to inspect
 * dimension_index - index of the open dimension to use
 * isnull          - optional output; set to true when the hypertable has no
 *                   matching (non-tiered) chunks, i.e. the aggregate was NULL
 *
 * Returns the minimum range_start as int64, or ts_time_get_min() of the
 * dimension's partition type when there are no chunks. Errors out on an
 * invalid dimension index or SPI failure.
 */
int64
ts_hypertable_get_min_dimension_slice(const Hypertable *ht, int dimension_index, bool *isnull)
{
	const char *query_str = "\
		SELECT \
			min(dimsl.range_start) \
		FROM \
			_timescaledb_catalog.chunk AS srcch \
			JOIN _timescaledb_catalog.hypertable AS ht ON ht.id = srcch.hypertable_id \
			JOIN _timescaledb_catalog.chunk_constraint AS chcons ON srcch.id = chcons.chunk_id \
			JOIN _timescaledb_catalog.dimension AS dim ON srcch.hypertable_id = dim.hypertable_id \
			JOIN _timescaledb_catalog.dimension_slice AS dimsl \
				ON dim.id = dimsl.dimension_id \
				AND chcons.dimension_slice_id = dimsl.id \
		WHERE \
			ht.id = $1 \
			AND dim.id = $2 \
			AND srcch.osm_chunk IS FALSE";

	const Dimension *dim = hyperspace_get_open_dimension(ht->space, dimension_index);

	if (NULL == dim)
		elog(ERROR, "invalid open dimension index %d", dimension_index);

	Oid timetype = ts_dimension_get_partition_type(dim);

	/*
	 * $2 is the dimension catalog id (dim->fd.id) and is matched against
	 * dim.id in the query. The previous predicate compared it against
	 * dimsl.id, which belongs to a different id space (dimension_slice), so
	 * the filter only matched by coincidence.
	 */
	Datum values[] = { Int32GetDatum(ht->fd.id), Int32GetDatum(dim->fd.id) };
	Oid types[] = { INT4OID, INT4OID };
	char nulls[] = { false, false };

	/*
	 * Query for the oldest chunk in the hypertable.
	 */
	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "could not connect to SPI");

	int res = SPI_execute_with_args(query_str,
									2,
									types,
									values,
									nulls,
									false /* read_only */,
									0 /* count */);

	if (res < 0)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 (errmsg("could not find the minimum time value for hypertable \"%s\"",
						 get_rel_name(ht->main_table_relid)))));

	/* An ungrouped aggregate always produces exactly one row, so row 0 is safe. */
	bool min_isnull;
	Datum mindat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &min_isnull);

	if (isnull)
		*isnull = min_isnull;

	/* we fetch the int64 value from the dimension slice catalog. so read it back as int64 */
	int64 min_value = min_isnull ? ts_time_get_min(timetype) : DatumGetInt64(mindat);

	res = SPI_finish();
	if (res != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

	return min_value;
}

/*
 * Get the max value of an open dimension for the hypertable based on the
 * dimension slice catalog info (max of dimension_slice.range_end over all
 * chunk constraints of the hypertable's chunks).
 *
 * Note: only takes non-tiered chunks into account (osm_chunk IS FALSE).
 *
 * ht              - hypertable to inspect
 * dimension_index - index of the open dimension to use
 * isnull          - optional output; set to true when the hypertable has no
 *                   matching (non-tiered) chunks, i.e. the aggregate was NULL
 *
 * Returns the maximum range_end as int64 (the exclusive upper bound of the
 * newest slice), or ts_time_get_min() of the dimension's partition type when
 * there are no chunks. Errors out on an invalid dimension index or SPI
 * failure.
 */
int64
ts_hypertable_get_max_dimension_slice(const Hypertable *ht, int dimension_index, bool *isnull)
{
	/*
	 * Use range_end here: the previous query took max(range_start), which is
	 * only the *start* of the newest chunk and understates the maximum by one
	 * chunk interval. range_end is the slice's exclusive upper bound.
	 */
	const char *query_str = "\
		SELECT \
			max(dimsl.range_end) \
		FROM \
			_timescaledb_catalog.chunk AS srcch \
			JOIN _timescaledb_catalog.hypertable AS ht ON ht.id = srcch.hypertable_id \
			JOIN _timescaledb_catalog.chunk_constraint AS chcons ON srcch.id = chcons.chunk_id \
			JOIN _timescaledb_catalog.dimension AS dim ON srcch.hypertable_id = dim.hypertable_id \
			JOIN _timescaledb_catalog.dimension_slice AS dimsl \
				ON dim.id = dimsl.dimension_id \
				AND chcons.dimension_slice_id = dimsl.id \
		WHERE \
			ht.id = $1 \
			AND dim.id = $2 \
			AND srcch.osm_chunk IS FALSE";

	const Dimension *dim = hyperspace_get_open_dimension(ht->space, dimension_index);

	if (NULL == dim)
		elog(ERROR, "invalid open dimension index %d", dimension_index);

	Oid timetype = ts_dimension_get_partition_type(dim);

	/*
	 * $2 is the dimension catalog id (dim->fd.id) and is matched against
	 * dim.id in the query. The previous predicate compared it against
	 * dimsl.id, which belongs to a different id space (dimension_slice).
	 */
	Datum values[] = { Int32GetDatum(ht->fd.id), Int32GetDatum(dim->fd.id) };
	Oid types[] = { INT4OID, INT4OID };
	char nulls[] = { false, false };

	/*
	 * Query for the newest chunk in the hypertable.
	 */
	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "could not connect to SPI");

	int res = SPI_execute_with_args(query_str,
									2,
									types,
									values,
									nulls,
									false /* read_only */,
									0 /* count */);

	/* Message fixed: the original copy-pasted "minimum" into the max function. */
	if (res < 0)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 (errmsg("could not find the maximum time value for hypertable \"%s\"",
						 get_rel_name(ht->main_table_relid)))));

	/* An ungrouped aggregate always produces exactly one row, so row 0 is safe. */
	bool max_isnull;
	Datum maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &max_isnull);

	if (isnull)
		*isnull = max_isnull;

	/*
	 * We fetch the int64 value from the dimension slice catalog, so read it
	 * back as int64.
	 *
	 * NOTE(review): the NULL fallback is ts_time_get_min(), mirroring the min
	 * variant; callers are expected to check *isnull before relying on the
	 * value — confirm this is the intended sentinel.
	 */
	int64 max_value = max_isnull ? ts_time_get_min(timetype) : DatumGetInt64(maxdat);

	res = SPI_finish();
	if (res != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

	return max_value;
}

/*
* Get the max value of an open dimension.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/hypertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ extern TSDLLEXPORT bool ts_hypertable_set_compressed(Hypertable *ht,
extern TSDLLEXPORT bool ts_hypertable_unset_compressed(Hypertable *ht);
extern TSDLLEXPORT bool ts_hypertable_set_compress_interval(Hypertable *ht,
int64 compress_interval);
extern TSDLLEXPORT int64 ts_hypertable_get_min_dimension_slice(const Hypertable *ht,
int dimension_index, bool *isnull);

extern TSDLLEXPORT int64 ts_hypertable_get_max_dimension_slice(const Hypertable *ht,
int dimension_index, bool *isnull);
extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
int dimension_index, bool *isnull);

Expand Down
44 changes: 44 additions & 0 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1680,3 +1680,47 @@ ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_
return bucket_function->bucket_integer_width;
}
}

/*
 * Get the width of a bucket in internal (int64) time units.
 *
 * For fixed-sized buckets this is the configured integer width. For
 * variable-sized buckets (monthly buckets and/or buckets with timezones) the
 * width is estimated by converting months to 30 days, matching the behavior
 * of interval_to_int64.
 */
int64
ts_continuous_agg_bucket_width(const ContinuousAggsBucketFunction *bucket_function)
{
	int64 bucket_width;

	/*
	 * bucket_function must be valid before we inspect it; the original code
	 * asserted this only after the first dereference, making the assert
	 * useless.
	 */
	Assert(bucket_function != NULL);

	if (bucket_function->bucket_fixed_interval == false)
	{
		/*
		 * There are several cases of variable-sized buckets:
		 * 1. Monthly buckets
		 * 2. Buckets with timezones
		 * 3. Cases 1 and 2 at the same time
		 *
		 * For months we simply take 30 days like on interval_to_int64 and
		 * multiply this number by the number of months in the bucket. This
		 * reduces the task to days/hours/minutes scenario.
		 *
		 * Days/hours/minutes case is handled the same way as for fixed-sized
		 * buckets. The refresh window at least two buckets in size is adequate
		 * for such corner cases as DST.
		 */

		/* bucket_time_width should always be specified for variable-sized buckets */
		Assert(bucket_function->bucket_time_width != NULL);

		/* Make a temporary copy of bucket_width */
		Interval interval = *bucket_function->bucket_time_width;
		interval.day += 30 * interval.month;
		interval.month = 0;
		bucket_width = ts_interval_value_to_internal(IntervalPGetDatum(&interval), INTERVALOID);
	}
	else
	{
		bucket_width = ts_continuous_agg_fixed_bucket_width(bucket_function);
	}

	return bucket_width;
}
2 changes: 2 additions & 0 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,5 @@ extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);

extern TSDLLEXPORT int64
ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_function);
extern TSDLLEXPORT int64
ts_continuous_agg_bucket_width(const ContinuousAggsBucketFunction *bucket_function);
34 changes: 1 addition & 33 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,39 +445,7 @@ validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
else
end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);

if (cagg->bucket_function->bucket_fixed_interval == false)
{
/*
* There are several cases of variable-sized buckets:
* 1. Monthly buckets
* 2. Buckets with timezones
* 3. Cases 1 and 2 at the same time
*
* For months we simply take 30 days like on interval_to_int64 and
* multiply this number by the number of months in the bucket. This
* reduces the task to days/hours/minutes scenario.
*
* Days/hours/minutes case is handled the same way as for fixed-sized
* buckets. The refresh window at least two buckets in size is adequate
* for such corner cases as DST.
*/

/* bucket_function should always be specified for variable-sized buckets */
Assert(cagg->bucket_function != NULL);
/* ... and bucket_function->bucket_time_width too */
Assert(cagg->bucket_function->bucket_time_width != NULL);

/* Make a temporary copy of bucket_width */
Interval interval = *cagg->bucket_function->bucket_time_width;
interval.day += 30 * interval.month;
interval.month = 0;
bucket_width = ts_interval_value_to_internal(IntervalPGetDatum(&interval), INTERVALOID);
}
else
{
bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
}

bucket_width = ts_continuous_agg_bucket_width(cagg->bucket_function);
Assert(bucket_width > 0);

if (ts_time_saturating_add(end_offset, bucket_width * 2, INT8OID) > start_offset)
Expand Down
36 changes: 28 additions & 8 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,32 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
PGC_S_SESSION);
}

continuous_agg_refresh_internal(policy_data.cagg,
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null,
false);
/* Try to split window range into a list of ranges */
List *refresh_window_list = continuous_agg_split_refresh_window(policy_data.cagg,
&policy_data.refresh_window,
0 /* disabled */);
if (refresh_window_list == NIL)
{
refresh_window_list = lappend(refresh_window_list, &policy_data.refresh_window);
}

ListCell *lc;
foreach (lc, refresh_window_list)
{
InternalTimeRange *refresh_window = (InternalTimeRange *) lfirst(lc);
elog(DEBUG1,
"refreshing continuous aggregate \"%s\" from %s to %s",
NameStr(policy_data.cagg->data.user_view_name),
ts_internal_to_time_string(refresh_window->start, refresh_window->type),
ts_internal_to_time_string(refresh_window->end, refresh_window->type));

(void) continuous_agg_refresh_internal(policy_data.cagg,
refresh_window,
CAGG_REFRESH_POLICY,
refresh_window->start_isnull,
refresh_window->end_isnull,
false);
}

if (!policy_data.include_tiered_data_isnull)
{
Expand Down Expand Up @@ -450,10 +470,10 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
{
policy_data->refresh_window.type = dim_type;
policy_data->refresh_window.start = refresh_start;
policy_data->refresh_window.start_isnull = start_isnull;
policy_data->refresh_window.end = refresh_end;
policy_data->refresh_window.end_isnull = end_isnull;
policy_data->cagg = cagg;
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
policy_data->include_tiered_data = include_tiered_data;
policy_data->include_tiered_data_isnull = include_tiered_data_isnull;
}
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ typedef struct PolicyContinuousAggData
InternalTimeRange refresh_window;
ContinuousAgg *cagg;
bool include_tiered_data;
bool start_is_null, end_is_null, include_tiered_data_isnull;
bool include_tiered_data_isnull;
} PolicyContinuousAggData;

typedef struct PolicyCompressionData
Expand Down
7 changes: 4 additions & 3 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>

#include <executor/spi.h>
#include <fmgr.h>
#include <lib/stringinfo.h>
#include <scan_iterator.h>
#include <scanner.h>
#include <time_utils.h>
#include <utils/builtins.h>
#include <utils/date.h>
#include <utils/guc.h>
Expand All @@ -23,6 +21,9 @@
#include "debug_assert.h"
#include "guc.h"
#include "materialize.h"
#include "scan_iterator.h"
#include "scanner.h"
#include "time_utils.h"
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"

Expand Down
2 changes: 2 additions & 0 deletions tsl/src/continuous_aggs/materialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ typedef struct InternalTimeRange
Oid type;
int64 start; /* inclusive */
int64 end; /* exclusive */
bool start_isnull;
bool end_isnull;
} InternalTimeRange;

void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
Expand Down
Loading

0 comments on commit e3376bb

Please sign in to comment.