Skip to content

Commit

Permalink
First rebased version
Browse files Browse the repository at this point in the history
  • Loading branch information
jnidzwetzki committed Mar 14, 2024
1 parent 28615eb commit b1c996b
Show file tree
Hide file tree
Showing 21 changed files with 138 additions and 142 deletions.
2 changes: 1 addition & 1 deletion .unreleased/feature_6382
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Implements: #6382 Support for time_bucket with origin and offset in CAggs
Implements: #6382 Support for time_bucket with origin and offset in CAggs
1 change: 0 additions & 1 deletion sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ CREATE TABLE _timescaledb_catalog.continuous_agg (
user_view_name name NOT NULL,
partial_view_schema name NOT NULL,
partial_view_name name NOT NULL,
bucket_width bigint NOT NULL,
direct_view_schema name NOT NULL,
direct_view_name name NOT NULL,
materialized_only bool NOT NULL DEFAULT FALSE,
Expand Down
5 changes: 4 additions & 1 deletion src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,11 @@ continuous_agg_init(ContinuousAgg *cagg, const Form_continuous_agg fd)
Assert(NULL != cagg_ht);
time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0);
Assert(NULL != time_dim);

cagg->partition_type = ts_dimension_get_partition_type(time_dim);
cagg->relid = get_relname_relid(NameStr(fd->user_view_name), nspid);
memcpy(&cagg->data, fd, sizeof(cagg->data));

Assert(OidIsValid(cagg->relid));
Assert(OidIsValid(cagg->partition_type));

cagg->bucket_function = palloc0(sizeof(ContinuousAggsBucketFunction));
Expand Down
2 changes: 0 additions & 2 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
SetUserIdAndSecContext(saved_uid, saved_secctx); \
} while (0);

#define CAGG_BUCKET_OFFSET_UNDEFINED 0

typedef enum ContinuousAggViewOption
{
ContinuousEnabled = 0,
Expand Down
30 changes: 19 additions & 11 deletions tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

#include "common.h"

#include <utils/date.h>
#include <utils/timestamp.h>

static Const *check_time_bucket_argument(Node *arg, char *position);
static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id,
Oid hypertable_oid, AttrNumber hypertable_partition_colno,
Expand Down Expand Up @@ -60,11 +63,17 @@ caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id, Oid hypert
src->htpartcolno = hypertable_partition_colno;
src->htpartcoltype = hypertable_partition_coltype;
src->htpartcol_interval_len = hypertable_partition_col_interval;
src->bucket_width = 0; /* invalid value */
src->bucket_width_type = InvalidOid; /* invalid oid */
src->interval = NULL; /* not specified by default */
src->timezone = NULL; /* not specified by default */
TIMESTAMP_NOBEGIN(src->origin); /* origin is not specified by default */
src->bucket_width_type = InvalidOid; /* invalid oid */

/* Time based buckets */
src->bucket_time_width = NULL; /* not specified by default */
src->bucket_time_timezone = NULL; /* not specified by default */
src->bucket_time_offset = NULL; /* not specified by default */
TIMESTAMP_NOBEGIN(src->bucket_time_origin); /* origin is not specified by default */

/* Integer based buckets */
src->bucket_integer_width = 0; /* invalid value */
src->bucket_integer_offset = 0; /* invalid value */
}

