diff --git a/src/executor.c b/src/executor.c
index 9112b194..4ac9c0e0 100644
--- a/src/executor.c
+++ b/src/executor.c
@@ -1277,47 +1277,47 @@ __setupTaskStateRequestBuffer(pgstromTaskState *pts,
*
* CPU fallback process moves the tuple as follows
*
- * (can be both of heap and arrow)
- * |
- * v
- * [Base Slot] ... equivalent to the base relation
- * |
- * v
- * (Base Quals) ... WHERE-clause
- * |
- * +-----------+
- * | |
- * | v
- * | [Fallback Slot]
- * | |
- * | v
- * | ((GPU-Join)) ... Join for each depth
- * | |
- * | v
- * | [Fallback Slot]
- * | |
- * v v
+ * (can be both of heap and arrow)
+ * |
+ * v
+ * [Fallback Slot]
+ * |
+ * v
+ * (Base Quals) ... WHERE-clause
+ * |
+ * v
+ * (xPU-Join) ... Join for each depth
+ * |
+ * v
+ * [Fallback Slot]
+ * |
+ * v
* (Fallback Projection) ... pts->fallback_proj
* |
* v
* [ CustomScan Slot ] ... Equivalent to GpuProj/GpuPreAgg results
*/
+
+/*
+ * __fixup_fallback_projection
+ */
static Node *
__fixup_fallback_projection(Node *node, void *__data)
{
- pgstromPlanInfo *pp_info = __data;
+ List *custom_scan_tlist = __data;
ListCell *lc;
if (!node)
return NULL;
- foreach (lc, pp_info->kvars_deflist)
+ foreach (lc, custom_scan_tlist)
{
- codegen_kvar_defitem *kvdef = lfirst(lc);
+ TargetEntry *tle = lfirst(lc);
- if (equal(kvdef->kv_expr, node))
+ if (tle->resorigtbl != (Oid)UINT_MAX &&
+ equal(tle->expr, node))
{
return (Node *)makeVar(INDEX_VAR,
- kvdef->kv_slot_id + 1,
+ tle->resno,
exprType(node),
exprTypmod(node),
exprCollation(node),
@@ -1327,17 +1327,15 @@ __fixup_fallback_projection(Node *node, void *__data)
if (IsA(node, Var))
{
- Var *var = (Var *)node;
-
- return (Node *)makeNullConst(var->vartype,
- var->vartypmod,
- var->varcollid);
+ return (Node *)makeNullConst(exprType(node),
+ exprTypmod(node),
+ exprCollation(node));
}
- return expression_tree_mutator(node, __fixup_fallback_projection, pp_info);
+ return expression_tree_mutator(node, __fixup_fallback_projection, __data);
}
/*
- * fixup_fallback_join_inner_keys
+ * __execInitTaskStateCpuFallback
*/
static void
__execInitTaskStateCpuFallback(pgstromTaskState *pts)
@@ -1345,84 +1343,61 @@ __execInitTaskStateCpuFallback(pgstromTaskState *pts)
pgstromPlanInfo *pp_info = pts->pp_info;
CustomScan *cscan = (CustomScan *)pts->css.ss.ps.plan;
Relation rel = pts->css.ss.ss_currentRelation;
- TupleDesc fallback_tdesc;
- List *cscan_tlist = NIL;
+ List *fallback_proj = NIL;
ListCell *lc;
+ bool compatible = true;
/*
- * Init scan-quals for the base relation
+ * WHERE-clause
*/
- pts->base_quals = ExecInitQual(pp_info->scan_quals_fallback,
- &pts->css.ss.ps);
+ pts->base_quals = ExecInitQual(pp_info->scan_quals, &pts->css.ss.ps);
pts->base_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel),
table_slot_callbacks(rel));
- if (pts->num_rels == 0)
+ /*
+ * CPU-Projection
+ */
+ foreach (lc, cscan->custom_scan_tlist)
{
- /*
- * GpuScan can bypass fallback_slot, so fallback_proj directly
- * transform the base_slot to ss_ScanTupleSlot.
- */
- bool meet_junk = false;
+ TargetEntry *tle = lfirst(lc);
+ ExprState *state = NULL;
- foreach (lc, cscan->custom_scan_tlist)
+ if (!tle->resjunk && tle->resorigtbl == (Oid)UINT_MAX)
{
- TargetEntry *tle = lfirst(lc);
-
- if (tle->resjunk)
- meet_junk = true;
- else if (meet_junk)
- elog(ERROR, "Bug? custom_scan_tlist has valid attribute after junk");
- else
- cscan_tlist = lappend(cscan_tlist, tle);
+ Node *expr = __fixup_fallback_projection((Node *)tle->expr,
+ cscan->custom_scan_tlist);
+ state = ExecInitExpr((Expr *)expr, &pts->css.ss.ps);
+ compatible = false;
}
- fallback_tdesc = RelationGetDescr(rel);
+ fallback_proj = lappend(fallback_proj, state);
}
- else
- {
- /*
- * CpuJoin runs its multi-level join on the fallback-slot that
- * follows the kvars_slot layout.
- * Then, it works as a source of fallback_proj.
- */
- int nslots = list_length(pp_info->kvars_deflist);
- bool meet_junk = false;
+ if (!compatible)
+ pts->fallback_proj = fallback_proj;
+}
- fallback_tdesc = CreateTemplateTupleDesc(nslots);
- foreach (lc, pp_info->kvars_deflist)
- {
- codegen_kvar_defitem *kvdef = lfirst(lc);
-
- TupleDescInitEntry(fallback_tdesc,
- kvdef->kv_slot_id + 1,
- psprintf("KVAR_%u", kvdef->kv_slot_id),
- exprType((Node *)kvdef->kv_expr),
- exprTypmod((Node *)kvdef->kv_expr),
- 0);
- }
- pts->fallback_slot = MakeSingleTupleTableSlot(fallback_tdesc,
- &TTSOpsVirtual);
- foreach (lc, cscan->custom_scan_tlist)
- {
- TargetEntry *tle = lfirst(lc);
+/*
+ * pgstromCreateTaskState
+ */
+Node *
+pgstromCreateTaskState(CustomScan *cscan,
+ const CustomExecMethods *methods)
+{
+ pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
+ pgstromTaskState *pts;
+ int num_rels = list_length(cscan->custom_plans);
+
+ pts = palloc0(offsetof(pgstromTaskState, inners[num_rels]));
+ NodeSetTag(pts, T_CustomScanState);
+ pts->css.flags = cscan->flags;
+ pts->css.methods = methods;
+#if PG_VERSION_NUM >= 160000
+ pts->css.slotOps = &TTSOpsHeapTuple;
+#endif
+ pts->xpu_task_flags = pp_info->xpu_task_flags;
+ pts->pp_info = pp_info;
+ Assert(pp_info->num_rels == num_rels);
+ pts->num_rels = num_rels;
- if (tle->resjunk)
- meet_junk = true;
- else if (meet_junk)
- elog(ERROR, "Bug? custom_scan_tlist has valid attribute after junk");
- else
- {
- TargetEntry *__tle = (TargetEntry *)
- __fixup_fallback_projection((Node *)tle, pp_info);
- cscan_tlist = lappend(cscan_tlist, __tle);
- }
- }
- }
- pts->fallback_proj =
- ExecBuildProjectionInfo(cscan_tlist,
- pts->css.ss.ps.ps_ExprContext,
- pts->css.ss.ss_ScanTupleSlot,
- &pts->css.ss.ps,
- fallback_tdesc);
+ return (Node *)pts;
}
/*
@@ -1479,7 +1454,7 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags)
else if (RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE)
{
if (!pgstromArrowFdwExecInit(pts,
- pp_info->scan_quals_fallback,
+ pp_info->scan_quals,
pp_info->outer_refs))
elog(ERROR, "Bug? only arrow_fdw is supported in PG-Strom");
}
@@ -1489,20 +1464,19 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags)
RelationGetRelationName(rel));
}
+ /* TupleDesc according to GpuProjection */
+ tupdesc_dst = pts->css.ss.ps.scandesc;
+#if PG_VERSION_NUM < 160000
/*
- * Re-initialization of scan tuple-descriptor and projection-info,
- * because commit 1a8a4e5cde2b7755e11bde2ea7897bd650622d3e of
- * PostgreSQL makes to assign result of ExecTypeFromTL() instead
- * of ExecCleanTypeFromTL; that leads incorrect projection.
- * So, we try to remove junk attributes from the scan-descriptor.
- *
- * And, device projection returns a tuple in heap-format, so we
- * prefer TTSOpsHeapTuple, instead of the TTSOpsVirtual.
+ * PG16 adds CustomScanState::slotOps to initialize scan-tuple-slot
+ * with the specified tuple-slot-ops.
+ * GPU projection returns tuples in heap-format, so we prefer
+ * TTSOpsHeapTuple, instead of the TTSOpsVirtual.
*/
- tupdesc_dst = ExecCleanTypeFromTL(cscan->custom_scan_tlist);
ExecInitScanTupleSlot(estate, &pts->css.ss, tupdesc_dst,
&TTSOpsHeapTuple);
ExecAssignScanProjectionInfoWithVarno(&pts->css.ss, INDEX_VAR);
+#endif
/*
* Initialize the CPU Fallback stuff
@@ -2227,7 +2201,8 @@ pgstromGpuDirectExplain(pgstromTaskState *pts,
count);
appendStringInfo(&buf, ")");
}
- ExplainPropertyText("GPU-Direct SQL", buf.data, es);
+ if (!pgstrom_regression_test_mode)
+ ExplainPropertyText("GPU-Direct SQL", buf.data, es);
pfree(buf.data);
}
@@ -2285,9 +2260,9 @@ pgstromExplainTaskState(CustomScanState *node,
/* xPU Scan Quals */
if (ps_state)
stat_ntuples = pg_atomic_read_u64(&ps_state->source_ntuples_in);
- if (pp_info->scan_quals_explain)
+ if (pp_info->scan_quals)
{
- List *scan_quals = pp_info->scan_quals_explain;
+ List *scan_quals = pp_info->scan_quals;
Expr *expr;
resetStringInfo(&buf);
diff --git a/src/gpu_join.c b/src/gpu_join.c
index 3be0052f..73595139 100644
--- a/src/gpu_join.c
+++ b/src/gpu_join.c
@@ -1559,11 +1559,11 @@ PlanXpuJoinPathCommon(PlannerInfo *root,
context = create_codegen_context(root, cpath, pp_info);
/* codegen for outer scan, if any */
- if (pp_info->scan_quals_fallback)
+ if (pp_info->scan_quals)
{
pp_info->kexp_scan_quals
- = codegen_build_scan_quals(context, pp_info->scan_quals_fallback);
- pull_varattnos((Node *)pp_info->scan_quals_fallback,
+ = codegen_build_scan_quals(context, pp_info->scan_quals);
+ pull_varattnos((Node *)pp_info->scan_quals,
pp_info->scan_relid,
&outer_refs);
}
@@ -1694,10 +1694,10 @@ PlanXpuJoinPathCommon(PlannerInfo *root,
cscan->scan.plan.targetlist = tlist;
cscan->scan.scanrelid = pp_info->scan_relid;
cscan->flags = cpath->flags;
- cscan->custom_plans = custom_plans;
- cscan->custom_scan_tlist = context->tlist_dev;
cscan->methods = xpujoin_plan_methods;
-
+ cscan->custom_plans = custom_plans;
+ cscan->custom_scan_tlist = assign_custom_cscan_tlist(context->tlist_dev,
+ pp_info);
return cscan;
}
@@ -1764,22 +1764,8 @@ PlanDpuJoinPath(PlannerInfo *root,
static Node *
CreateGpuJoinState(CustomScan *cscan)
{
- pgstromTaskState *pts;
- pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
- int num_rels = list_length(cscan->custom_plans);
-
Assert(cscan->methods == &gpujoin_plan_methods);
- pts = palloc0(offsetof(pgstromTaskState, inners[num_rels]));
- NodeSetTag(pts, T_CustomScanState);
- pts->css.flags = cscan->flags;
- pts->css.methods = &gpujoin_exec_methods;
- pts->xpu_task_flags = pp_info->xpu_task_flags;
- pts->pp_info = pp_info;
- Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__GPUJOIN &&
- pp_info->num_rels == num_rels);
- pts->num_rels = num_rels;
-
- return (Node *)pts;
+ return pgstromCreateTaskState(cscan, &gpujoin_exec_methods);
}
/*
@@ -1788,22 +1774,8 @@ CreateGpuJoinState(CustomScan *cscan)
static Node *
CreateDpuJoinState(CustomScan *cscan)
{
- pgstromTaskState *pts;
- pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
- int num_rels = list_length(cscan->custom_plans);
-
Assert(cscan->methods == &dpujoin_plan_methods);
- pts = palloc0(offsetof(pgstromTaskState, inners[num_rels]));
- NodeSetTag(pts, T_CustomScanState);
- pts->css.flags = cscan->flags;
- pts->css.methods = &dpujoin_exec_methods;
- pts->xpu_task_flags = pp_info->xpu_task_flags;
- pts->pp_info = pp_info;
- Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__DPUJOIN &&
- pp_info->num_rels == num_rels);
- pts->num_rels = num_rels;
-
- return (Node *)pts;
+ return pgstromCreateTaskState(cscan, &dpujoin_exec_methods);
}
/* ---------------------------------------------------------------- *
@@ -2715,38 +2687,64 @@ __execFallbackCpuHashJoin(pgstromTaskState *pts,
}
static void
-__execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth)
+__execFallbackCpuProjection(pgstromTaskState *pts)
{
- kern_multirels *h_kmrels = pts->h_kmrels;
- kern_data_store *kds_in;
- bool *oj_map;
+ ExprContext *econtext = pts->css.ss.ps.ps_ExprContext;
+ TupleTableSlot *fallback_slot = pts->fallback_slot;
+ ListCell *lc;
+ int j=0;
- if (depth > h_kmrels->num_rels)
+ foreach (lc, pts->fallback_proj)
+ {
+ ExprState *state = lfirst(lc);
+ Datum datum;
+ bool isnull;
+
+ if (state)
+ {
+ datum = ExecEvalExpr(state, econtext, &isnull);
+
+ if (isnull)
+ {
+ fallback_slot->tts_isnull[j] = true;
+ fallback_slot->tts_values[j] = 0;
+ }
+ else
+ {
+ fallback_slot->tts_isnull[j] = false;
+ fallback_slot->tts_values[j] = datum;
+ }
+ }
+ j++;
+ }
+}
+
+static void
+__execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth)
+{
+ if (depth > pts->num_rels)
{
/* apply projection if any */
HeapTuple tuple;
bool should_free;
if (pts->fallback_proj)
- {
- TupleTableSlot *proj_slot = ExecProject(pts->fallback_proj);
-
- tuple = ExecFetchSlotHeapTuple(proj_slot, false, &should_free);
- }
- else
- {
- tuple = ExecFetchSlotHeapTuple(pts->fallback_slot, false, &should_free);
- }
- /* save the tuple on the fallback buffer */
+ __execFallbackCpuProjection(pts);
+ tuple = ExecFetchSlotHeapTuple(pts->fallback_slot,
+ false,
+ &should_free);
pgstromStoreFallbackTuple(pts, tuple);
if (should_free)
pfree(tuple);
}
else
{
+ kern_multirels *h_kmrels = pts->h_kmrels;
+ kern_data_store *kds_in;
+ bool *oj_map;
+
kds_in = KERN_MULTIRELS_INNER_KDS(h_kmrels, depth-1);
oj_map = KERN_MULTIRELS_OUTER_JOIN_MAP(h_kmrels, depth-1);
-
if (h_kmrels->chunks[depth-1].is_nestloop)
{
__execFallbackCpuNestLoop(pts, kds_in, oj_map, depth);
@@ -2768,40 +2766,10 @@ ExecFallbackCpuJoin(pgstromTaskState *pts, HeapTuple tuple)
size_t fallback_index_saved = pts->fallback_index;
ListCell *lc;
- ExecForceStoreHeapTuple(tuple, base_slot, false);
- econtext->ecxt_scantuple = base_slot;
- /* check WHERE-clause if any */
- if (pts->base_quals)
- {
- ResetExprContext(econtext);
- if (!ExecQual(pts->base_quals, econtext))
- return 0;
- }
-
- /*
- * Shortcut, if GpuJoin is not involved. (GpuScan or GpuPreAgg + GpuScan).
- * This case does not have fallback_slot, and the fallback_proj directly
- * transforms the base-tuple to the ss_ScanTupleSlot.
- */
- if (pts->num_rels == 0)
- {
- TupleTableSlot *proj_slot;
- HeapTuple proj_htup;
- bool should_free;
-
- Assert(pts->fallback_slot == 0);
- proj_slot = ExecProject(pts->fallback_proj);
- proj_htup = ExecFetchSlotHeapTuple(proj_slot, false, &should_free);
- pgstromStoreFallbackTuple(pts, proj_htup);
- if (should_free)
- pfree(proj_htup);
- return 1;
- }
-
/* Load the base tuple (depth-0) to the fallback slot */
+ ExecForceStoreHeapTuple(tuple, base_slot, false);
slot_getallattrs(base_slot);
- Assert(fallback_slot != NULL);
- ExecStoreAllNullTuple(fallback_slot);
+ ExecStoreAllNullTuple(fallback_slot);
foreach (lc, pp_info->kvars_deflist)
{
codegen_kvar_defitem *kvdef = lfirst(lc);
@@ -2810,16 +2778,23 @@ ExecFallbackCpuJoin(pgstromTaskState *pts, HeapTuple tuple)
kvdef->kv_resno >= 1 &&
kvdef->kv_resno <= base_slot->tts_nvalid)
{
- int dst = kvdef->kv_slot_id;
int src = kvdef->kv_resno - 1;
+ int dst = kvdef->kv_slot_id;
fallback_slot->tts_isnull[dst] = base_slot->tts_isnull[src];
fallback_slot->tts_values[dst] = base_slot->tts_values[src];
}
}
econtext->ecxt_scantuple = fallback_slot;
+
+ /* check WHERE-clause if any */
+ if (pts->base_quals)
+ {
+ ResetExprContext(econtext);
+ if (!ExecQual(pts->base_quals, econtext))
+ return 0;
+ }
/* Run JOIN, if any */
- Assert(pts->h_kmrels);
__execFallbackCpuJoinOneDepth(pts, 1);
return (pts->fallback_index - fallback_index_saved > 0);
}
diff --git a/src/gpu_preagg.c b/src/gpu_preagg.c
index 46de5c49..041adc1b 100644
--- a/src/gpu_preagg.c
+++ b/src/gpu_preagg.c
@@ -1789,22 +1789,8 @@ PlanDpuPreAggPath(PlannerInfo *root,
static Node *
CreateGpuPreAggScanState(CustomScan *cscan)
{
- pgstromTaskState *pts;
- pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
- int num_rels = list_length(cscan->custom_plans);
-
Assert(cscan->methods == &gpupreagg_plan_methods);
- pts = palloc0(offsetof(pgstromTaskState, inners[num_rels]));
- NodeSetTag(pts, T_CustomScanState);
- pts->css.flags = cscan->flags;
- pts->css.methods = &gpupreagg_exec_methods;
- pts->xpu_task_flags = pp_info->xpu_task_flags;
- pts->pp_info = pp_info;
- Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__GPUPREAGG &&
- pp_info->num_rels == num_rels);
- pts->num_rels = num_rels;
-
- return (Node *)pts;
+ return pgstromCreateTaskState(cscan, &gpupreagg_exec_methods);
}
/*
@@ -1813,22 +1799,8 @@ CreateGpuPreAggScanState(CustomScan *cscan)
static Node *
CreateDpuPreAggScanState(CustomScan *cscan)
{
- pgstromTaskState *pts;
- pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
- int num_rels = list_length(cscan->custom_plans);
-
Assert(cscan->methods == &dpupreagg_plan_methods);
- pts = palloc0(offsetof(pgstromTaskState, inners[num_rels]));
- NodeSetTag(pts, T_CustomScanState);
- pts->css.flags = cscan->flags;
- pts->css.methods = &dpupreagg_exec_methods;
- pts->xpu_task_flags = pp_info->xpu_task_flags;
- pts->pp_info = pp_info;
- Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__DPUPREAGG &&
- pp_info->num_rels == num_rels);
- pts->num_rels = num_rels;
-
- return (Node *)pts;
+ return pgstromCreateTaskState(cscan, &dpupreagg_exec_methods);
}
/*
diff --git a/src/gpu_scan.c b/src/gpu_scan.c
index 30d28d06..f70344dc 100644
--- a/src/gpu_scan.c
+++ b/src/gpu_scan.c
@@ -260,8 +260,7 @@ __buildSimpleScanPlanInfo(PlannerInfo *root,
pp_info->ds_entry = ds_entry;
pp_info->scan_relid = baserel->relid;
pp_info->host_quals = extract_actual_clauses(host_quals, false);
- pp_info->scan_quals_fallback = extract_actual_clauses(dev_quals, false);
- pp_info->scan_quals_explain = copyObject(pp_info->scan_quals_fallback);
+ pp_info->scan_quals = extract_actual_clauses(dev_quals, false);
pp_info->scan_tuples = baserel->tuples;
pp_info->scan_nrows = scan_nrows;
pp_info->parallel_nworkers = parallel_nworkers;
@@ -278,7 +277,7 @@ __buildSimpleScanPlanInfo(PlannerInfo *root,
outer_refs = pickup_outer_referenced(root, baserel, outer_refs);
pull_varattnos((Node *)pp_info->host_quals,
baserel->relid, &outer_refs);
- pull_varattnos((Node *)pp_info->scan_quals_fallback,
+ pull_varattnos((Node *)pp_info->scan_quals,
baserel->relid, &outer_refs);
pp_info->outer_refs = outer_refs;
pp_info->sibling_param_id = -1;
@@ -416,7 +415,7 @@ try_add_simple_scan_path(PlannerInfo *root,
{
pgstromPlanInfo *pp_info = op_leaf->pp_info;
- if (pp_info->scan_quals_fallback != NIL)
+ if (pp_info->scan_quals != NIL)
{
CustomPath *cpath = makeNode(CustomPath);
@@ -706,7 +705,7 @@ gpuscan_build_projection(RelOptInfo *baserel,
baserel->relid,
false);
- vars_list = pull_vars_of_level((Node *)pp_info->scan_quals_fallback, 0);
+ vars_list = pull_vars_of_level((Node *)pp_info->scan_quals, 0);
foreach (lc, vars_list)
tlist_dev = __gpuscan_build_projection_expr(tlist_dev,
(Node *)lfirst(lc),
@@ -775,7 +774,82 @@ __build_explain_tlist_junks(PlannerInfo *root,
}
/*
- * PlanXpuScanPathCommon
+ * __assign_cpu_fallback_slots
+ */
+void
+__assign_cpu_fallback_slots(List *kvars_deflist, List *custom_scan_tlist)
+{
+ ListCell *lc1, *lc2;
+
+ foreach (lc1, kvars_deflist)
+ {
+ codegen_kvar_defitem *kvdef = lfirst(lc1);
+ int kv_fallback = -1;
+
+ if (kvdef->kv_depth >= 0 &&
+ kvdef->kv_resno > 0)
+ {
+ foreach (lc2, custom_scan_tlist)
+ {
+ TargetEntry *tle = lfirst(lc2);
+
+ if (equal(tle->expr, kvdef->kv_expr))
+ {
+ kv_fallback = tle->resno - 1;
+ break;
+ }
+ }
+ }
+ kvdef->kv_fallback = kv_fallback;
+ }
+}
+
+/*
+ * assign_custom_cscan_tlist
+ */
+List *
+assign_custom_cscan_tlist(List *tlist_dev, pgstromPlanInfo *pp_info)
+{
+ ListCell *lc1, *lc2;
+
+ /* clear kv_fallback */
+ foreach (lc1, pp_info->kvars_deflist)
+ {
+ codegen_kvar_defitem *kvdef = lfirst(lc1);
+
+ kvdef->kv_fallback = -1;
+ }
+
+ foreach (lc1, tlist_dev)
+ {
+ TargetEntry *tle = lfirst(lc1);
+
+ foreach (lc2, pp_info->kvars_deflist)
+ {
+ codegen_kvar_defitem *kvdef = lfirst(lc2);
+
+ if (kvdef->kv_depth >= 0 &&
+ kvdef->kv_depth <= pp_info->num_rels &&
+ kvdef->kv_resno > 0 &&
+ equal(tle->expr, kvdef->kv_expr))
+ {
+ kvdef->kv_fallback = tle->resno - 1;
+ tle->resorigtbl = (Oid)kvdef->kv_depth;
+ tle->resorigcol = kvdef->kv_resno;
+ break;
+ }
+ }
+ if (!lc2)
+ {
+ tle->resorigtbl = (Oid)UINT_MAX;
+ tle->resorigcol = -1;
+ }
+ }
+ return tlist_dev;
+}
+
+/*
+ * PlanXpuScanPathCommon
*/
static CustomScan *
PlanXpuScanPathCommon(PlannerInfo *root,
@@ -791,8 +865,7 @@ PlanXpuScanPathCommon(PlannerInfo *root,
context = create_codegen_context(root, best_path, pp_info);
/* code generation for WHERE-clause */
- pp_info->kexp_scan_quals = codegen_build_scan_quals(context,
- pp_info->scan_quals_fallback);
+ pp_info->kexp_scan_quals = codegen_build_scan_quals(context, pp_info->scan_quals);
/* code generation for the Projection */
context->tlist_dev = gpuscan_build_projection(baserel, pp_info, tlist);
pp_info->kexp_projection = codegen_build_projection(context);
@@ -820,8 +893,8 @@ PlanXpuScanPathCommon(PlannerInfo *root,
cscan->flags = best_path->flags;
cscan->methods = xpuscan_plan_methods;
cscan->custom_plans = NIL;
- cscan->custom_scan_tlist = context->tlist_dev;
-
+ cscan->custom_scan_tlist = assign_custom_cscan_tlist(context->tlist_dev,
+ pp_info);
return cscan;
}
@@ -890,19 +963,8 @@ PlanDpuScanPath(PlannerInfo *root,
static Node *
CreateGpuScanState(CustomScan *cscan)
{
- pgstromTaskState *pts = palloc0(sizeof(pgstromTaskState));
- pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
-
Assert(cscan->methods == &gpuscan_plan_methods);
- /* Set tag and executor callbacks */
- NodeSetTag(pts, T_CustomScanState);
- pts->css.flags = cscan->flags;
- pts->css.methods = &gpuscan_exec_methods;
- pts->xpu_task_flags = pp_info->xpu_task_flags;
- pts->pp_info = pp_info;
- Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__GPUSCAN);
-
- return (Node *)pts;
+ return pgstromCreateTaskState(cscan, &gpuscan_exec_methods);
}
/*
@@ -911,18 +973,8 @@ CreateGpuScanState(CustomScan *cscan)
static Node *
CreateDpuScanState(CustomScan *cscan)
{
- pgstromTaskState *pts = palloc0(sizeof(pgstromTaskState));
- pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan);
-
Assert(cscan->methods == &dpuscan_plan_methods);
- NodeSetTag(pts, T_CustomScanState);
- pts->css.flags = cscan->flags;
- pts->css.methods = &dpuscan_exec_methods;
- pts->xpu_task_flags = pp_info->xpu_task_flags;
- pts->pp_info = pp_info;
- Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__DPUSCAN);
-
- return (Node *)pts;
+ return pgstromCreateTaskState(cscan, &dpuscan_exec_methods);
}
/*
@@ -931,11 +983,35 @@ CreateDpuScanState(CustomScan *cscan)
bool
ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple)
{
- ExprContext *econtext = pts->css.ss.ps.ps_ExprContext;
- bool should_free;
+ pgstromPlanInfo *pp_info = pts->pp_info;
+ ExprContext *econtext = pts->css.ss.ps.ps_ExprContext;
+ TupleTableSlot *base_slot = pts->base_slot;
+ TupleTableSlot *fallback_slot = pts->css.ss.ss_ScanTupleSlot;
+ ListCell *lc;
+ int attidx = 0;
+ bool should_free;
+
+ /* Load the base tuple (depth-0) to the fallback slot */
+ ExecForceStoreHeapTuple(tuple, base_slot, false);
+ slot_getallattrs(base_slot);
+ ExecStoreAllNullTuple(fallback_slot);
+ foreach (lc, pp_info->kvars_deflist)
+ {
+ codegen_kvar_defitem *kvdef = lfirst(lc);
- ExecForceStoreHeapTuple(tuple, pts->base_slot, false);
- econtext->ecxt_scantuple = pts->base_slot;
+ if (kvdef->kv_depth == 0 &&
+ kvdef->kv_resno >= 1 &&
+ kvdef->kv_resno <= base_slot->tts_nvalid &&
+ kvdef->kv_fallback >= 0)
+ {
+ int src = kvdef->kv_resno - 1;
+ int dst = kvdef->kv_fallback;
+
+ fallback_slot->tts_isnull[dst] = base_slot->tts_isnull[src];
+ fallback_slot->tts_values[dst] = base_slot->tts_values[src];
+ }
+ }
+ econtext->ecxt_scantuple = fallback_slot;
/* check WHERE-clause if any */
if (pts->base_quals)
@@ -944,20 +1020,33 @@ ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple)
if (!ExecQual(pts->base_quals, econtext))
return false;
}
- Assert(!pts->fallback_slot);
-
- /* apply Projection if any */
- if (pts->fallback_proj)
+ /* apply GPU-Projection */
+ foreach (lc, pts->fallback_proj)
{
- TupleTableSlot *proj_slot = ExecProject(pts->fallback_proj);
+ ExprState *state = lfirst(lc);
+ Datum datum;
+ bool isnull;
- tuple = ExecFetchSlotHeapTuple(proj_slot, false, &should_free);
- }
- else
- {
- tuple = ExecFetchSlotHeapTuple(pts->base_slot, false, &should_free);
+ if (state)
+ {
+ datum = ExecEvalExpr(state, econtext, &isnull);
+ if (isnull)
+ {
+ fallback_slot->tts_isnull[attidx] = true;
+ fallback_slot->tts_values[attidx] = 0;
+ }
+ else
+ {
+ fallback_slot->tts_isnull[attidx] = false;
+ fallback_slot->tts_values[attidx] = datum;
+ }
+ }
+ attidx++;
}
/* save the tuple on the fallback buffer */
+ tuple = ExecFetchSlotHeapTuple(fallback_slot,
+ false,
+ &should_free);
pgstromStoreFallbackTuple(pts, tuple);
if (should_free)
pfree(tuple);
diff --git a/src/misc.c b/src/misc.c
index c5541d54..4a96f711 100644
--- a/src/misc.c
+++ b/src/misc.c
@@ -42,6 +42,7 @@ __form_codegen_kvar_defitem(codegen_kvar_defitem *kvdef,
__kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_typlen));
__kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_xdatum_sizeof));
__kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_kvec_sizeof));
+ __kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_fallback));
__kv_exprs = lappend(__kv_exprs, kvdef->kv_expr);
foreach (lc, kvdef->kv_subfields)
{
@@ -83,6 +84,7 @@ __deform_codegen_kvar_defitem(List *__kv_privs, List *__kv_exprs)
kvdef->kv_typlen = intVal(list_nth(__kv_privs, pindex++));
kvdef->kv_xdatum_sizeof = intVal(list_nth(__kv_privs, pindex++));
kvdef->kv_kvec_sizeof = intVal(list_nth(__kv_privs, pindex++));
+ kvdef->kv_fallback = intVal(list_nth(__kv_privs, pindex++));
kvdef->kv_expr = list_nth(__kv_exprs, eindex++);
__sub_privs = list_nth(__kv_privs, pindex++);
__sub_exprs = list_nth(__kv_exprs, eindex++);
@@ -118,8 +120,7 @@ form_pgstrom_plan_info(CustomScan *cscan, pgstromPlanInfo *pp_info)
exprs = lappend(exprs, pp_info->used_params);
privs = lappend(privs, pp_info->host_quals);
privs = lappend(privs, makeInteger(pp_info->scan_relid));
- privs = lappend(privs, pp_info->scan_quals_fallback);
- exprs = lappend(exprs, pp_info->scan_quals_explain);
+ exprs = lappend(exprs, pp_info->scan_quals);
privs = lappend(privs, __makeFloat(pp_info->scan_tuples));
privs = lappend(privs, __makeFloat(pp_info->scan_nrows));
privs = lappend(privs, makeInteger(pp_info->parallel_nworkers));
@@ -230,8 +231,7 @@ deform_pgstrom_plan_info(CustomScan *cscan)
pp_data.used_params = list_nth(exprs, eindex++);
pp_data.host_quals = list_nth(privs, pindex++);
pp_data.scan_relid = intVal(list_nth(privs, pindex++));
- pp_data.scan_quals_fallback = list_nth(privs, pindex++);
- pp_data.scan_quals_explain = list_nth(exprs, eindex++);
+ pp_data.scan_quals = list_nth(exprs, eindex++);
pp_data.scan_tuples = floatVal(list_nth(privs, pindex++));
pp_data.scan_nrows = floatVal(list_nth(privs, pindex++));
pp_data.parallel_nworkers = intVal(list_nth(privs, pindex++));
@@ -330,8 +330,7 @@ copy_pgstrom_plan_info(const pgstromPlanInfo *pp_orig)
inners[pp_orig->num_rels]));
pp_dest->used_params = list_copy(pp_dest->used_params);
pp_dest->host_quals = copyObject(pp_dest->host_quals);
- pp_dest->scan_quals_fallback = copyObject(pp_dest->scan_quals_fallback);
- pp_dest->scan_quals_explain = copyObject(pp_dest->scan_quals_explain);
+ pp_dest->scan_quals = copyObject(pp_dest->scan_quals);
pp_dest->brin_index_conds = copyObject(pp_dest->brin_index_conds);
pp_dest->brin_index_quals = copyObject(pp_dest->brin_index_quals);
foreach (lc, pp_orig->kvars_deflist)
diff --git a/src/pg_strom.h b/src/pg_strom.h
index 2ea4145e..991c682b 100644
--- a/src/pg_strom.h
+++ b/src/pg_strom.h
@@ -280,8 +280,7 @@ typedef struct
List *used_params; /* param list in use */
List *host_quals; /* host qualifiers to scan the outer */
Index scan_relid; /* relid of the outer relation to scan */
- List *scan_quals_fallback;/* device qualifiers to scan the outer */
- List *scan_quals_explain; /* device qualifiers for EXPLAIN output */
+ List *scan_quals; /* device qualifiers to scan the outer */
double scan_tuples; /* copy of baserel->tuples */
double scan_nrows; /* copy of baserel->rows */
int parallel_nworkers; /* # of parallel workers */
@@ -468,7 +467,8 @@ struct pgstromTaskState
size_t fallback_bufsz;
char *fallback_buffer;
TupleTableSlot *fallback_slot; /* host-side kvars-slot */
- ProjectionInfo *fallback_proj; /* base or fallback slot -> custom_tlist */
+	List	   *fallback_proj;	/* per-column ExprState list for CPU fallback; */
+									/* a NULL element keeps the slot value as-is */
/* request command buffer (+ status for table scan) */
TBMIterateResult *curr_tbm;
Buffer curr_vm_buffer; /* for visibility-map */
@@ -547,7 +547,7 @@ extern int heterodbExtraGetError(const char **p_filename,
*/
typedef struct
{
- int kv_slot_id; /* slot-id of kernel varslot / CPU fallback */
+ int kv_slot_id; /* slot-id of kernel varslot */
int kv_depth; /* source depth */
int kv_resno; /* source resno, if exist */
int kv_maxref; /* max depth that references this column. */
@@ -559,6 +559,7 @@ typedef struct
int16_t kv_typlen; /* typlen from the catalog */
int kv_xdatum_sizeof;/* =sizeof(xpu_XXXX_t), if any */
int kv_kvec_sizeof; /* =sizeof(kvec_XXXX_t), if any */
+ int kv_fallback; /* slot-id for CPU fallback */
Expr *kv_expr; /* original expression */
List *kv_subfields; /* subfields definition, if array or composite */
} codegen_kvar_defitem;
@@ -718,6 +719,8 @@ extern void xpuClientPutResponse(XpuCommand *xcmd);
extern const XpuCommand *pgstromBuildSessionInfo(pgstromTaskState *pts,
uint32_t join_inner_handle,
TupleDesc tdesc_final);
+extern Node *pgstromCreateTaskState(CustomScan *cscan,
+ const CustomExecMethods *methods);
extern void pgstromExecInitTaskState(CustomScanState *node,
EState *estate,
int eflags);
@@ -816,6 +819,8 @@ extern void gpuCachePutDeviceBuffer(void *gc_lmap);
extern void sort_device_qualifiers(List *dev_quals_list,
List *dev_costs_list);
extern pgstromPlanInfo *try_fetch_xpuscan_planinfo(const Path *path);
+extern List *assign_custom_cscan_tlist(List *tlist_dev,
+ pgstromPlanInfo *pp_info);
extern List *buildOuterScanPlanInfo(PlannerInfo *root,
RelOptInfo *baserel,
uint32_t xpu_task_flags,
diff --git a/test/parallel_schedule b/test/parallel_schedule
index 08122ee9..c9a85f16 100644
--- a/test/parallel_schedule
+++ b/test/parallel_schedule
@@ -13,12 +13,12 @@ test: pgstrom_guc
# ----------
# Test for each data types
# ----------
-test: dtype_int dtype_float dtype_numeric dtype_time dtype_text dtype_jsonb additional_dtype
+#test: dtype_int dtype_float dtype_numeric dtype_time dtype_text dtype_jsonb additional_dtype
# ----------
# Test for various functions / expressions
# ----------
-test: dfunc_math dfunc_mbtext dexpr_scalar_array_op dexpr_misc
+#test: dfunc_math dfunc_mbtext dexpr_scalar_array_op dexpr_misc
# ----------
# Test for arrow_fdw