Skip to content

Commit

Permalink
Also, CPU-fallback of GpuJoin is revised, fallback code entirely use …
Browse files Browse the repository at this point in the history
…scan_slot

issue reported at heterodb#747
  • Loading branch information
kaigai committed Apr 13, 2024
1 parent 330bbef commit 58b1866
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 212 deletions.
121 changes: 84 additions & 37 deletions src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1270,36 +1270,8 @@ __setupTaskStateRequestBuffer(pgstromTaskState *pts,
pts->xcmd_buf.len = off;
}



/*
* __execInitTaskStateCpuFallback
*
* CPU fallback process moves the tuple as follows
*
* <Base Relation> (can be both of heap and arrow)
* |
* v
* [Fallback Slot]
* |
* v
* (Base Quals) ... WHERE-clause
* |
* v
* (xPU-Join) ... Join for each depth
* |
* v
* [Fallback Slot]
* |
* v
* (Fallback Projection) ... pts->fallback_proj
* |
* v
* [ CustomScan Slot ] ... Equivalent to GpuProj/GpuPreAgg results
*/

/*
* __execInitFallbackProjection
* __fixup_fallback_projection
*/
static Node *
__fixup_fallback_projection(Node *node, void *__data)
Expand Down Expand Up @@ -1334,6 +1306,32 @@ __fixup_fallback_projection(Node *node, void *__data)
return expression_tree_mutator(node, __fixup_fallback_projection, __data);
}

/*
* fallback_varload_mapping
*/
typedef struct {
int32_t src_depth;
int32_t src_resno;
int32_t dst_resno;
} fallback_varload_mapping;

static int
__compare_fallback_varload_mapping(const void *__a, const void *__b)
{
const fallback_varload_mapping *a = __a;
const fallback_varload_mapping *b = __b;

if (a->src_depth < b->src_depth)
return -1;
if (a->src_depth > b->src_depth)
return 1;
if (a->src_resno < b->src_resno)
return -1;
if (a->src_resno > b->src_resno)
return 1;
return 0;
}

/*
* __execInitTaskStateCpuFallback
*/
Expand All @@ -1345,7 +1343,13 @@ __execInitTaskStateCpuFallback(pgstromTaskState *pts)
Relation rel = pts->css.ss.ss_currentRelation;
List *fallback_proj = NIL;
ListCell *lc;
int nrooms = list_length(cscan->custom_scan_tlist);
int nitems = 0;
int last_depth = -1;
List *src_list = NIL;
List *dst_list = NIL;
bool compatible = true;
fallback_varload_mapping *vl_map;

/*
* WHERE-clause
Expand All @@ -1356,22 +1360,65 @@ __execInitTaskStateCpuFallback(pgstromTaskState *pts)
/*
* CPU-Projection
*/
vl_map = alloca(sizeof(fallback_varload_mapping) * nrooms);
foreach (lc, cscan->custom_scan_tlist)
{
TargetEntry *tle = lfirst(lc);
ExprState *state = NULL;
ExprState *state = NULL;
Node *expr;

if (!tle->resjunk && tle->resorigtbl == (Oid)UINT_MAX)
if (tle->resorigtbl >= 0 &&
tle->resorigtbl <= pts->num_rels)
{
vl_map[nitems].src_depth = tle->resorigtbl;
vl_map[nitems].src_resno = tle->resorigcol;
vl_map[nitems].dst_resno = tle->resno;
nitems++;
}
else if (!tle->resjunk)
{
Node *expr = __fixup_fallback_projection((Node *)tle->expr,
cscan->custom_scan_tlist);
Assert(tle->resorigtbl == (Oid)UINT_MAX);
expr = __fixup_fallback_projection((Node *)tle->expr,
cscan->custom_scan_tlist);
state = ExecInitExpr((Expr *)expr, &pts->css.ss.ps);
compatible = false;
}
fallback_proj = lappend(fallback_proj, state);
}
if (!compatible)
pts->fallback_proj = fallback_proj;

/* fallback var-loads */
qsort(vl_map, nitems,
sizeof(fallback_varload_mapping),
__compare_fallback_varload_mapping);

for (int i=0; i <= nitems; i++)
{
if (i == nitems ||
vl_map[i].src_depth != last_depth)
{
if (last_depth == 0)
{
pts->fallback_load_src = src_list;
pts->fallback_load_dst = dst_list;
}
else if (last_depth > 0 &&
last_depth <= pts->num_rels)
{
pts->inners[last_depth-1].inner_load_src = src_list;
pts->inners[last_depth-1].inner_load_dst = dst_list;
}
src_list = NIL;
dst_list = NIL;
if (i == nitems)
break;
}
last_depth = vl_map[i].src_depth;
src_list = lappend_int(src_list, vl_map[i].src_resno);
dst_list = lappend_int(dst_list, vl_map[i].dst_resno);
}
Assert(src_list == NIL && dst_list == NIL);
}

/*
Expand Down Expand Up @@ -1498,15 +1545,15 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags)
istate->econtext = CreateExprContext(estate);
istate->depth = depth_index + 1;
istate->join_type = pp_inner->join_type;
istate->join_quals = ExecInitQual(pp_inner->join_quals_fallback,
istate->join_quals = ExecInitQual(pp_inner->join_quals_original,
&pts->css.ss.ps);
istate->other_quals = ExecInitQual(pp_inner->other_quals_fallback,
istate->other_quals = ExecInitQual(pp_inner->other_quals_original,
&pts->css.ss.ps);
if (pp_inner->join_type == JOIN_FULL ||
pp_inner->join_type == JOIN_RIGHT)
has_right_outer = true;

foreach (cell, pp_inner->hash_outer_keys_fallback)
foreach (cell, pp_inner->hash_outer_keys_original)
{
Node *outer_key = (Node *)lfirst(cell);
ExprState *es;
Expand All @@ -1522,7 +1569,7 @@ pgstromExecInitTaskState(CustomScanState *node, EState *estate, int eflags)
dtype->type_hashfunc);
}
/* inner hash-keys references the result of inner-slot */
foreach (cell, pp_inner->hash_inner_keys_fallback)
foreach (cell, pp_inner->hash_inner_keys_original)
{
Node *inner_key = (Node *)lfirst(cell);
ExprState *es;
Expand Down
Loading

0 comments on commit 58b1866

Please sign in to comment.