/*
Expand Down Expand Up @@ -160,25 +169,24 @@ process_additional_timebucket_parameter(CAggTimebucketInfo *tbinfo, Const *arg)
errmsg("invalid timezone name \"%s\"", tz_name)));
}

tbinfo->timezone = tz_name;
tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
tbinfo->bucket_time_timezone = tz_name;
break;
case INTERVALOID:
/* Bucket offset as interval */
tbinfo->bucket_offset = DatumGetIntervalP(arg->constvalue);
tbinfo->bucket_time_offset = DatumGetIntervalP(arg->constvalue);
break;
case DATEOID:
/* Bucket origin as Date */
tbinfo->bucket_origin =
tbinfo->bucket_time_origin =
date2timestamptz_opt_overflow(DatumGetDateADT(arg->constvalue), NULL);
break;
case TIMESTAMPOID:
/* Bucket origin as Timestamp */
tbinfo->bucket_origin = DatumGetTimestamp(arg->constvalue);
tbinfo->bucket_time_origin = DatumGetTimestamp(arg->constvalue);
break;
case TIMESTAMPTZOID:
/* Bucket origin as TimestampTZ */
tbinfo->bucket_origin = DatumGetTimestampTz(arg->constvalue);
tbinfo->bucket_time_origin = DatumGetTimestampTz(arg->constvalue);
break;
default:
break;
Expand Down
11 changes: 2 additions & 9 deletions tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
#include <rewrite/rewriteHandler.h>
#include <rewrite/rewriteManip.h>
#include <utils/builtins.h>
#include <utils/date.h>
#include <utils/syscache.h>
#include <utils/timestamp.h>
#include <utils/typcache.h>

#include "errors.h"
Expand Down Expand Up @@ -70,13 +68,8 @@ typedef struct CAggTimebucketInfo
Oid htoid; /* hypertable oid */
AttrNumber htpartcolno; /* primary partitioning column of raw hypertable */
/* This should also be the column used by time_bucket */
Oid htpartcoltype;
int64 htpartcol_interval_len; /* interval length setting for primary partitioning column */
int64 bucket_width; /* bucket_width of time_bucket, stores BUCKET_WIDTH_VARIABLE for
variable-sized buckets */
Oid bucket_width_type; /* type of bucket_width */
Interval *interval; /* stores the interval, NULL if not specified */
const char *timezone; /* the name of the timezone, NULL if not specified */
Oid htpartcoltype; /* The collation type */
int64 htpartcol_interval_len; /* interval length setting for primary partitioning column */

/* General bucket information */
FuncExpr *bucket_func; /* function call expr of the bucketing function */
Expand Down
15 changes: 4 additions & 11 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,9 @@

static void create_cagg_catalog_entry(int32 matht_id, int32 rawht_id, const char *user_schema,
const char *user_view, const char *partial_schema,
const char *partial_view, int64 bucket_width,
Interval *bucket_offset, TimestampTz bucket_origin,
bool materialized_only, const char *direct_schema,
const char *direct_view, const bool finalized,
const int32 parent_mat_hypertable_id);
const char *partial_view, bool materialized_only,
const char *direct_schema, const char *direct_view,
const bool finalized, const int32 parent_mat_hypertable_id);
static void create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_function,
const char *bucket_width, const char *origin,
const char *offset, const char *timezone,
Expand Down Expand Up @@ -127,8 +125,7 @@ static Query *mattablecolumninfo_get_partial_select_query(MatTableColumnInfo *ma
static void
create_cagg_catalog_entry(int32 matht_id, int32 rawht_id, const char *user_schema,
const char *user_view, const char *partial_schema,
const char *partial_view, int64 bucket_width, Interval *bucket_offset,
TimestampTz bucket_origin, bool materialized_only,
const char *partial_view, bool materialized_only,
const char *direct_schema, const char *direct_view, const bool finalized,
const int32 parent_mat_hypertable_id)
{
Expand Down Expand Up @@ -169,7 +166,6 @@ create_cagg_catalog_entry(int32 matht_id, int32 rawht_id, const char *user_schem
NameGetDatum(&partial_schnm);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_name)] =
NameGetDatum(&partial_viewnm);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_bucket_width)] = Int64GetDatum(bucket_width);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_schema)] =
NameGetDatum(&direct_schnm);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_name)] =
Expand Down Expand Up @@ -816,9 +812,6 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
stmt->view->relname,
part_rel->schemaname,
part_rel->relname,
bucket_info->bucket_width,
bucket_info->bucket_offset,
bucket_info->bucket_origin,
materialized_only,
dum_rel->schemaname,
dum_rel->relname,
Expand Down
28 changes: 16 additions & 12 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static Datum internal_to_time_value_or_infinite(int64 internal, Oid time_type,
* materialization support *
***************************/

static void spi_update_materializations(Hypertable *mat_ht, ContinuousAgg *cagg,
static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
Expand All @@ -47,13 +47,15 @@ static void spi_delete_materializations(SchemaAndName materialization_table,
const NameData *time_column_name,
TimeRange invalidation_range,
const char *const chunk_condition);
static void
spi_insert_materializations(Hypertable *mat_ht, ContinuousAgg *cagg, SchemaAndName partial_view,
SchemaAndName materialization_table, const NameData *time_column_name,
TimeRange materialization_range, const char *const chunk_condition);
static void spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
TimeRange materialization_range,
const char *const chunk_condition);

void
continuous_agg_update_materialization(Hypertable *mat_ht, ContinuousAgg *cagg,
continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
Expand Down Expand Up @@ -220,9 +222,10 @@ internal_time_range_to_time_range(InternalTimeRange internal)
}

static void
spi_update_materializations(Hypertable *mat_ht, ContinuousAgg *cagg, SchemaAndName partial_view,
SchemaAndName materialization_table, const NameData *time_column_name,
TimeRange invalidation_range, const int32 chunk_id)
spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, TimeRange invalidation_range,
const int32 chunk_id)
{
StringInfo chunk_condition = makeStringInfo();

Expand Down Expand Up @@ -290,9 +293,10 @@ spi_delete_materializations(SchemaAndName materialization_table, const NameData
}

static void
spi_insert_materializations(Hypertable *mat_ht, ContinuousAgg *cagg, SchemaAndName partial_view,
SchemaAndName materialization_table, const NameData *time_column_name,
TimeRange materialization_range, const char *const chunk_condition)
spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/materialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ typedef struct InternalTimeRange
int64 end; /* exclusive */
} InternalTimeRange;

