diff --git a/src/executor.c b/src/executor.c index 4ac9c0e0..dc2896c7 100644 --- a/src/executor.c +++ b/src/executor.c @@ -1270,36 +1270,8 @@ __setupTaskStateRequestBuffer(pgstromTaskState *pts, pts->xcmd_buf.len = off; } - - -/* - * __execInitTaskStateCpuFallback - * - * CPU fallback process moves the tuple as follows - * - * (can be both of heap and arrow) - * | - * v - * [Fallback Slot] - * | - * v - * (Base Quals) ... WHERE-clause - * | - * v - * (xPU-Join) ... Join for each depth - * | - * v - * [Fallback Slot] - * | - * v - * (Fallback Projection) ... pts->fallback_proj - * | - * v - * [ CustomScan Slot ] ... Equivalent to GpuProj/GpuPreAgg results - */ - /* - * __execInitFallbackProjection + * __fixup_fallback_projection */ static Node * __fixup_fallback_projection(Node *node, void *__data) @@ -1334,6 +1306,32 @@ __fixup_fallback_projection(Node *node, void *__data) return expression_tree_mutator(node, __fixup_fallback_projection, __data); } +/* + * fallback_varload_mapping + */ +typedef struct { + int32_t src_depth; + int32_t src_resno; + int32_t dst_resno; +} fallback_varload_mapping; + +static int +__compare_fallback_varload_mapping(const void *__a, const void *__b) +{ + const fallback_varload_mapping *a = __a; + const fallback_varload_mapping *b = __b; + + if (a->src_depth < b->src_depth) + return -1; + if (a->src_depth > b->src_depth) + return 1; + if (a->src_resno < b->src_resno) + return -1; + if (a->src_resno > b->src_resno) + return 1; + return 0; +} + /* * __execInitTaskStateCpuFallback */ @@ -1345,7 +1343,13 @@ __execInitTaskStateCpuFallback(pgstromTaskState *pts) Relation rel = pts->css.ss.ss_currentRelation; List *fallback_proj = NIL; ListCell *lc; + int nrooms = list_length(cscan->custom_scan_tlist); + int nitems = 0; + int last_depth = -1; + List *src_list = NIL; + List *dst_list = NIL; bool compatible = true; + fallback_varload_mapping *vl_map; /* * WHERE-clause @@ -1356,15 +1360,26 @@ __execInitTaskStateCpuFallback(pgstromTaskState *pts) /* * CPU-Projection */ + vl_map = alloca(sizeof(fallback_varload_mapping) * nrooms); foreach (lc, cscan->custom_scan_tlist) { TargetEntry *tle = lfirst(lc); - ExprState *state = NULL; + ExprState *state = NULL; + Node *expr; - if (!tle->resjunk && tle->resorigtbl == (Oid)UINT_MAX) + if (tle->resorigtbl >= 0 && + tle->resorigtbl <= pts->num_rels) + { + vl_map[nitems].src_depth = tle->resorigtbl; + vl_map[nitems].src_resno = tle->resorigcol; + vl_map[nitems].dst_resno = tle->resno; + nitems++; + } + else if (!tle->resjunk) { - Node *expr = __fixup_fallback_projection((Node *)tle->expr, - cscan->custom_scan_tlist); + Assert(tle->resorigtbl == (Oid)UINT_MAX); + expr = __fixup_fallback_projection((Node *)tle->expr, + cscan->custom_scan_tlist); state = ExecInitExpr((Expr *)expr, &pts->css.ss.ps); compatible = false; } @@ -1372,6 +1387,38 @@ __execInitTaskStateCpuFallback(pgstromTaskState *pts) } if (!compatible) pts->fallback_proj = fallback_proj; + + /* fallback var-loads */ + qsort(vl_map, nitems, + sizeof(fallback_varload_mapping), + __compare_fallback_varload_mapping); + + for (int i=0; i <= nitems; i++) + { + if (i == nitems || + vl_map[i].src_depth != last_depth) + { + if (last_depth == 0) + { + pts->fallback_load_src = src_list; + pts->fallback_load_dst = dst_list; + } + else if (last_depth > 0 && + last_depth <= pts->num_rels) + { + pts->inners[last_depth-1].inner_load_src = src_list; + pts->inners[last_depth-1].inner_load_dst = dst_list; + } + src_list = NIL; + dst_list = NIL; + if (i == nitems) + break; + } + last_depth = 
vl_map[i].src_depth; + src_list = lappend_int(src_list, vl_map[i].src_resno); + dst_list = lappend_int(dst_list, vl_map[i].dst_resno); + } + Assert(src_list == NIL && dst_list == NIL); } /* @@ -1498,15 +1545,15 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags) istate->econtext = CreateExprContext(estate); istate->depth = depth_index + 1; istate->join_type = pp_inner->join_type; - istate->join_quals = ExecInitQual(pp_inner->join_quals_fallback, + istate->join_quals = ExecInitQual(pp_inner->join_quals_original, &pts->css.ss.ps); - istate->other_quals = ExecInitQual(pp_inner->other_quals_fallback, + istate->other_quals = ExecInitQual(pp_inner->other_quals_original, &pts->css.ss.ps); if (pp_inner->join_type == JOIN_FULL || pp_inner->join_type == JOIN_RIGHT) has_right_outer = true; - foreach (cell, pp_inner->hash_outer_keys_fallback) + foreach (cell, pp_inner->hash_outer_keys_original) { Node *outer_key = (Node *)lfirst(cell); ExprState *es; @@ -1522,7 +1569,7 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags) dtype->type_hashfunc); } /* inner hash-keys references the result of inner-slot */ - foreach (cell, pp_inner->hash_inner_keys_fallback) + foreach (cell, pp_inner->hash_inner_keys_original) { Node *inner_key = (Node *)lfirst(cell); ExprState *es; diff --git a/src/gpu_join.c b/src/gpu_join.c index 73595139..2ebc4d05 100644 --- a/src/gpu_join.c +++ b/src/gpu_join.c @@ -1796,15 +1796,27 @@ typedef struct } inner_preload_buffer; static uint32_t -get_tuple_hashvalue(pgstromTaskInnerState *istate, - TupleTableSlot *slot) +get_tuple_hashvalue(pgstromTaskState *pts, + pgstromTaskInnerState *istate, + TupleTableSlot *inner_slot) { - ExprContext *econtext = istate->econtext; - uint32_t hash = 0xffffffffU; - ListCell *lc1, *lc2; + ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; + TupleTableSlot *scan_slot = pts->css.ss.ss_ScanTupleSlot; + uint32_t hash = 0xffffffffU; + ListCell *lc1, *lc2; + + /* move to scan_slot from inner_slot */ + forboth (lc1, istate->inner_load_src, + lc2, istate->inner_load_dst) + { + int src = lfirst_int(lc1) - 1; + int dst = lfirst_int(lc2) - 1; + scan_slot->tts_isnull[dst] = inner_slot->tts_isnull[src]; + scan_slot->tts_values[dst] = inner_slot->tts_values[src]; + } /* calculation of a hash value of this entry */ - econtext->ecxt_innertuple = slot; + econtext->ecxt_scantuple = scan_slot; forboth (lc1, istate->hash_inner_keys, lc2, istate->hash_inner_funcs) { @@ -1826,6 +1838,7 @@ get_tuple_hashvalue(pgstromTaskInnerState *istate, */ static void execInnerPreloadOneDepth(MemoryContext memcxt, + pgstromTaskState *pts, pgstromTaskInnerState *istate, pg_atomic_uint64 *p_shared_inner_nitems, pg_atomic_uint64 *p_shared_inner_usage) @@ -1840,6 +1853,7 @@ execInnerPreloadOneDepth(MemoryContext memcxt, memset(preload_buf, 0, offsetof(inner_preload_buffer, rows)); preload_buf->nrooms = 12000; + ExecStoreAllNullTuple(pts->css.ss.ss_ScanTupleSlot); for (;;) { TupleTableSlot *slot; @@ -1884,7 +1898,7 @@ execInnerPreloadOneDepth(MemoryContext memcxt, if (istate->hash_inner_keys != NIL) { - uint32_t hash = get_tuple_hashvalue(istate, slot); + uint32_t hash = get_tuple_hashvalue(pts, istate, slot); preload_buf->rows[index].htup = htup; preload_buf->rows[index].hash = hash; @@ -2250,7 +2264,7 @@ GpuJoinInnerPreload(pgstromTaskState *pts) { pgstromTaskInnerState *istate = &leader->inners[i]; - execInnerPreloadOneDepth(memcxt, istate, + execInnerPreloadOneDepth(memcxt, pts, istate, &ps_state->inners[i].inner_nitems, 
									 &ps_state->inners[i].inner_usage);
 		}
@@ -2436,27 +2450,28 @@ __execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth);
 
 static void
 __execFallbackLoadVarsSlot(TupleTableSlot *fallback_slot,
-						   const kern_expression *kexp_vloads,
+						   List *inner_load_src,
+						   List *inner_load_dst,
 						   const kern_data_store *kds,
 						   const ItemPointer t_self,
 						   const HeapTupleHeaderData *htup)
 {
-	const kern_varload_desc *vl_desc = kexp_vloads->u.load.desc;
-	uint32_t	offset = htup->t_hoff;
-	uint32_t	kvcnt = 0;
-	uint32_t	resno;
+	uint32_t	h_off = htup->t_hoff;
 	uint32_t	ncols = Min(htup->t_infomask2 & HEAP_NATTS_MASK, kds->ncols);
 	bool		heap_hasnull = ((htup->t_infomask & HEAP_HASNULL) != 0);
+	ListCell   *lc1, *lc2;
 
-	Assert(kexp_vloads->opcode == FuncOpCode__LoadVars);
 	/* extract system attributes, if required */
-	while (kvcnt < kexp_vloads->u.load.nitems &&
-		   vl_desc->vl_resno < 0)
+	forboth (lc1, inner_load_src,
+			 lc2, inner_load_dst)
 	{
-		int		slot_id = vl_desc->vl_slot_id;
+		int		src = lfirst_int(lc1);
+		int		dst = lfirst_int(lc2) - 1;
 		Datum	datum;
 
-		switch (vl_desc->vl_resno)
+		if (src >= 0)
+			break;
+		switch (src)
 		{
 			case SelfItemPointerAttributeNumber:
 				datum = PointerGetDatum(t_self);
 				break;
@@ -2475,45 +2490,38 @@ __execFallbackLoadVarsSlot(TupleTableSlot *fallback_slot,
 				datum = ObjectIdGetDatum(kds->table_oid);
 				break;
 			default:
-				elog(ERROR, "invalid attnum: %d", vl_desc->vl_resno);
+				elog(ERROR, "invalid attnum: %d", src);
 		}
-		fallback_slot->tts_isnull[slot_id] = false;
-		fallback_slot->tts_values[slot_id] = datum;
-		vl_desc++;
-		kvcnt++;
+		fallback_slot->tts_isnull[dst] = false;
+		fallback_slot->tts_values[dst] = datum;
 	}
 	/* extract the user data */
-	resno = 1;
-	while (kvcnt < kexp_vloads->u.load.nitems && resno <= ncols)
+	for (int j=0; j < ncols && lc1 && lc2; j++)
 	{
-		const kern_colmeta *cmeta = &kds->colmeta[resno-1];
-		const char *addr;
+		const kern_colmeta *cmeta = &kds->colmeta[j];
+		const char *addr;
+		Datum		datum;
 
-		if (heap_hasnull && att_isnull(resno-1, htup->t_bits))
+		if (heap_hasnull && att_isnull(j, htup->t_bits))
 		{
 			addr = NULL;
+			datum = 0;
 		}
 		else
 		{
 			if (cmeta->attlen > 0)
-				offset = TYPEALIGN(cmeta->attalign, offset);
-			else if (!VARATT_NOT_PAD_BYTE((char *)htup + offset))
-				offset = TYPEALIGN(cmeta->attalign, offset);
-			addr = ((char *)htup + offset);
+				h_off = TYPEALIGN(cmeta->attalign, h_off);
+			else if (!VARATT_NOT_PAD_BYTE((char *)htup + h_off))
+				h_off = TYPEALIGN(cmeta->attalign, h_off);
+			addr = ((char *)htup + h_off);
 			if (cmeta->attlen > 0)
-				offset += cmeta->attlen;
+				h_off += cmeta->attlen;
+			else if (cmeta->attlen == -1)
+				h_off += VARSIZE_ANY(addr);
 			else
-				offset += VARSIZE_ANY(addr);
-		}
-
-		if (vl_desc->vl_resno == resno)
-		{
-			int		slot_id = vl_desc->vl_slot_id;
-			Datum	datum;
+				elog(ERROR, "unknown typlen (%d)", cmeta->attlen);
 
-			if (!addr)
-				datum = 0;
-			else if (cmeta->attbyval)
+			if (cmeta->attbyval)
 			{
 				switch (cmeta->attlen)
 				{
@@ -2530,29 +2538,36 @@ __execFallbackLoadVarsSlot(TupleTableSlot *fallback_slot,
 						datum = *((uint64_t *)addr);
 						break;
 					default:
-						elog(ERROR, "invalid attlen: %d", cmeta->attlen);
+						elog(ERROR, "invalid typlen (%d) of inline type",
+							 cmeta->attlen);
 				}
 			}
 			else
 			{
 				datum = PointerGetDatum(addr);
 			}
-			fallback_slot->tts_isnull[slot_id] = !addr;
-			fallback_slot->tts_values[slot_id] = datum;
-			vl_desc++;
-			kvcnt++;
 		}
-		resno++;
+		if (lfirst_int(lc1) == j+1)
+		{
+			int		dst = lfirst_int(lc2) - 1;
+
+			fallback_slot->tts_isnull[dst] = !addr;
+			fallback_slot->tts_values[dst] = datum;
+
+			lc1 = lnext(inner_load_src, lc1);
+			lc2 = lnext(inner_load_dst, lc2);
+		}
 	}
 	/* fill-up by 
NULL for the remaining fields */ - while (kvcnt < kexp_vloads->u.load.nitems) + while (lc1 && lc2) { - int slot_id = vl_desc->vl_slot_id; + int dst = lfirst_int(lc2) - 1; + + fallback_slot->tts_isnull[dst] = true; + fallback_slot->tts_values[dst] = 0; - fallback_slot->tts_isnull[slot_id] = true; - fallback_slot->tts_values[slot_id] = 0; - vl_desc++; - kvcnt++; + lc1 = lnext(inner_load_src, lc1); + lc2 = lnext(inner_load_dst, lc2); } } @@ -2562,18 +2577,10 @@ __execFallbackCpuNestLoop(pgstromTaskState *pts, bool *oj_map, int depth) { pgstromTaskInnerState *istate = &pts->inners[depth-1]; - pgstromPlanInfo *pp_info = pts->pp_info; ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; - kern_expression *kexp_join_kvars_load = NULL; + TupleTableSlot *scan_slot = pts->css.ss.ss_ScanTupleSlot; - if (pp_info->kexp_load_vars_packed) - { - const kern_expression *temp = (const kern_expression *) - VARDATA(pp_info->kexp_load_vars_packed); - kexp_join_kvars_load = __PICKUP_PACKED_KEXP(temp, depth); - } Assert(kds_in->format == KDS_FORMAT_ROW); - for (uint32_t index=0; index < kds_in->nitems; index++) { kern_tupitem *tupitem = KDS_GET_TUPITEM(kds_in, index); @@ -2582,14 +2589,15 @@ __execFallbackCpuNestLoop(pgstromTaskState *pts, continue; ResetExprContext(econtext); /* load inner variable */ - if (kexp_join_kvars_load) + if (istate->inner_load_src != NIL && + istate->inner_load_dst != NIL) { ItemPointerData t_self; ItemPointerSetInvalid(&t_self); - Assert(kexp_join_kvars_load->u.load.depth == depth); - __execFallbackLoadVarsSlot(pts->fallback_slot, - kexp_join_kvars_load, + __execFallbackLoadVarsSlot(scan_slot, + istate->inner_load_src, + istate->inner_load_dst, kds_in, &t_self, &tupitem->htup); @@ -2617,19 +2625,12 @@ __execFallbackCpuHashJoin(pgstromTaskState *pts, bool *oj_map, int depth) { pgstromTaskInnerState *istate = &pts->inners[depth-1]; - pgstromPlanInfo *pp_info = pts->pp_info; ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; - kern_expression *kexp_join_kvars_load = NULL; + TupleTableSlot *scan_slot = pts->css.ss.ss_ScanTupleSlot; kern_hashitem *hitem; uint32_t hash; ListCell *lc1, *lc2; - if (pp_info->kexp_load_vars_packed) - { - const kern_expression *temp = (const kern_expression *) - VARDATA(pp_info->kexp_load_vars_packed); - kexp_join_kvars_load = __PICKUP_PACKED_KEXP(temp, depth); - } Assert(kds_in->format == KDS_FORMAT_HASH); /* @@ -2658,13 +2659,15 @@ __execFallbackCpuHashJoin(pgstromTaskState *pts, { if (hitem->hash != hash) continue; - if (kexp_join_kvars_load) + if (istate->inner_load_src != NIL && + istate->inner_load_dst != NIL) { ItemPointerData t_self; ItemPointerSetInvalid(&t_self); - __execFallbackLoadVarsSlot(pts->fallback_slot, - kexp_join_kvars_load, + __execFallbackLoadVarsSlot(scan_slot, + istate->inner_load_src, + istate->inner_load_dst, kds_in, &t_self, &hitem->t.htup); @@ -2690,9 +2693,9 @@ static void __execFallbackCpuProjection(pgstromTaskState *pts) { ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; - TupleTableSlot *fallback_slot = pts->fallback_slot; + TupleTableSlot *scan_slot = pts->css.ss.ss_ScanTupleSlot; ListCell *lc; - int j=0; + int dst=0; foreach (lc, pts->fallback_proj) { @@ -2706,16 +2709,16 @@ __execFallbackCpuProjection(pgstromTaskState *pts) if (isnull) { - fallback_slot->tts_isnull[j] = true; - fallback_slot->tts_values[j] = 0; + scan_slot->tts_isnull[dst] = true; + scan_slot->tts_values[dst] = 0; } else { - fallback_slot->tts_isnull[j] = false; - fallback_slot->tts_values[j] = datum; + scan_slot->tts_isnull[dst] = false; + 
scan_slot->tts_values[dst] = datum; } } - j++; + dst++; } } @@ -2725,17 +2728,16 @@ __execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth) if (depth > pts->num_rels) { /* apply projection if any */ - HeapTuple tuple; - bool should_free; + TupleTableSlot *scan_slot = pts->css.ss.ss_ScanTupleSlot; + HeapTuple tuple; if (pts->fallback_proj) __execFallbackCpuProjection(pts); - tuple = ExecFetchSlotHeapTuple(pts->fallback_slot, - false, - &should_free); + tuple = heap_form_tuple(scan_slot->tts_tupleDescriptor, + scan_slot->tts_values, + scan_slot->tts_isnull); pgstromStoreFallbackTuple(pts, tuple); - if (should_free) - pfree(tuple); + pfree(tuple); } else { @@ -2759,33 +2761,26 @@ __execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth) bool ExecFallbackCpuJoin(pgstromTaskState *pts, HeapTuple tuple) { - pgstromPlanInfo *pp_info = pts->pp_info; ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; TupleTableSlot *base_slot = pts->base_slot; - TupleTableSlot *fallback_slot = pts->fallback_slot; + TupleTableSlot *scan_slot = pts->css.ss.ss_ScanTupleSlot; size_t fallback_index_saved = pts->fallback_index; - ListCell *lc; + ListCell *lc1, *lc2; /* Load the base tuple (depth-0) to the fallback slot */ ExecForceStoreHeapTuple(tuple, base_slot, false); slot_getallattrs(base_slot); - ExecStoreAllNullTuple(fallback_slot); - foreach (lc, pp_info->kvars_deflist) + ExecStoreAllNullTuple(scan_slot); + forboth (lc1, pts->fallback_load_src, + lc2, pts->fallback_load_dst) { - codegen_kvar_defitem *kvdef = lfirst(lc); - - if (kvdef->kv_depth == 0 && - kvdef->kv_resno >= 1 && - kvdef->kv_resno <= base_slot->tts_nvalid) - { - int src = kvdef->kv_resno - 1; - int dst = kvdef->kv_slot_id; + int src = lfirst_int(lc1) - 1; + int dst = lfirst_int(lc2) - 1; - fallback_slot->tts_isnull[dst] = base_slot->tts_isnull[src]; - fallback_slot->tts_values[dst] = base_slot->tts_values[src]; - } + scan_slot->tts_isnull[dst] = base_slot->tts_isnull[src]; + scan_slot->tts_values[dst] = base_slot->tts_values[src]; } - econtext->ecxt_scantuple = fallback_slot; + econtext->ecxt_scantuple = scan_slot; /* check WHERE-clause if any */ if (pts->base_quals) @@ -2803,20 +2798,12 @@ static void __execFallbackCpuJoinRightOuterOneDepth(pgstromTaskState *pts, int depth) { pgstromTaskInnerState *istate = &pts->inners[depth-1]; - pgstromPlanInfo *pp_info = pts->pp_info; ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; - TupleTableSlot *fallback_slot = pts->fallback_slot; - kern_expression *kexp_join_kvars_load = NULL; + TupleTableSlot *fallback_slot = pts->css.ss.ss_ScanTupleSlot; kern_multirels *h_kmrels = pts->h_kmrels; kern_data_store *kds_in = KERN_MULTIRELS_INNER_KDS(h_kmrels, depth-1); bool *oj_map = KERN_MULTIRELS_OUTER_JOIN_MAP(h_kmrels, depth-1); - if (pp_info->kexp_load_vars_packed) - { - const kern_expression *temp = (const kern_expression *) - VARDATA(pp_info->kexp_load_vars_packed); - kexp_join_kvars_load = __PICKUP_PACKED_KEXP(temp, depth); - } Assert(oj_map != NULL); ExecStoreAllNullTuple(fallback_slot); @@ -2825,8 +2812,8 @@ __execFallbackCpuJoinRightOuterOneDepth(pgstromTaskState *pts, int depth) { if (oj_map[i]) continue; - - if (kexp_join_kvars_load) + if (istate->inner_load_src != NIL && + istate->inner_load_dst != NIL) { kern_tupitem *titem = KDS_GET_TUPITEM(kds_in, i); ItemPointerData t_self; @@ -2835,7 +2822,8 @@ __execFallbackCpuJoinRightOuterOneDepth(pgstromTaskState *pts, int depth) continue; ItemPointerSetInvalid(&t_self); __execFallbackLoadVarsSlot(fallback_slot, - kexp_join_kvars_load, + 
istate->inner_load_src, + istate->inner_load_dst, kds_in, &t_self, &titem->htup); diff --git a/src/gpu_scan.c b/src/gpu_scan.c index f70344dc..f50755ca 100644 --- a/src/gpu_scan.c +++ b/src/gpu_scan.c @@ -773,37 +773,6 @@ __build_explain_tlist_junks(PlannerInfo *root, } } -/* - * __assign_cpu_fallback_slots - */ -void -__assign_cpu_fallback_slots(List *kvars_deflist, List *custom_scan_tlist) -{ - ListCell *lc1, *lc2; - - foreach (lc1, kvars_deflist) - { - codegen_kvar_defitem *kvdef = lfirst(lc1); - int kv_fallback = -1; - - if (kvdef->kv_depth >= 0 && - kvdef->kv_resno > 0) - { - foreach (lc2, custom_scan_tlist) - { - TargetEntry *tle = lfirst(lc2); - - if (equal(tle->expr, kvdef->kv_expr)) - { - kv_fallback = tle->resno - 1; - break; - } - } - } - kvdef->kv_fallback = kv_fallback; - } -} - /* * assign_custom_cscan_tlist */ @@ -983,11 +952,10 @@ CreateDpuScanState(CustomScan *cscan) bool ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple) { - pgstromPlanInfo *pp_info = pts->pp_info; ExprContext *econtext = pts->css.ss.ps.ps_ExprContext; TupleTableSlot *base_slot = pts->base_slot; TupleTableSlot *fallback_slot = pts->css.ss.ss_ScanTupleSlot; - ListCell *lc; + ListCell *lc1, *lc2; int attidx = 0; bool should_free; @@ -995,21 +963,14 @@ ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple) ExecForceStoreHeapTuple(tuple, base_slot, false); slot_getallattrs(base_slot); ExecStoreAllNullTuple(fallback_slot); - foreach (lc, pp_info->kvars_deflist) + forboth (lc1, pts->fallback_load_src, + lc2, pts->fallback_load_dst) { - codegen_kvar_defitem *kvdef = lfirst(lc); + int src = lfirst_int(lc1) - 1; + int dst = lfirst_int(lc2) - 1; - if (kvdef->kv_depth == 0 && - kvdef->kv_resno >= 1 && - kvdef->kv_resno <= base_slot->tts_nvalid && - kvdef->kv_fallback >= 0) - { - int src = kvdef->kv_resno - 1; - int dst = kvdef->kv_fallback; - - fallback_slot->tts_isnull[dst] = base_slot->tts_isnull[src]; - fallback_slot->tts_values[dst] = base_slot->tts_values[src]; - } + fallback_slot->tts_isnull[dst] = base_slot->tts_isnull[src]; + fallback_slot->tts_values[dst] = base_slot->tts_values[src]; } econtext->ecxt_scantuple = fallback_slot; @@ -1021,9 +982,9 @@ ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple) return false; } /* apply GPU-Projection */ - foreach (lc, pts->fallback_proj) + foreach (lc1, pts->fallback_proj) { - ExprState *state = lfirst(lc); + ExprState *state = lfirst(lc1); Datum datum; bool isnull; diff --git a/src/misc.c b/src/misc.c index 4a96f711..eb4abe93 100644 --- a/src/misc.c +++ b/src/misc.c @@ -190,7 +190,7 @@ form_pgstrom_plan_info(CustomScan *cscan, pgstromPlanInfo *pp_info) __privs = lappend(__privs, makeInteger(pp_inner->gist_ctid_resno)); __privs = lappend(__privs, makeInteger(pp_inner->gist_func_oid)); __privs = lappend(__privs, makeInteger(pp_inner->gist_slot_id)); - __privs = lappend(__privs, pp_inner->gist_clause); + __exprs = lappend(__exprs, pp_inner->gist_clause); __privs = lappend(__privs, __makeFloat(pp_inner->gist_selectivity)); __privs = lappend(__privs, __makeFloat(pp_inner->gist_npages)); __privs = lappend(__privs, makeInteger(pp_inner->gist_height)); @@ -304,7 +304,7 @@ deform_pgstrom_plan_info(CustomScan *cscan) pp_inner->gist_ctid_resno = intVal(list_nth(__privs, __pindex++)); pp_inner->gist_func_oid = intVal(list_nth(__privs, __pindex++)); pp_inner->gist_slot_id = intVal(list_nth(__privs, __pindex++)); - pp_inner->gist_clause = list_nth(__privs, __pindex++); + pp_inner->gist_clause = list_nth(__exprs, __eindex++); pp_inner->gist_selectivity = 
floatVal(list_nth(__privs, __pindex++)); pp_inner->gist_npages = floatVal(list_nth(__privs, __pindex++)); pp_inner->gist_height = intVal(list_nth(__privs, __pindex++)); diff --git a/src/pg_strom.h b/src/pg_strom.h index 991c682b..8689c79a 100644 --- a/src/pg_strom.h +++ b/src/pg_strom.h @@ -424,6 +424,11 @@ typedef struct Relation gist_irel; ExprState *gist_clause; AttrNumber gist_ctid_resno; + /* + * CPU fallback (inner-loading) + */ + List *inner_load_src; /* resno of inner tuple */ + List *inner_load_dst; /* resno of fallback slot */ } pgstromTaskInnerState; struct pgstromTaskState @@ -468,7 +473,9 @@ struct pgstromTaskState char *fallback_buffer; TupleTableSlot *fallback_slot; /* host-side kvars-slot */ List *fallback_proj; -// ProjectionInfo *fallback_proj; /* base or fallback slot -> custom_tlist */ + + List *fallback_load_src; /* source resno of base-rel */ + List *fallback_load_dst; /* dest resno of fallback-slot */ /* request command buffer (+ status for table scan) */ TBMIterateResult *curr_tbm; Buffer curr_vm_buffer; /* for visibility-map */ diff --git a/test/parallel_schedule b/test/parallel_schedule index c9a85f16..08122ee9 100644 --- a/test/parallel_schedule +++ b/test/parallel_schedule @@ -13,12 +13,12 @@ test: pgstrom_guc # ---------- # Test for each data types # ---------- -#test: dtype_int dtype_float dtype_numeric dtype_time dtype_text dtype_jsonb additional_dtype +test: dtype_int dtype_float dtype_numeric dtype_time dtype_text dtype_jsonb additional_dtype # ---------- # Test for various functions / expressions # ---------- -#test: dfunc_math dfunc_mbtext dexpr_scalar_array_op dexpr_misc +test: dfunc_math dfunc_mbtext dexpr_scalar_array_op dexpr_misc # ---------- # Test for arrow_fdw
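
Notes on the techniques this patch relies on; all sketches below are illustrative only, and every name that does not appear in the diff is invented for the example.

__execInitTaskStateCpuFallback collects one (src_depth, src_resno, dst_resno) triple per custom_scan_tlist entry, sorts them with qsort(), and then emits one (src_list, dst_list) pair per depth; the loop deliberately runs one extra iteration (i == nitems) purely to flush the trailing group. A minimal, self-contained sketch of that sort-then-group pattern, with plain arrays standing in for PostgreSQL's List:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct {
        int src_depth;      /* 0 = outer scan, 1..num_rels = inner depth */
        int src_resno;      /* attribute number in the source tuple */
        int dst_resno;      /* attribute number in the fallback slot */
    } vl_map_entry;

    static int
    compare_entry(const void *__a, const void *__b)
    {
        const vl_map_entry *a = __a;
        const vl_map_entry *b = __b;

        if (a->src_depth != b->src_depth)
            return (a->src_depth < b->src_depth) ? -1 : 1;
        if (a->src_resno != b->src_resno)
            return (a->src_resno < b->src_resno) ? -1 : 1;
        return 0;
    }

    int main(void)
    {
        vl_map_entry map[] = {{1,2,5}, {0,1,1}, {1,1,4}, {0,3,2}};
        int  nitems = sizeof(map) / sizeof(map[0]);
        int  last_depth = -1;

        qsort(map, nitems, sizeof(vl_map_entry), compare_entry);
        /* the i == nitems iteration exists solely to close the last group */
        for (int i = 0; i <= nitems; i++)
        {
            if (i == nitems || map[i].src_depth != last_depth)
            {
                if (last_depth >= 0)
                    printf("depth %d: group closed\n", last_depth);
                if (i == nitems)
                    break;
            }
            last_depth = map[i].src_depth;
            printf("depth %d: src resno %d -> dst resno %d\n",
                   last_depth, map[i].src_resno, map[i].dst_resno);
        }
        return 0;
    }

Iterating only to nitems - 1 would silently drop the deepest relation's mapping, which is why the patch's loop bound is inclusive and why it asserts both lists are NIL afterwards.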
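ExecFallbackCpuScan, ExecFallbackCpuJoin and get_tuple_hashvalue all consume those lists with one shared convention: the resno values are 1-based, so every consumer subtracts one before indexing tts_values[]/tts_isnull[], and any destination column no pair touches stays NULL because the slot was primed with ExecStoreAllNullTuple(). A sketch of that copy, assuming plain arrays in place of List and TupleTableSlot:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef uintptr_t Datum;           /* stand-in for the PostgreSQL typedef */

    int main(void)
    {
        Datum base_values[4] = {101, 102, 103, 104};
        bool  base_isnull[4] = {false, false, true, false};
        Datum scan_values[5];
        bool  scan_isnull[5];
        int   load_src[] = {1, 4};     /* 1-based resno pairs, as in the lists */
        int   load_dst[] = {2, 5};

        /* equivalent of ExecStoreAllNullTuple(): every column starts NULL */
        for (int j = 0; j < 5; j++)
        {
            scan_values[j] = 0;
            scan_isnull[j] = true;
        }
        for (int k = 0; k < 2; k++)
        {
            int src = load_src[k] - 1; /* resno -> 0-based array index */
            int dst = load_dst[k] - 1;

            scan_isnull[dst] = base_isnull[src];
            scan_values[dst] = base_values[src];
        }
        for (int j = 0; j < 5; j++)
            printf("scan column %d: %s\n", j + 1,
                   scan_isnull[j] ? "NULL" : "copied");
        return 0;
    }

The same 1-based convention is why the system-attribute branch of __execFallbackLoadVarsSlot must also subtract one from lfirst_int(lc2) before touching the slot arrays.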
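The rewritten __execFallbackLoadVarsSlot walks the heap tuple body by hand: fixed-length columns align h_off before reading and then advance it by attlen, while varlena columns (attlen == -1) advance it by their stored size. The sketch below models only that alignment arithmetic; TYPEALIGN is PostgreSQL's macro, and treating attalign as a byte count mirrors how the patch uses kern_colmeta (both structs here are reduced stand-ins):

    #include <stdint.h>
    #include <stdio.h>

    /* PostgreSQL's alignment macro, repeated for self-containment */
    #define TYPEALIGN(ALIGNVAL,LEN) \
        (((uintptr_t)(LEN) + ((ALIGNVAL) - 1)) & ~((uintptr_t)((ALIGNVAL) - 1)))

    typedef struct {
        int16_t attlen;    /* > 0: fixed width; -1 would mean varlena */
        int16_t attalign;  /* alignment requirement in bytes */
    } colmeta_sketch;

    int main(void)
    {
        /* int4, int8, int2 -- like three fixed-width heap columns */
        colmeta_sketch cols[] = {{4, 4}, {8, 8}, {2, 2}};
        uint32_t h_off = 0;            /* the real code starts at htup->t_hoff */

        for (int j = 0; j < 3; j++)
        {
            h_off = TYPEALIGN(cols[j].attalign, h_off);
            printf("column %d starts at offset %u\n", j, h_off);
            h_off += cols[j].attlen;   /* varlena would add VARSIZE_ANY(addr) */
        }
        return 0;
    }

Here the int8 column lands at offset 8 rather than 4: those four padding bytes are exactly what the TYPEALIGN calls in the patch account for, and why NULL columns (which occupy no space) must skip the whole block.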
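__execFallbackCpuHashJoin probes a KDS_FORMAT_HASH buffer by walking one bucket's chain and rejecting candidates on the stored hash value before evaluating any join quals. A toy skeleton of that probe loop; toy_hashitem is a stand-in for kern_hashitem, not the actual KDS layout:

    #include <stdint.h>
    #include <stdio.h>

    typedef struct toy_hashitem {
        struct toy_hashitem *next;  /* chain within one bucket */
        uint32_t hash;              /* hash stored at inner-preload time */
        int      payload;           /* would be the inner heap tuple */
    } toy_hashitem;

    static toy_hashitem *
    probe(toy_hashitem **buckets, uint32_t nslots, uint32_t hash)
    {
        for (toy_hashitem *hitem = buckets[hash % nslots];
             hitem != NULL;
             hitem = hitem->next)
        {
            if (hitem->hash != hash)
                continue;           /* cheap filter before join-quals */
            return hitem;           /* real code now evaluates join_quals */
        }
        return NULL;
    }

    int main(void)
    {
        toy_hashitem a = {NULL, 0x1234, 111};
        toy_hashitem b = {&a,   0x5678, 222};  /* same bucket, other hash */
        toy_hashitem *buckets[4] = {NULL};

        buckets[0x5678 % 4] = &b;   /* both items chain into bucket 0 */
        toy_hashitem *hit = probe(buckets, 4, 0x1234);
        printf("matched payload: %d\n", hit ? hit->payload : -1);
        return 0;
    }

The stored-hash comparison is what lets the fallback path skip loading the inner tuple into the scan slot for most chain entries; only genuine hash matches pay for __execFallbackLoadVarsSlot and the qual evaluation.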
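Finally, the misc.c hunk is a serialization fix: form_pgstrom_plan_info() and deform_pgstrom_plan_info() treat __privs and __exprs as two append-only streams, so each field must be read back from the same stream, at the same position, where it was written; gist_clause is an expression tree, so it belongs in __exprs on both sides. A toy model of the invariant (all names invented):

    #include <assert.h>
    #include <stdio.h>

    #define NSTREAM 8

    typedef struct {
        int items[NSTREAM];
        int windex;                 /* next write position */
        int rindex;                 /* next read position */
    } stream;

    static void put(stream *s, int v) { s->items[s->windex++] = v; }
    static int  get(stream *s)        { return s->items[s->rindex++]; }

    int main(void)
    {
        stream privs = {{0}}, exprs = {{0}};

        /* "form": scalar fields -> privs, expression trees -> exprs */
        put(&privs, 42);            /* say, gist_ctid_resno */
        put(&exprs, 7);             /* gist_clause lives in the expr stream */
        put(&privs, 99);            /* say, gist_selectivity */

        /* "deform" must consume from the matching stream, in order */
        assert(get(&privs) == 42);
        assert(get(&exprs) == 7);   /* reading this from privs would shift
                                     * every later privs field by one */
        assert(get(&privs) == 99);
        printf("streams stayed aligned\n");
        return 0;
    }

Writing gist_clause into __privs while deform reads it from __exprs (or vice versa) misaligns both index counters, which is why the patch changes the form and deform sides together.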