Skip to content

Commit

Permalink
revise CPU fallback infrastructure; always follow custom_scan_tlist l…
Browse files Browse the repository at this point in the history
…ayout

issue reported at heterodb#747
  • Loading branch information
kaigai committed Apr 12, 2024
1 parent f352f5b commit 330bbef
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 285 deletions.
195 changes: 85 additions & 110 deletions src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1277,47 +1277,47 @@ __setupTaskStateRequestBuffer(pgstromTaskState *pts,
*
* CPU fallback process moves the tuple as follows
*
* <Base Relation> (can be both of heap and arrow)
* |
* v
* [Base Slot] ... equivalent to the base relation
* |
* v
* (Base Quals) ... WHERE-clause
* |
* +-----------+
* | |
* | v
* | [Fallback Slot]
* | |
* | v
* | ((GPU-Join)) ... Join for each depth
* | |
* | v
* | [Fallback Slot]
* | |
* v v
* <Base Relation> (can be both of heap and arrow)
* |
* v
* [Fallback Slot]
* |
* v
* (Base Quals) ... WHERE-clause
* |
* v
* (xPU-Join) ... Join for each depth
* |
* v
* [Fallback Slot]
* |
* v
* (Fallback Projection) ... pts->fallback_proj
* |
* v
* [ CustomScan Slot ] ... Equivalent to GpuProj/GpuPreAgg results
*/

/*
* __execInitFallbackProjection
*/
static Node *
__fixup_fallback_projection(Node *node, void *__data)
{
pgstromPlanInfo *pp_info = __data;
List *custom_scan_tlist = __data;
ListCell *lc;

if (!node)
return NULL;
foreach (lc, pp_info->kvars_deflist)
foreach (lc, custom_scan_tlist)
{
codegen_kvar_defitem *kvdef = lfirst(lc);
TargetEntry *tle = lfirst(lc);

if (equal(kvdef->kv_expr, node))
if (tle->resorigtbl != (Oid)UINT_MAX &&
equal(tle->expr, node))
{
return (Node *)makeVar(INDEX_VAR,
kvdef->kv_slot_id + 1,
tle->resno,
exprType(node),
exprTypmod(node),
exprCollation(node),
Expand All @@ -1327,102 +1327,77 @@ __fixup_fallback_projection(Node *node, void *__data)

if (IsA(node, Var))
{
Var *var = (Var *)node;

return (Node *)makeNullConst(var->vartype,
var->vartypmod,
var->varcollid);
return (Node *)makeNullConst(exprType(node),
exprTypmod(node),
exprCollation(node));
}
return expression_tree_mutator(node, __fixup_fallback_projection, pp_info);
return expression_tree_mutator(node, __fixup_fallback_projection, __data);
}

/*
* fixup_fallback_join_inner_keys
* __execInitTaskStateCpuFallback
*/
static void
__execInitTaskStateCpuFallback(pgstromTaskState *pts)
{
pgstromPlanInfo *pp_info = pts->pp_info;
CustomScan *cscan = (CustomScan *)pts->css.ss.ps.plan;
Relation rel = pts->css.ss.ss_currentRelation;
TupleDesc fallback_tdesc;
List *cscan_tlist = NIL;
List *fallback_proj = NIL;
ListCell *lc;
bool compatible = true;

/*
* Init scan-quals for the base relation
* WHERE-clause
*/
pts->base_quals = ExecInitQual(pp_info->scan_quals_fallback,
&pts->css.ss.ps);
pts->base_quals = ExecInitQual(pp_info->scan_quals, &pts->css.ss.ps);
pts->base_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel),
table_slot_callbacks(rel));
if (pts->num_rels == 0)
/*
* CPU-Projection
*/
foreach (lc, cscan->custom_scan_tlist)
{
/*
* GpuScan can bypass fallback_slot, so fallback_proj directly
* transform the base_slot to ss_ScanTupleSlot.
*/
bool meet_junk = false;
TargetEntry *tle = lfirst(lc);
ExprState *state = NULL;

foreach (lc, cscan->custom_scan_tlist)
if (!tle->resjunk && tle->resorigtbl == (Oid)UINT_MAX)
{
TargetEntry *tle = lfirst(lc);

if (tle->resjunk)
meet_junk = true;
else if (meet_junk)
elog(ERROR, "Bug? custom_scan_tlist has valid attribute after junk");
else
cscan_tlist = lappend(cscan_tlist, tle);
Node *expr = __fixup_fallback_projection((Node *)tle->expr,
cscan->custom_scan_tlist);
state = ExecInitExpr((Expr *)expr, &pts->css.ss.ps);
compatible = false;
}
fallback_tdesc = RelationGetDescr(rel);
fallback_proj = lappend(fallback_proj, state);
}
else
{
/*
* CpuJoin runs its multi-level join on the fallback-slot that
* follows the kvars_slot layout.
* Then, it works as a source of fallback_proj.
*/
int nslots = list_length(pp_info->kvars_deflist);
bool meet_junk = false;
if (!compatible)
pts->fallback_proj = fallback_proj;
}

fallback_tdesc = CreateTemplateTupleDesc(nslots);
foreach (lc, pp_info->kvars_deflist)
{
codegen_kvar_defitem *kvdef = lfirst(lc);

TupleDescInitEntry(fallback_tdesc,
kvdef->kv_slot_id + 1,
psprintf("KVAR_%u", kvdef->kv_slot_id),
exprType((Node *)kvdef->kv_expr),
exprTypmod((Node *)kvdef->kv_expr),
0);
}
pts->fallback_slot = MakeSingleTupleTableSlot(fallback_tdesc,
&TTSOpsVirtual);
foreach (lc, cscan->custom_scan_tlist)
{
TargetEntry *tle = lfirst(lc);
/*
* pgstromCreateTaskState
*/
Node *
pgstromCreateTaskState(CustomScan *cscan,
const CustomExecMethods *methods)
{
pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
pgstromTaskState *pts;
int num_rels = list_length(cscan->custom_plans);

pts = palloc0(offsetof(pgstromTaskState, inners[num_rels]));
NodeSetTag(pts, T_CustomScanState);
pts->css.flags = cscan->flags;
pts->css.methods = methods;
#if PG_VERSION_NUM >= 160000
pts->css.slotOps = &TTSOpsHeapTuple;
#endif
pts->xpu_task_flags = pp_info->xpu_task_flags;
pts->pp_info = pp_info;
Assert(pp_info->num_rels == num_rels);
pts->num_rels = num_rels;

if (tle->resjunk)
meet_junk = true;
else if (meet_junk)
elog(ERROR, "Bug? custom_scan_tlist has valid attribute after junk");
else
{
TargetEntry *__tle = (TargetEntry *)
__fixup_fallback_projection((Node *)tle, pp_info);
cscan_tlist = lappend(cscan_tlist, __tle);
}
}
}
pts->fallback_proj =
ExecBuildProjectionInfo(cscan_tlist,
pts->css.ss.ps.ps_ExprContext,
pts->css.ss.ss_ScanTupleSlot,
&pts->css.ss.ps,
fallback_tdesc);
return (Node *)pts;
}

/*
Expand Down Expand Up @@ -1479,7 +1454,7 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags)
else if (RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE)
{
if (!pgstromArrowFdwExecInit(pts,
pp_info->scan_quals_fallback,
pp_info->scan_quals,
pp_info->outer_refs))
elog(ERROR, "Bug? only arrow_fdw is supported in PG-Strom");
}
Expand All @@ -1489,20 +1464,19 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags)
RelationGetRelationName(rel));
}

/* TupleDesc according to GpuProjection */
tupdesc_dst = pts->css.ss.ps.scandesc;
#if PG_VERSION_NUM < 160000
/*
* Re-initialization of scan tuple-descriptor and projection-info,
* because commit 1a8a4e5cde2b7755e11bde2ea7897bd650622d3e of
* PostgreSQL makes to assign result of ExecTypeFromTL() instead
* of ExecCleanTypeFromTL; that leads incorrect projection.
* So, we try to remove junk attributes from the scan-descriptor.
*
* And, device projection returns a tuple in heap-format, so we
* prefer TTSOpsHeapTuple, instead of the TTSOpsVirtual.
* PG16 adds CustomScanState::slotOps to initialize scan-tuple-slot
* with the specified tuple-slot-ops.
* GPU projection returns tuples in heap-format, so we prefer
* TTSOpsHeapTuple, instead of the TTSOpsVirtual.
*/
tupdesc_dst = ExecCleanTypeFromTL(cscan->custom_scan_tlist);
ExecInitScanTupleSlot(estate, &pts->css.ss, tupdesc_dst,
&TTSOpsHeapTuple);
ExecAssignScanProjectionInfoWithVarno(&pts->css.ss, INDEX_VAR);
#endif

/*
* Initialize the CPU Fallback stuff
Expand Down Expand Up @@ -2227,7 +2201,8 @@ pgstromGpuDirectExplain(pgstromTaskState *pts,
count);
appendStringInfo(&buf, ")");
}
ExplainPropertyText("GPU-Direct SQL", buf.data, es);
if (!pgstrom_regression_test_mode)
ExplainPropertyText("GPU-Direct SQL", buf.data, es);

pfree(buf.data);
}
Expand Down Expand Up @@ -2285,9 +2260,9 @@ pgstromExplainTaskState(CustomScanState *node,
/* xPU Scan Quals */
if (ps_state)
stat_ntuples = pg_atomic_read_u64(&ps_state->source_ntuples_in);
if (pp_info->scan_quals_explain)
if (pp_info->scan_quals)
{
List *scan_quals = pp_info->scan_quals_explain;
List *scan_quals = pp_info->scan_quals;
Expr *expr;

resetStringInfo(&buf);
Expand Down
Loading

0 comments on commit 330bbef

Please sign in to comment.