void continuous_agg_update_materialization(Hypertable *mat_ht, ContinuousAgg *cagg,
void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
Expand Down
62 changes: 24 additions & 38 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ typedef struct CaggRefreshState

static Hypertable *cagg_get_hypertable_or_fail(int32 hypertable_id);
static InternalTimeRange get_largest_bucketed_window(Oid timetype, int64 bucket_width);
static InternalTimeRange compute_inscribed_bucketed_refresh_window(
ContinuousAgg *cagg, const InternalTimeRange *const refresh_window, const int64 bucket_width);
static InternalTimeRange compute_circumscribed_bucketed_refresh_window(
ContinuousAgg *cagg, const InternalTimeRange *const refresh_window, const int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function);
static InternalTimeRange
compute_inscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
const int64 bucket_width);
static InternalTimeRange
compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
const ContinuousAggsBucketFunction *bucket_function);
static void continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
static void continuous_agg_refresh_execute(CaggRefreshState *refresh,
static void continuous_agg_refresh_execute(const CaggRefreshState *refresh,
const InternalTimeRange *bucketed_refresh_window,
const int32 chunk_id);
static void log_refresh_window(int elevel, const ContinuousAgg *cagg,
Expand All @@ -70,7 +73,7 @@ static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const CaggRefreshCallContext callctx);
static ContinuousAgg *get_cagg_by_relid(const Oid cagg_relid);
static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext callctx);
static bool process_cagg_invalidations_and_refresh(ContinuousAgg *cagg,
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
int32 chunk_id);
Expand Down Expand Up @@ -143,7 +146,7 @@ get_largest_bucketed_window(Oid timetype, int64 bucket_width)
* where part of its data were dropped by a retention policy. See #2198 for details.
*/
static InternalTimeRange
compute_inscribed_bucketed_refresh_window(ContinuousAgg *cagg,
compute_inscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
const int64 bucket_width)
{
Expand All @@ -154,9 +157,10 @@ compute_inscribed_bucketed_refresh_window(ContinuousAgg *cagg,
NullableDatum offset = { .value = 0, .isnull = true };
NullableDatum null_datum = { .value = 0, .isnull = true };

if (cagg != NULL && cagg->data.bucket_offset != NULL && cagg->data.bucket_offset != 0)
if (cagg != NULL && cagg->bucket_function != NULL &&
cagg->bucket_function->bucket_time_offset != 0)
{
offset.value = IntervalPGetDatum(cagg->data.bucket_offset);
offset.value = IntervalPGetDatum(cagg->bucket_function->bucket_time_offset);
offset.isnull = true;
}

Expand Down Expand Up @@ -225,13 +229,12 @@ compute_inscribed_bucketed_refresh_window(ContinuousAgg *cagg,
* dropping chunks manually or as part of retention policy.
*/
static InternalTimeRange
compute_circumscribed_bucketed_refresh_window(ContinuousAgg *cagg,
compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
const int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function)
{
/* TODO FIXME */
if (bucket_width == BUCKET_WIDTH_VARIABLE)
if (bucket_function->bucket_fixed_interval == false)
{
InternalTimeRange result = *refresh_window;
ts_compute_circumscribed_bucketed_refresh_window_variable(&result.start,
Expand Down Expand Up @@ -321,7 +324,7 @@ continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg
* refresh state.
*/
static void
continuous_agg_refresh_execute(CaggRefreshState *refresh,
continuous_agg_refresh_execute(const CaggRefreshState *refresh,
const InternalTimeRange *bucketed_refresh_window,
const int32 chunk_id)
{
Expand Down Expand Up @@ -415,7 +418,7 @@ update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window,
}

static long
continuous_agg_scan_refresh_window_ranges(ContinuousAgg *cagg,
continuous_agg_scan_refresh_window_ranges(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations,
const ContinuousAggsBucketFunction *bucket_function,
Expand Down Expand Up @@ -452,10 +455,7 @@ continuous_agg_scan_refresh_window_ranges(ContinuousAgg *cagg,
};

InternalTimeRange bucketed_refresh_window =
compute_circumscribed_bucketed_refresh_window(cagg,
&invalidation,
bucket_width,
bucket_function);
compute_circumscribed_bucketed_refresh_window(cagg, &invalidation, bucket_function);

(*exec_func)(&bucketed_refresh_window, callctx, count, func_arg1, func_arg2);

Expand Down Expand Up @@ -494,22 +494,12 @@ continuous_agg_scan_refresh_window_ranges(ContinuousAgg *cagg,
* as illustrated above.
*/
static void
<<<<<<< HEAD
continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations, int32 chunk_id,
const bool do_merged_refresh,
<<<<<<< HEAD
const InternalTimeRange merged_refresh_window,
const CaggRefreshCallContext callctx)
=======
=======
continuous_agg_refresh_with_window(ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations, const int64 bucket_width,
int32 chunk_id, const bool do_merged_refresh,
>>>>>>> fd503c804 (Support for CAgg with origin/offset parameter)
const InternalTimeRange merged_refresh_window)
>>>>>>> cb1a2f41b (Support for CAgg with origin/offset parameter)
{
CaggRefreshState refresh;

Expand Down Expand Up @@ -668,7 +658,8 @@ continuous_agg_calculate_merged_refresh_window(const int32 raw_hypertable_id,
}

static bool
process_cagg_invalidations_and_refresh(ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx, int32 chunk_id)
{
InvalidationStore *invalidations;
Expand Down Expand Up @@ -723,7 +714,8 @@ process_cagg_invalidations_and_refresh(ContinuousAgg *cagg, const InternalTimeRa
}

void
continuous_agg_refresh_internal(ContinuousAgg *cagg, const InternalTimeRange *refresh_window_arg,
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg,
const CaggRefreshCallContext callctx, const bool start_isnull,
const bool end_isnull)
{
Expand Down Expand Up @@ -773,13 +765,7 @@ continuous_agg_refresh_internal(ContinuousAgg *cagg, const InternalTimeRange *re
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
Assert(bucket_width > 0);
refresh_window =
<<<<<<< HEAD
compute_inscribed_bucketed_refresh_window(refresh_window_arg, bucket_width);
=======
compute_inscribed_bucketed_refresh_window(cagg,
refresh_window_arg,
ts_continuous_agg_bucket_width(cagg));
>>>>>>> fd503c804 (Support for CAgg with origin/offset parameter)
compute_inscribed_bucketed_refresh_window(cagg, refresh_window_arg, bucket_width);
}
}

Expand Down
Loading

0 comments on commit b1c996b

Please sign in to comment.