Skip to content

Commit

Permalink
bugfix: nodeAgg.c didn't create a part of transition states if multip…
Browse files Browse the repository at this point in the history
…le aggregate functions are used

this problem was reported at heterodb#614
  • Loading branch information
kaigai committed Jul 21, 2023
1 parent d2ed8fb commit db5d301
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 23 deletions.
2 changes: 0 additions & 2 deletions src/aggfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ pgstrom_favg_trans_int(PG_FUNCTION_ARGS)
if (!PG_ARGISNULL(1))
{
arg = (kagg_state__pavg_int_packed *)PG_GETARG_BYTEA_P(1);

state->nitems += arg->nitems;
state->sum += arg->sum;
}
Expand Down Expand Up @@ -367,7 +366,6 @@ pgstrom_favg_trans_fp(PG_FUNCTION_ARGS)
if (!PG_ARGISNULL(1))
{
arg = (kagg_state__pavg_fp_packed *)PG_GETARG_BYTEA_P(1);

state->nitems += arg->nitems;
state->sum += arg->sum;
}
Expand Down
51 changes: 31 additions & 20 deletions src/gpu_preagg.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ static aggfunc_catalog_t aggfunc_catalog_array[] = {
KAGG_ACTION__PMIN_INT64, false
},
{"min(float2)",
"s:min_f2(bytea)",
"s:min_f2(bytea)",
"s:pmin(float8)",
KAGG_ACTION__PMIN_FP64, false
},
{"min(float4)",
"s:min_f4(bytea)",
"s:min_f4(bytea)",
"s:pmin(float8)",
KAGG_ACTION__PMIN_FP64, false
},
{"min(float8)",
"s:min_f8(bytea)",
"s:min_f8(bytea)",
"s:pmin(float8)",
KAGG_ACTION__PMIN_FP64, false
},
Expand Down Expand Up @@ -145,17 +145,17 @@ static aggfunc_catalog_t aggfunc_catalog_array[] = {
KAGG_ACTION__PMAX_INT64, false
},
{"max(float2)",
"s:max_f2(bytea)",
"s:max_f2(bytea)",
"s:pmax(float8)",
KAGG_ACTION__PMAX_FP64, false
},
{"max(float4)",
"s:max_f4(bytea)",
"s:max_f4(bytea)",
"s:pmax(float8)",
KAGG_ACTION__PMAX_FP64, false
},
{"max(float8)",
"s:max_f8(bytea)",
"s:max_f8(bytea)",
"s:pmax(float8)",
KAGG_ACTION__PMAX_FP64, false
},
Expand Down Expand Up @@ -194,22 +194,22 @@ static aggfunc_catalog_t aggfunc_catalog_array[] = {
*/
{"sum(int1)",
"s:sum(int8)",
"s:psum(int8)",
"s:psum(int8)",
KAGG_ACTION__PSUM_INT, false
},
{"sum(int2)",
"s:sum(int8)",
"s:psum(int8)",
"s:psum(int8)",
KAGG_ACTION__PSUM_INT, false
},
{"sum(int4)",
"s:sum(int8)",
"s:psum(int8)",
"s:psum(int8)",
KAGG_ACTION__PSUM_INT, false
},
{"sum(int8)",
"c:sum(int8)",
"s:psum(int8)",
"s:sum_num(int8)",
"s:psum(int8)",
KAGG_ACTION__PSUM_INT, false
},
{"sum(float2)",
Expand Down Expand Up @@ -576,7 +576,7 @@ static aggfunc_catalog_t aggfunc_catalog_array[] = {
},
{"regr_count(float8,float8)",
"s:regr_count(bytea)",
"s:pcovar(float8,float8)",
"s:pcovar(float8,float8)",
KAGG_ACTION__COVAR, false
},
{"regr_intercept(float8,float8)",
Expand All @@ -591,7 +591,7 @@ static aggfunc_catalog_t aggfunc_catalog_array[] = {
},
{"regr_slope(float8,float8)",
"s:regr_slope(bytea)",
"s:pcovar(float8,float8)",
"s:pcovar(float8,float8)",
KAGG_ACTION__COVAR, false
},
{"regr_sxx(float8,float8)",
Expand Down Expand Up @@ -790,7 +790,7 @@ aggfunc_catalog_lookup_by_oid(Oid aggfn_oid)
HASHCTL hctl;

memset(&hctl, 0, sizeof(HASHCTL));
hctl.keysize = sizeof(Oid);
hctl.keysize = sizeof(Oid);
hctl.entrysize = sizeof(aggfunc_catalog_entry);
hctl.hcxt = CacheMemoryContext;
aggfunc_catalog_htable = hash_create("XPU GroupBy Catalog Hash",
Expand Down Expand Up @@ -926,7 +926,7 @@ make_expr_typecast(Expr *expr, Oid target_type)
else if (cast->castmethod == COERCION_METHOD_FUNCTION)
{
Assert(OidIsValid(cast->castfunc));
expr = (Expr *)makeFuncExpr(cast->castfunc,
expr = (Expr *)makeFuncExpr(cast->castfunc,
target_type,
list_make1(expr),
InvalidOid, /* always right? */
Expand Down Expand Up @@ -1057,16 +1057,27 @@ make_alternative_aggref(xpugroupby_build_path_context *con, Aggref *aggref)
aggref_alt->aggdirectargs = NIL; /* see sanity checks */
aggref_alt->args = list_make1(makeTargetEntry(partfn, 1, NULL, false));
aggref_alt->aggorder = NIL; /* see sanity check */
aggref_alt->aggdistinct = NIL; /* see sanity check */
aggref_alt->aggfilter = NULL; /* processed in partial-function */
aggref_alt->aggdistinct = NIL; /* see sanity check */
aggref_alt->aggfilter = NULL; /* processed in partial-function */
aggref_alt->aggstar = false;
aggref_alt->aggvariadic = false;
aggref_alt->aggkind = AGGKIND_NORMAL; /* see sanity check */
aggref_alt->agglevelsup = 0;
aggref_alt->aggsplit = AGGSPLIT_SIMPLE;
aggref_alt->aggno = aggref->aggno;
aggref_alt->aggtransno = aggref->aggtransno;
aggref_alt->aggtransno = aggref->aggno;
aggref_alt->location = aggref->location;
/*
* MEMO: nodeAgg.c creates AggStatePerTransData for each aggtransno (that is
* unique ID of transition state in the Agg). This is a kind of optimization
* for the case when multiple aggregate function has identical transition state.
* However, its impact is not large for GpuPreAgg because most of reduction
* works are already executed at the xPU device side.
* So, we simply assign aggref->aggno (unique ID within the Agg node) to
* construct transition state for each alternative aggregate function.
*
* See the issue #614 to reproduce the problem in the future version.
*/

/*
* Update the cost factor
Expand Down Expand Up @@ -1275,7 +1286,7 @@ prepend_partial_groupby_custompath(xpugroupby_build_path_context *con)
else if ((con->xpu_task_flags & DEVKIND__ANY) == DEVKIND__NVIDIA_DPU)
{
xpu_operator_cost = pgstrom_dpu_operator_cost;
xpu_tuple_cost = pgstrom_dpu_tuple_cost;
xpu_tuple_cost = pgstrom_dpu_tuple_cost;
xpu_ratio = pgstrom_dpu_operator_ratio();
}
else
Expand Down Expand Up @@ -1390,7 +1401,7 @@ __xpupreagg_add_custompath(PlannerInfo *root,
con.input_path = input_path;
con.target_upper = root->upper_targets[UPPERREL_GROUP_AGG];
con.target_partial = create_empty_pathtarget();
con.target_final = create_empty_pathtarget();
con.target_final = create_empty_pathtarget();
con.xpu_task_flags = xpu_task_flags;
con.custom_path_methods = custom_path_methods;
extract_input_path_params(input_path,
Expand Down
28 changes: 27 additions & 1 deletion src/pg_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,35 @@ pthreadCondSignal(pthread_cond_t *cond)
/*
* Misc debug functions
*/
INLINE_FUNCTION(void)
dump_tuple_desc(const TupleDesc tdesc)
{
fprintf(stderr, "tupdesc %p { natts=%d, tdtypeid=%u, tdtypmod=%d, tdrefcount=%d }\n",
tdesc,
tdesc->natts,
tdesc->tdtypeid,
tdesc->tdtypmod,
tdesc->tdrefcount);
for (int j=0; j < tdesc->natts; j++)
{
Form_pg_attribute attr = TupleDescAttr(tdesc, j);

fprintf(stderr, "attr[%d] { attname='%s', atttypid=%u, attlen=%d, attnum=%d, atttypmod=%d, attbyval=%c, attalign=%c, attnotnull=%c attisdropped=%c }\n",
j,
NameStr(attr->attname),
attr->atttypid,
(int)attr->attlen,
(int)attr->attnum,
(int)attr->atttypmod,
attr->attbyval ? 't' : 'f',
attr->attalign,
attr->attnotnull ? 't' : 'f',
attr->attisdropped ? 't' : 'f');
}
}

/*
* print_kern_data_store
* dump_kern_data_store
*/
INLINE_FUNCTION(void)
dump_kern_data_store(const kern_data_store *kds)
Expand Down
9 changes: 9 additions & 0 deletions src/sql/pg_strom--5.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2915,6 +2915,15 @@ CREATE AGGREGATE pgstrom.sum(int8)
initcond = 0,
parallel = safe
);
-- bigint --> numeric
CREATE AGGREGATE pgstrom.sum_num(int8)
(
sfunc = pg_catalog.int8_sum,
stype = numeric,
initcond = 0,
parallel = safe
);

-- float8 --> float4
CREATE AGGREGATE pgstrom.sum_f4(float8)
(
Expand Down

0 comments on commit db5d301

Please sign in to comment.