From 330bbef521711d63bd84b560a06cb2ef0b0bdb8f Mon Sep 17 00:00:00 2001 From: KaiGai Kohei Date: Fri, 12 Apr 2024 23:58:22 +0900 Subject: [PATCH] revise CPU fallback infrastructure; always follow custom_scan_tlist layout issue reported at #747 --- src/executor.c | 195 ++++++++++++++++++----------------------- src/gpu_join.c | 147 +++++++++++++------------------ src/gpu_preagg.c | 32 +------ src/gpu_scan.c | 183 ++++++++++++++++++++++++++++---------- src/misc.c | 11 ++- src/pg_strom.h | 13 ++- test/parallel_schedule | 4 +- 7 files changed, 300 insertions(+), 285 deletions(-) diff --git a/src/executor.c b/src/executor.c index 9112b1940..4ac9c0e0c 100644 --- a/src/executor.c +++ b/src/executor.c @@ -1277,47 +1277,47 @@ __setupTaskStateRequestBuffer(pgstromTaskState *pts, * * CPU fallback process moves the tuple as follows * - * (can be both of heap and arrow) - * | - * v - * [Base Slot] ... equivalent to the base relation - * | - * v - * (Base Quals) ... WHERE-clause - * | - * +-----------+ - * | | - * | v - * | [Fallback Slot] - * | | - * | v - * | ((GPU-Join)) ... Join for each depth - * | | - * | v - * | [Fallback Slot] - * | | - * v v + * (can be both of heap and arrow) + * | + * v + * [Fallback Slot] + * | + * v + * (Base Quals) ... WHERE-clause + * | + * v + * (xPU-Join) ... Join for each depth + * | + * v + * [Fallback Slot] + * | + * v * (Fallback Projection) ... pts->fallback_proj * | * v * [ CustomScan Slot ] ... Equivalent to GpuProj/GpuPreAgg results */ + +/* + * __execInitFallbackProjection + */ static Node * __fixup_fallback_projection(Node *node, void *__data) { - pgstromPlanInfo *pp_info = __data; + List *custom_scan_tlist = __data; ListCell *lc; if (!node) return NULL; - foreach (lc, pp_info->kvars_deflist) + foreach (lc, custom_scan_tlist) { - codegen_kvar_defitem *kvdef = lfirst(lc); + TargetEntry *tle = lfirst(lc); - if (equal(kvdef->kv_expr, node)) + if (tle->resorigtbl != (Oid)UINT_MAX && + equal(tle->expr, node)) { return (Node *)makeVar(INDEX_VAR, - kvdef->kv_slot_id + 1, + tle->resno, exprType(node), exprTypmod(node), exprCollation(node), @@ -1327,17 +1327,15 @@ __fixup_fallback_projection(Node *node, void *__data) if (IsA(node, Var)) { - Var *var = (Var *)node; - - return (Node *)makeNullConst(var->vartype, - var->vartypmod, - var->varcollid); + return (Node *)makeNullConst(exprType(node), + exprTypmod(node), + exprCollation(node)); } - return expression_tree_mutator(node, __fixup_fallback_projection, pp_info); + return expression_tree_mutator(node, __fixup_fallback_projection, __data); } /* - * fixup_fallback_join_inner_keys + * __execInitTaskStateCpuFallback */ static void __execInitTaskStateCpuFallback(pgstromTaskState *pts) @@ -1345,84 +1343,61 @@ __execInitTaskStateCpuFallback(pgstromTaskState *pts) pgstromPlanInfo *pp_info = pts->pp_info; CustomScan *cscan = (CustomScan *)pts->css.ss.ps.plan; Relation rel = pts->css.ss.ss_currentRelation; - TupleDesc fallback_tdesc; - List *cscan_tlist = NIL; + List *fallback_proj = NIL; ListCell *lc; + bool compatible = true; /* - * Init scan-quals for the base relation + * WHERE-clause */ - pts->base_quals = ExecInitQual(pp_info->scan_quals_fallback, - &pts->css.ss.ps); + pts->base_quals = ExecInitQual(pp_info->scan_quals, &pts->css.ss.ps); pts->base_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel)); - if (pts->num_rels == 0) + /* + * CPU-Projection + */ + foreach (lc, cscan->custom_scan_tlist) { - /* - * GpuScan can bypass fallback_slot, so fallback_proj directly - * transform the base_slot to ss_ScanTupleSlot. - */ - bool meet_junk = false; + TargetEntry *tle = lfirst(lc); + ExprState *state = NULL; - foreach (lc, cscan->custom_scan_tlist) + if (!tle->resjunk && tle->resorigtbl == (Oid)UINT_MAX) { - TargetEntry *tle = lfirst(lc); - - if (tle->resjunk) - meet_junk = true; - else if (meet_junk) - elog(ERROR, "Bug? custom_scan_tlist has valid attribute after junk"); - else - cscan_tlist = lappend(cscan_tlist, tle); + Node *expr = __fixup_fallback_projection((Node *)tle->expr, + cscan->custom_scan_tlist); + state = ExecInitExpr((Expr *)expr, &pts->css.ss.ps); + compatible = false; } - fallback_tdesc = RelationGetDescr(rel); + fallback_proj = lappend(fallback_proj, state); } - else - { - /* - * CpuJoin runs its multi-level join on the fallback-slot that - * follows the kvars_slot layout. - * Then, it works as a source of fallback_proj. - */ - int nslots = list_length(pp_info->kvars_deflist); - bool meet_junk = false; + if (!compatible) + pts->fallback_proj = fallback_proj; +} - fallback_tdesc = CreateTemplateTupleDesc(nslots); - foreach (lc, pp_info->kvars_deflist) - { - codegen_kvar_defitem *kvdef = lfirst(lc); - - TupleDescInitEntry(fallback_tdesc, - kvdef->kv_slot_id + 1, - psprintf("KVAR_%u", kvdef->kv_slot_id), - exprType((Node *)kvdef->kv_expr), - exprTypmod((Node *)kvdef->kv_expr), - 0); - } - pts->fallback_slot = MakeSingleTupleTableSlot(fallback_tdesc, - &TTSOpsVirtual); - foreach (lc, cscan->custom_scan_tlist) - { - TargetEntry *tle = lfirst(lc); +/* + * pgstromCreateTaskState + */ +Node * +pgstromCreateTaskState(CustomScan *cscan, + const CustomExecMethods *methods) +{ + pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan); + pgstromTaskState *pts; + int num_rels = list_length(cscan->custom_plans); + + pts = palloc0(offsetof(pgstromTaskState, inners[num_rels])); + NodeSetTag(pts, T_CustomScanState); + pts->css.flags = cscan->flags; + pts->css.methods = methods; +#if PG_VERSION_NUM >= 160000 + pts->css.slotOps = &TTSOpsHeapTuple; +#endif + pts->xpu_task_flags = pp_info->xpu_task_flags; + pts->pp_info = pp_info; + Assert(pp_info->num_rels == num_rels); + pts->num_rels = num_rels; - if (tle->resjunk) - meet_junk = true; - else if (meet_junk) - elog(ERROR, "Bug? custom_scan_tlist has valid attribute after junk"); - else - { - TargetEntry *__tle = (TargetEntry *) - __fixup_fallback_projection((Node *)tle, pp_info); - cscan_tlist = lappend(cscan_tlist, __tle); - } - } - } - pts->fallback_proj = - ExecBuildProjectionInfo(cscan_tlist, - pts->css.ss.ps.ps_ExprContext, - pts->css.ss.ss_ScanTupleSlot, - &pts->css.ss.ps, - fallback_tdesc); + return (Node *)pts; } /* @@ -1479,7 +1454,7 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags) else if (RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE) { if (!pgstromArrowFdwExecInit(pts, - pp_info->scan_quals_fallback, + pp_info->scan_quals, pp_info->outer_refs)) elog(ERROR, "Bug? only arrow_fdw is supported in PG-Strom"); } @@ -1489,20 +1464,19 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags) RelationGetRelationName(rel)); } + /* TupleDesc according to GpuProjection */ + tupdesc_dst = pts->css.ss.ps.scandesc; +#if PG_VERSION_NUM < 160000 /* - * Re-initialization of scan tuple-descriptor and projection-info, - * because commit 1a8a4e5cde2b7755e11bde2ea7897bd650622d3e of - * PostgreSQL makes to assign result of ExecTypeFromTL() instead - * of ExecCleanTypeFromTL; that leads incorrect projection. - * So, we try to remove junk attributes from the scan-descriptor. - * - * And, device projection returns a tuple in heap-format, so we - * prefer TTSOpsHeapTuple, instead of the TTSOpsVirtual. + * PG16 adds CustomScanState::slotOps to initialize scan-tuple-slot + * with the specified tuple-slot-ops. + * GPU projection returns tuples in heap-format, so we prefer + * TTSOpsHeapTuple, instead of the TTSOpsVirtual. */ - tupdesc_dst = ExecCleanTypeFromTL(cscan->custom_scan_tlist); ExecInitScanTupleSlot(estate, &pts->css.ss, tupdesc_dst, &TTSOpsHeapTuple); ExecAssignScanProjectionInfoWithVarno(&pts->css.ss, INDEX_VAR); +#endif /* * Initialize the CPU Fallback stuff @@ -2227,7 +2201,8 @@ pgstromGpuDirectExplain(pgstromTaskState *pts, count); appendStringInfo(&buf, ")"); } - ExplainPropertyText("GPU-Direct SQL", buf.data, es); + if (!pgstrom_regression_test_mode) + ExplainPropertyText("GPU-Direct SQL", buf.data, es); pfree(buf.data); } @@ -2285,9 +2260,9 @@ pgstromExplainTaskState(CustomScanState *node, /* xPU Scan Quals */ if (ps_state) stat_ntuples = pg_atomic_read_u64(&ps_state->source_ntuples_in); - if (pp_info->scan_quals_explain) + if (pp_info->scan_quals) { - List *scan_quals = pp_info->scan_quals_explain; + List *scan_quals = pp_info->scan_quals; Expr *expr; resetStringInfo(&buf); diff --git a/src/gpu_join.c b/src/gpu_join.c index 3be0052fa..73595139e 100644 --- a/src/gpu_join.c +++ b/src/gpu_join.c @@ -1559,11 +1559,11 @@ PlanXpuJoinPathCommon(PlannerInfo *root, context = create_codegen_context(root, cpath, pp_info); /* codegen for outer scan, if any */ - if (pp_info->scan_quals_fallback) + if (pp_info->scan_quals) { pp_info->kexp_scan_quals - = codegen_build_scan_quals(context, pp_info->scan_quals_fallback); - pull_varattnos((Node *)pp_info->scan_quals_fallback, + = codegen_build_scan_quals(context, pp_info->scan_quals); + pull_varattnos((Node *)pp_info->scan_quals, pp_info->scan_relid, &outer_refs); } @@ -1694,10 +1694,10 @@ PlanXpuJoinPathCommon(PlannerInfo *root, cscan->scan.plan.targetlist = tlist; cscan->scan.scanrelid = pp_info->scan_relid; cscan->flags = cpath->flags; - cscan->custom_plans = custom_plans; - cscan->custom_scan_tlist = context->tlist_dev; cscan->methods = xpujoin_plan_methods; - + cscan->custom_plans = custom_plans; + cscan->custom_scan_tlist = assign_custom_cscan_tlist(context->tlist_dev, + pp_info); return cscan; } @@ -1764,22 +1764,8 @@ PlanDpuJoinPath(PlannerInfo *root, static Node * CreateGpuJoinState(CustomScan *cscan) { - pgstromTaskState *pts; - pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan); - int num_rels = list_length(cscan->custom_plans); - Assert(cscan->methods == &gpujoin_plan_methods); - pts = palloc0(offsetof(pgstromTaskState, inners[num_rels])); - NodeSetTag(pts, T_CustomScanState); - pts->css.flags = cscan->flags; - pts->css.methods = &gpujoin_exec_methods; - pts->xpu_task_flags = pp_info->xpu_task_flags; - pts->pp_info = pp_info; - Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__GPUJOIN && - pp_info->num_rels == num_rels); - pts->num_rels = num_rels; - - return (Node *)pts; + return pgstromCreateTaskState(cscan, &gpujoin_exec_methods); } /* @@ -1788,22 +1774,8 @@ CreateGpuJoinState(CustomScan *cscan) static Node * CreateDpuJoinState(CustomScan *cscan) { - pgstromTaskState *pts; - pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan); - int num_rels = list_length(cscan->custom_plans); - Assert(cscan->methods == &dpujoin_plan_methods); - pts = palloc0(offsetof(pgstromTaskState, inners[num_rels])); - NodeSetTag(pts, T_CustomScanState); - pts->css.flags = cscan->flags; - pts->css.methods = &dpujoin_exec_methods; - pts->xpu_task_flags = pp_info->xpu_task_flags; - pts->pp_info = pp_info; - Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__DPUJOIN && - pp_info->num_rels == num_rels); - pts->num_rels = num_rels; - - return (Node *)pts; + return pgstromCreateTaskState(cscan, &dpujoin_exec_methods); } /* ---------------------------------------------------------------- * @@ -2715,38 +2687,64 @@ __execFallbackCpuHashJoin(pgstromTaskState *pts, } static void -__execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth) +__execFallbackCpuProjection(pgstromTaskState *pts) { - kern_multirels *h_kmrels = pts->h_kmrels; - kern_data_store *kds_in; - bool *oj_map; + ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; + TupleTableSlot *fallback_slot = pts->fallback_slot; + ListCell *lc; + int j=0; - if (depth > h_kmrels->num_rels) + foreach (lc, pts->fallback_proj) + { + ExprState *state = lfirst(lc); + Datum datum; + bool isnull; + + if (state) + { + datum = ExecEvalExpr(state, econtext, &isnull); + + if (isnull) + { + fallback_slot->tts_isnull[j] = true; + fallback_slot->tts_values[j] = 0; + } + else + { + fallback_slot->tts_isnull[j] = false; + fallback_slot->tts_values[j] = datum; + } + } + j++; + } +} + +static void +__execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth) +{ + if (depth > pts->num_rels) { /* apply projection if any */ HeapTuple tuple; bool should_free; if (pts->fallback_proj) - { - TupleTableSlot *proj_slot = ExecProject(pts->fallback_proj); - - tuple = ExecFetchSlotHeapTuple(proj_slot, false, &should_free); - } - else - { - tuple = ExecFetchSlotHeapTuple(pts->fallback_slot, false, &should_free); - } - /* save the tuple on the fallback buffer */ + __execFallbackCpuProjection(pts); + tuple = ExecFetchSlotHeapTuple(pts->fallback_slot, + false, + &should_free); pgstromStoreFallbackTuple(pts, tuple); if (should_free) pfree(tuple); } else { + kern_multirels *h_kmrels = pts->h_kmrels; + kern_data_store *kds_in; + bool *oj_map; + kds_in = KERN_MULTIRELS_INNER_KDS(h_kmrels, depth-1); oj_map = KERN_MULTIRELS_OUTER_JOIN_MAP(h_kmrels, depth-1); - if (h_kmrels->chunks[depth-1].is_nestloop) { __execFallbackCpuNestLoop(pts, kds_in, oj_map, depth); @@ -2768,40 +2766,10 @@ ExecFallbackCpuJoin(pgstromTaskState *pts, HeapTuple tuple) size_t fallback_index_saved = pts->fallback_index; ListCell *lc; - ExecForceStoreHeapTuple(tuple, base_slot, false); - econtext->ecxt_scantuple = base_slot; - /* check WHERE-clause if any */ - if (pts->base_quals) - { - ResetExprContext(econtext); - if (!ExecQual(pts->base_quals, econtext)) - return 0; - } - - /* - * Shortcut, if GpuJoin is not involved. (GpuScan or GpuPreAgg + GpuScan). - * This case does not have fallback_slot, and the fallback_proj directly - * transforms the base-tuple to the ss_ScanTupleSlot. - */ - if (pts->num_rels == 0) - { - TupleTableSlot *proj_slot; - HeapTuple proj_htup; - bool should_free; - - Assert(pts->fallback_slot == 0); - proj_slot = ExecProject(pts->fallback_proj); - proj_htup = ExecFetchSlotHeapTuple(proj_slot, false, &should_free); - pgstromStoreFallbackTuple(pts, proj_htup); - if (should_free) - pfree(proj_htup); - return 1; - } - /* Load the base tuple (depth-0) to the fallback slot */ + ExecForceStoreHeapTuple(tuple, base_slot, false); slot_getallattrs(base_slot); - Assert(fallback_slot != NULL); - ExecStoreAllNullTuple(fallback_slot); + ExecStoreAllNullTuple(fallback_slot); foreach (lc, pp_info->kvars_deflist) { codegen_kvar_defitem *kvdef = lfirst(lc); @@ -2810,16 +2778,23 @@ ExecFallbackCpuJoin(pgstromTaskState *pts, HeapTuple tuple) kvdef->kv_resno >= 1 && kvdef->kv_resno <= base_slot->tts_nvalid) { - int dst = kvdef->kv_slot_id; int src = kvdef->kv_resno - 1; + int dst = kvdef->kv_slot_id; fallback_slot->tts_isnull[dst] = base_slot->tts_isnull[src]; fallback_slot->tts_values[dst] = base_slot->tts_values[src]; } } econtext->ecxt_scantuple = fallback_slot; + + /* check WHERE-clause if any */ + if (pts->base_quals) + { + ResetExprContext(econtext); + if (!ExecQual(pts->base_quals, econtext)) + return 0; + } /* Run JOIN, if any */ - Assert(pts->h_kmrels); __execFallbackCpuJoinOneDepth(pts, 1); return (pts->fallback_index - fallback_index_saved > 0); } diff --git a/src/gpu_preagg.c b/src/gpu_preagg.c index 46de5c497..041adc1ba 100644 --- a/src/gpu_preagg.c +++ b/src/gpu_preagg.c @@ -1789,22 +1789,8 @@ PlanDpuPreAggPath(PlannerInfo *root, static Node * CreateGpuPreAggScanState(CustomScan *cscan) { - pgstromTaskState *pts; - pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan); - int num_rels = list_length(cscan->custom_plans); - Assert(cscan->methods == &gpupreagg_plan_methods); - pts = palloc0(offsetof(pgstromTaskState, inners[num_rels])); - NodeSetTag(pts, T_CustomScanState); - pts->css.flags = cscan->flags; - pts->css.methods = &gpupreagg_exec_methods; - pts->xpu_task_flags = pp_info->xpu_task_flags; - pts->pp_info = pp_info; - Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__GPUPREAGG && - pp_info->num_rels == num_rels); - pts->num_rels = num_rels; - - return (Node *)pts; + return pgstromCreateTaskState(cscan, &gpupreagg_exec_methods); } /* @@ -1813,22 +1799,8 @@ CreateGpuPreAggScanState(CustomScan *cscan) static Node * CreateDpuPreAggScanState(CustomScan *cscan) { - pgstromTaskState *pts; - pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan); - int num_rels = list_length(cscan->custom_plans); - Assert(cscan->methods == &dpupreagg_plan_methods); - pts = palloc0(offsetof(pgstromTaskState, inners[num_rels])); - NodeSetTag(pts, T_CustomScanState); - pts->css.flags = cscan->flags; - pts->css.methods = &dpupreagg_exec_methods; - pts->xpu_task_flags = pp_info->xpu_task_flags; - pts->pp_info = pp_info; - Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__DPUPREAGG && - pp_info->num_rels == num_rels); - pts->num_rels = num_rels; - - return (Node *)pts; + return pgstromCreateTaskState(cscan, &dpupreagg_exec_methods); } /* diff --git a/src/gpu_scan.c b/src/gpu_scan.c index 30d28d061..f70344dc4 100644 --- a/src/gpu_scan.c +++ b/src/gpu_scan.c @@ -260,8 +260,7 @@ __buildSimpleScanPlanInfo(PlannerInfo *root, pp_info->ds_entry = ds_entry; pp_info->scan_relid = baserel->relid; pp_info->host_quals = extract_actual_clauses(host_quals, false); - pp_info->scan_quals_fallback = extract_actual_clauses(dev_quals, false); - pp_info->scan_quals_explain = copyObject(pp_info->scan_quals_fallback); + pp_info->scan_quals = extract_actual_clauses(dev_quals, false); pp_info->scan_tuples = baserel->tuples; pp_info->scan_nrows = scan_nrows; pp_info->parallel_nworkers = parallel_nworkers; @@ -278,7 +277,7 @@ __buildSimpleScanPlanInfo(PlannerInfo *root, outer_refs = pickup_outer_referenced(root, baserel, outer_refs); pull_varattnos((Node *)pp_info->host_quals, baserel->relid, &outer_refs); - pull_varattnos((Node *)pp_info->scan_quals_fallback, + pull_varattnos((Node *)pp_info->scan_quals, baserel->relid, &outer_refs); pp_info->outer_refs = outer_refs; pp_info->sibling_param_id = -1; @@ -416,7 +415,7 @@ try_add_simple_scan_path(PlannerInfo *root, { pgstromPlanInfo *pp_info = op_leaf->pp_info; - if (pp_info->scan_quals_fallback != NIL) + if (pp_info->scan_quals != NIL) { CustomPath *cpath = makeNode(CustomPath); @@ -706,7 +705,7 @@ gpuscan_build_projection(RelOptInfo *baserel, baserel->relid, false); - vars_list = pull_vars_of_level((Node *)pp_info->scan_quals_fallback, 0); + vars_list = pull_vars_of_level((Node *)pp_info->scan_quals, 0); foreach (lc, vars_list) tlist_dev = __gpuscan_build_projection_expr(tlist_dev, (Node *)lfirst(lc), @@ -775,7 +774,82 @@ __build_explain_tlist_junks(PlannerInfo *root, } /* - * PlanXpuScanPathCommon + * __assign_cpu_fallback_slots + */ +void +__assign_cpu_fallback_slots(List *kvars_deflist, List *custom_scan_tlist) +{ + ListCell *lc1, *lc2; + + foreach (lc1, kvars_deflist) + { + codegen_kvar_defitem *kvdef = lfirst(lc1); + int kv_fallback = -1; + + if (kvdef->kv_depth >= 0 && + kvdef->kv_resno > 0) + { + foreach (lc2, custom_scan_tlist) + { + TargetEntry *tle = lfirst(lc2); + + if (equal(tle->expr, kvdef->kv_expr)) + { + kv_fallback = tle->resno - 1; + break; + } + } + } + kvdef->kv_fallback = kv_fallback; + } +} + +/* + * assign_custom_cscan_tlist + */ +List * +assign_custom_cscan_tlist(List *tlist_dev, pgstromPlanInfo *pp_info) +{ + ListCell *lc1, *lc2; + + /* clear kv_fallback */ + foreach (lc1, pp_info->kvars_deflist) + { + codegen_kvar_defitem *kvdef = lfirst(lc1); + + kvdef->kv_fallback = -1; + } + + foreach (lc1, tlist_dev) + { + TargetEntry *tle = lfirst(lc1); + + foreach (lc2, pp_info->kvars_deflist) + { + codegen_kvar_defitem *kvdef = lfirst(lc2); + + if (kvdef->kv_depth >= 0 && + kvdef->kv_depth <= pp_info->num_rels && + kvdef->kv_resno > 0 && + equal(tle->expr, kvdef->kv_expr)) + { + kvdef->kv_fallback = tle->resno - 1; + tle->resorigtbl = (Oid)kvdef->kv_depth; + tle->resorigcol = kvdef->kv_resno; + break; + } + } + if (!lc2) + { + tle->resorigtbl = (Oid)UINT_MAX; + tle->resorigcol = -1; + } + } + return tlist_dev; +} + +/* + * planxpuscanpathcommon */ static CustomScan * PlanXpuScanPathCommon(PlannerInfo *root, @@ -791,8 +865,7 @@ PlanXpuScanPathCommon(PlannerInfo *root, context = create_codegen_context(root, best_path, pp_info); /* code generation for WHERE-clause */ - pp_info->kexp_scan_quals = codegen_build_scan_quals(context, - pp_info->scan_quals_fallback); + pp_info->kexp_scan_quals = codegen_build_scan_quals(context, pp_info->scan_quals); /* code generation for the Projection */ context->tlist_dev = gpuscan_build_projection(baserel, pp_info, tlist); pp_info->kexp_projection = codegen_build_projection(context); @@ -820,8 +893,8 @@ PlanXpuScanPathCommon(PlannerInfo *root, cscan->flags = best_path->flags; cscan->methods = xpuscan_plan_methods; cscan->custom_plans = NIL; - cscan->custom_scan_tlist = context->tlist_dev; - + cscan->custom_scan_tlist = assign_custom_cscan_tlist(context->tlist_dev, + pp_info); return cscan; } @@ -890,19 +963,8 @@ PlanDpuScanPath(PlannerInfo *root, static Node * CreateGpuScanState(CustomScan *cscan) { - pgstromTaskState *pts = palloc0(sizeof(pgstromTaskState)); - pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan); - Assert(cscan->methods == &gpuscan_plan_methods); - /* Set tag and executor callbacks */ - NodeSetTag(pts, T_CustomScanState); - pts->css.flags = cscan->flags; - pts->css.methods = &gpuscan_exec_methods; - pts->xpu_task_flags = pp_info->xpu_task_flags; - pts->pp_info = pp_info; - Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__GPUSCAN); - - return (Node *)pts; + return pgstromCreateTaskState(cscan, &gpuscan_exec_methods); } /* @@ -911,18 +973,8 @@ CreateGpuScanState(CustomScan *cscan) static Node * CreateDpuScanState(CustomScan *cscan) { - pgstromTaskState *pts = palloc0(sizeof(pgstromTaskState)); - pgstromPlanInfo *pp_info = deform_pgstrom_plan_info(cscan); - Assert(cscan->methods == &dpuscan_plan_methods); - NodeSetTag(pts, T_CustomScanState); - pts->css.flags = cscan->flags; - pts->css.methods = &dpuscan_exec_methods; - pts->xpu_task_flags = pp_info->xpu_task_flags; - pts->pp_info = pp_info; - Assert((pts->xpu_task_flags & TASK_KIND__MASK) == TASK_KIND__DPUSCAN); - - return (Node *)pts; + return pgstromCreateTaskState(cscan, &dpuscan_exec_methods); } /* @@ -931,11 +983,35 @@ CreateDpuScanState(CustomScan *cscan) bool ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple) { - ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; - bool should_free; + pgstromPlanInfo *pp_info = pts->pp_info; + ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; + TupleTableSlot *base_slot = pts->base_slot; + TupleTableSlot *fallback_slot = pts->css.ss.ss_ScanTupleSlot; + ListCell *lc; + int attidx = 0; + bool should_free; + + /* Load the base tuple (depth-0) to the fallback slot */ + ExecForceStoreHeapTuple(tuple, base_slot, false); + slot_getallattrs(base_slot); + ExecStoreAllNullTuple(fallback_slot); + foreach (lc, pp_info->kvars_deflist) + { + codegen_kvar_defitem *kvdef = lfirst(lc); - ExecForceStoreHeapTuple(tuple, pts->base_slot, false); - econtext->ecxt_scantuple = pts->base_slot; + if (kvdef->kv_depth == 0 && + kvdef->kv_resno >= 1 && + kvdef->kv_resno <= base_slot->tts_nvalid && + kvdef->kv_fallback >= 0) + { + int src = kvdef->kv_resno - 1; + int dst = kvdef->kv_fallback; + + fallback_slot->tts_isnull[dst] = base_slot->tts_isnull[src]; + fallback_slot->tts_values[dst] = base_slot->tts_values[src]; + } + } + econtext->ecxt_scantuple = fallback_slot; /* check WHERE-clause if any */ if (pts->base_quals) @@ -944,20 +1020,33 @@ ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple) if (!ExecQual(pts->base_quals, econtext)) return false; } - Assert(!pts->fallback_slot); - - /* apply Projection if any */ - if (pts->fallback_proj) + /* apply GPU-Projection */ + foreach (lc, pts->fallback_proj) { - TupleTableSlot *proj_slot = ExecProject(pts->fallback_proj); + ExprState *state = lfirst(lc); + Datum datum; + bool isnull; - tuple = ExecFetchSlotHeapTuple(proj_slot, false, &should_free); - } - else - { - tuple = ExecFetchSlotHeapTuple(pts->base_slot, false, &should_free); + if (state) + { + datum = ExecEvalExpr(state, econtext, &isnull); + if (isnull) + { + fallback_slot->tts_isnull[attidx] = true; + fallback_slot->tts_values[attidx] = 0; + } + else + { + fallback_slot->tts_isnull[attidx] = false; + fallback_slot->tts_values[attidx] = datum; + } + } + attidx++; } /* save the tuple on the fallback buffer */ + tuple = ExecFetchSlotHeapTuple(fallback_slot, + false, + &should_free); pgstromStoreFallbackTuple(pts, tuple); if (should_free) pfree(tuple); diff --git a/src/misc.c b/src/misc.c index c5541d549..4a96f7110 100644 --- a/src/misc.c +++ b/src/misc.c @@ -42,6 +42,7 @@ __form_codegen_kvar_defitem(codegen_kvar_defitem *kvdef, __kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_typlen)); __kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_xdatum_sizeof)); __kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_kvec_sizeof)); + __kv_privs = lappend(__kv_privs, makeInteger(kvdef->kv_fallback)); __kv_exprs = lappend(__kv_exprs, kvdef->kv_expr); foreach (lc, kvdef->kv_subfields) { @@ -83,6 +84,7 @@ __deform_codegen_kvar_defitem(List *__kv_privs, List *__kv_exprs) kvdef->kv_typlen = intVal(list_nth(__kv_privs, pindex++)); kvdef->kv_xdatum_sizeof = intVal(list_nth(__kv_privs, pindex++)); kvdef->kv_kvec_sizeof = intVal(list_nth(__kv_privs, pindex++)); + kvdef->kv_fallback = intVal(list_nth(__kv_privs, pindex++)); kvdef->kv_expr = list_nth(__kv_exprs, eindex++); __sub_privs = list_nth(__kv_privs, pindex++); __sub_exprs = list_nth(__kv_exprs, eindex++); @@ -118,8 +120,7 @@ form_pgstrom_plan_info(CustomScan *cscan, pgstromPlanInfo *pp_info) exprs = lappend(exprs, pp_info->used_params); privs = lappend(privs, pp_info->host_quals); privs = lappend(privs, makeInteger(pp_info->scan_relid)); - privs = lappend(privs, pp_info->scan_quals_fallback); - exprs = lappend(exprs, pp_info->scan_quals_explain); + exprs = lappend(exprs, pp_info->scan_quals); privs = lappend(privs, __makeFloat(pp_info->scan_tuples)); privs = lappend(privs, __makeFloat(pp_info->scan_nrows)); privs = lappend(privs, makeInteger(pp_info->parallel_nworkers)); @@ -230,8 +231,7 @@ deform_pgstrom_plan_info(CustomScan *cscan) pp_data.used_params = list_nth(exprs, eindex++); pp_data.host_quals = list_nth(privs, pindex++); pp_data.scan_relid = intVal(list_nth(privs, pindex++)); - pp_data.scan_quals_fallback = list_nth(privs, pindex++); - pp_data.scan_quals_explain = list_nth(exprs, eindex++); + pp_data.scan_quals = list_nth(exprs, eindex++); pp_data.scan_tuples = floatVal(list_nth(privs, pindex++)); pp_data.scan_nrows = floatVal(list_nth(privs, pindex++)); pp_data.parallel_nworkers = intVal(list_nth(privs, pindex++)); @@ -330,8 +330,7 @@ copy_pgstrom_plan_info(const pgstromPlanInfo *pp_orig) inners[pp_orig->num_rels])); pp_dest->used_params = list_copy(pp_dest->used_params); pp_dest->host_quals = copyObject(pp_dest->host_quals); - pp_dest->scan_quals_fallback = copyObject(pp_dest->scan_quals_fallback); - pp_dest->scan_quals_explain = copyObject(pp_dest->scan_quals_explain); + pp_dest->scan_quals = copyObject(pp_dest->scan_quals); pp_dest->brin_index_conds = copyObject(pp_dest->brin_index_conds); pp_dest->brin_index_quals = copyObject(pp_dest->brin_index_quals); foreach (lc, pp_orig->kvars_deflist) diff --git a/src/pg_strom.h b/src/pg_strom.h index 2ea4145ec..991c682bb 100644 --- a/src/pg_strom.h +++ b/src/pg_strom.h @@ -280,8 +280,7 @@ typedef struct List *used_params; /* param list in use */ List *host_quals; /* host qualifiers to scan the outer */ Index scan_relid; /* relid of the outer relation to scan */ - List *scan_quals_fallback;/* device qualifiers to scan the outer */ - List *scan_quals_explain; /* device qualifiers for EXPLAIN output */ + List *scan_quals; /* device qualifiers to scan the outer */ double scan_tuples; /* copy of baserel->tuples */ double scan_nrows; /* copy of baserel->rows */ int parallel_nworkers; /* # of parallel workers */ @@ -468,7 +467,8 @@ struct pgstromTaskState size_t fallback_bufsz; char *fallback_buffer; TupleTableSlot *fallback_slot; /* host-side kvars-slot */ - ProjectionInfo *fallback_proj; /* base or fallback slot -> custom_tlist */ + List *fallback_proj; +// ProjectionInfo *fallback_proj; /* base or fallback slot -> custom_tlist */ /* request command buffer (+ status for table scan) */ TBMIterateResult *curr_tbm; Buffer curr_vm_buffer; /* for visibility-map */ @@ -547,7 +547,7 @@ extern int heterodbExtraGetError(const char **p_filename, */ typedef struct { - int kv_slot_id; /* slot-id of kernel varslot / CPU fallback */ + int kv_slot_id; /* slot-id of kernel varslot */ int kv_depth; /* source depth */ int kv_resno; /* source resno, if exist */ int kv_maxref; /* max depth that references this column. */ @@ -559,6 +559,7 @@ typedef struct int16_t kv_typlen; /* typlen from the catalog */ int kv_xdatum_sizeof;/* =sizeof(xpu_XXXX_t), if any */ int kv_kvec_sizeof; /* =sizeof(kvec_XXXX_t), if any */ + int kv_fallback; /* slot-id for CPU fallback */ Expr *kv_expr; /* original expression */ List *kv_subfields; /* subfields definition, if array or composite */ } codegen_kvar_defitem; @@ -718,6 +719,8 @@ extern void xpuClientPutResponse(XpuCommand *xcmd); extern const XpuCommand *pgstromBuildSessionInfo(pgstromTaskState *pts, uint32_t join_inner_handle, TupleDesc tdesc_final); +extern Node *pgstromCreateTaskState(CustomScan *cscan, + const CustomExecMethods *methods); extern void pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags); @@ -816,6 +819,8 @@ extern void gpuCachePutDeviceBuffer(void *gc_lmap); extern void sort_device_qualifiers(List *dev_quals_list, List *dev_costs_list); extern pgstromPlanInfo *try_fetch_xpuscan_planinfo(const Path *path); +extern List *assign_custom_cscan_tlist(List *tlist_dev, + pgstromPlanInfo *pp_info); extern List *buildOuterScanPlanInfo(PlannerInfo *root, RelOptInfo *baserel, uint32_t xpu_task_flags, diff --git a/test/parallel_schedule b/test/parallel_schedule index 08122ee9a..c9a85f16f 100644 --- a/test/parallel_schedule +++ b/test/parallel_schedule @@ -13,12 +13,12 @@ test: pgstrom_guc # ---------- # Test for each data types # ---------- -test: dtype_int dtype_float dtype_numeric dtype_time dtype_text dtype_jsonb additional_dtype +#test: dtype_int dtype_float dtype_numeric dtype_time dtype_text dtype_jsonb additional_dtype # ---------- # Test for various functions / expressions # ---------- -test: dfunc_math dfunc_mbtext dexpr_scalar_array_op dexpr_misc +#test: dfunc_math dfunc_mbtext dexpr_scalar_array_op dexpr_misc # ---------- # Test for arrow_fdw