Skip to content

Commit

Permalink
add inner-siblings-id for reuse of inner-buffer (not implemented yet)
Browse files Browse the repository at this point in the history
issue related to heterodb#605
  • Loading branch information
kaigai committed Feb 25, 2024
1 parent f7346c2 commit c312bba
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 15 deletions.
3 changes: 3 additions & 0 deletions src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2430,6 +2430,9 @@ pgstromExplainTaskState(CustomScanState *node,
ExplainPropertyText(label, buf.data, es);
}
}
if (pp_info->sibling_param_id >= 0)
ExplainPropertyInteger("Inner Siblings-Id", NULL,
pp_info->sibling_param_id, es);

/*
* Storage related info
Expand Down
59 changes: 52 additions & 7 deletions src/gpu_join.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ __buildXpuJoinPlanInfo(PlannerInfo *root,
JoinType join_type,
List *restrict_clauses,
pgstromOuterPathLeafInfo *op_prev,
Path **p_inner_path)
Path **p_inner_path,
int sibling_param_id)
{
pgstromPlanInfo *pp_prev = op_prev->pp_info;
pgstromPlanInfo *pp_info;
Expand All @@ -72,6 +73,7 @@ __buildXpuJoinPlanInfo(PlannerInfo *root,
double xpu_tuple_cost;
Cost xpu_ratio;
Cost comp_cost = 0.0;
Cost inner_cost = 0.0;
Cost final_cost = 0.0;
QualCost join_quals_cost;
List *join_quals = NIL;
Expand Down Expand Up @@ -228,6 +230,7 @@ __buildXpuJoinPlanInfo(PlannerInfo *root,
* Setup pgstromPlanInfo
*/
pp_info = copy_pgstrom_plan_info(pp_prev);
pp_info->sibling_param_id = sibling_param_id;
pp_inner = &pp_info->inners[pp_info->num_rels++];
pp_inner->join_type = join_type;
pp_inner->join_nrows = joinrel->rows;
Expand All @@ -251,6 +254,7 @@ __buildXpuJoinPlanInfo(PlannerInfo *root,
if (gist_inner_path)
*p_inner_path = inner_path = gist_inner_path;
}

/*
* Cost estimation
*/
Expand All @@ -268,8 +272,9 @@ __buildXpuJoinPlanInfo(PlannerInfo *root,
startup_cost = __pp_inner->join_startup_cost;
run_cost = __pp_inner->join_run_cost;
}
startup_cost += (inner_path->total_cost +
inner_path->rows * cpu_tuple_cost);
inner_cost += (inner_path->total_cost +
inner_path->rows * cpu_tuple_cost);
startup_cost += inner_cost;
/* cost for join_quals */
cost_qual_eval(&join_quals_cost, join_quals, root);
startup_cost += join_quals_cost.startup;
Expand Down Expand Up @@ -337,6 +342,7 @@ __buildXpuJoinPlanInfo(PlannerInfo *root,
final_cost += (joinrel->reltarget->cost.per_tuple *
joinrel->rows / pp_info->parallel_divisor);

pp_info->join_inner_cost += inner_cost;
pp_info->final_cost = final_cost;
pp_inner->join_nrows = (joinrel->rows / pp_info->parallel_divisor);
pp_inner->join_startup_cost = startup_cost;
Expand All @@ -360,6 +366,7 @@ __build_simple_xpujoin_path(PlannerInfo *root,
SpecialJoinInfo *sjinfo,
Relids param_source_rels,
bool try_parallel_path,
int sibling_param_id,
uint32_t xpu_task_flags,
const CustomPathMethods *xpujoin_path_methods)
{
Expand Down Expand Up @@ -396,7 +403,8 @@ __build_simple_xpujoin_path(PlannerInfo *root,
join_type,
restrict_clauses,
op_prev,
&inner_path);
&inner_path,
sibling_param_id);
if (!pp_info)
return NULL;
pp_info->xpu_task_flags &= ~DEVTASK__MASK;
Expand Down Expand Up @@ -479,6 +487,7 @@ try_add_xpujoin_simple_path(PlannerInfo *root,
extra->sjinfo,
extra->param_source_rels,
try_parallel_path,
-1, /* sibling_param_id */
xpu_task_flags,
xpujoin_path_methods);
if (!cpath)
Expand Down Expand Up @@ -623,14 +632,26 @@ try_add_xpujoin_partition_path(PlannerInfo *root,
List *op_prev_list = NIL;
List *op_leaf_list = NIL;
List *cpaths_list = NIL;
PathTarget *append_target = NULL;
Path *append_path;
Relids required_outer = NULL;
int parallel_nworkers = 0;
double total_nrows = 0.0;
bool identical_inners;
int sibling_param_id = -1;
ListCell *lc;

op_prev_list = pgstrom_find_op_leafs(outer_rel, try_parallel_path);
op_prev_list = pgstrom_find_op_leafs(outer_rel,
try_parallel_path,
&identical_inners);
if (identical_inners)
{
PlannerGlobal *glob = root->glob;

sibling_param_id = list_length(glob->paramExecTypes);
glob->paramExecTypes = lappend_oid(glob->paramExecTypes,
INTERNALOID);
}

foreach (lc, op_prev_list)
{
pgstromOuterPathLeafInfo *op_prev = lfirst(lc);
Expand Down Expand Up @@ -695,6 +716,7 @@ try_add_xpujoin_partition_path(PlannerInfo *root,
__sjinfo,
extra->param_source_rels,
try_parallel_path,
sibling_param_id,
xpu_task_flags,
xpujoin_path_methods);
if (!cpath)
Expand Down Expand Up @@ -726,7 +748,15 @@ try_add_xpujoin_partition_path(PlannerInfo *root,
(try_parallel_path ? parallel_nworkers : 0),
try_parallel_path,
total_nrows);
//append_path->target = append_target; //FIXME
if (sibling_param_id >= 0 && list_length(cpaths_list) > 1)
{
CustomPath *cpath = linitial(cpaths_list);
pgstromPlanInfo *pp_info = linitial(cpath->custom_private);
Cost discount = (pp_info->join_inner_cost *
(Cost)(list_length(cpaths_list) - 1));
append_path->startup_cost -= discount;
append_path->total_cost -= discount;
}
pgstrom_remember_op_leafs(join_rel, op_leaf_list,
try_parallel_path);
if (!try_parallel_path)
Expand Down Expand Up @@ -778,6 +808,21 @@ __xpuJoinAddCustomPathCommon(PlannerInfo *root,
if (!inner_path || inner_path->total_cost > path->total_cost)
inner_path = path;
}
if (!inner_path && IS_SIMPLE_REL(innerrel))
{
/*
* In case when inner relation is very small, PostgreSQL may
* skip to generate partial scan paths because it may calculate
* the number of parallel workers zero due to small size.
* Only if the innerrel is base relation, we add a partial
* SeqScan path to use parallel inner path.
*/
inner_path = (Path *)
create_seqscan_path(root,
innerrel,
innerrel->lateral_relids,
try_parallel);
}

if (inner_path)
{
Expand Down
36 changes: 33 additions & 3 deletions src/gpu_preagg.c
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,7 @@ typedef struct
PathTarget *target_final;
AggClauseCosts final_clause_costs;
pgstromPlanInfo *pp_info;
int sibling_param_id;
List *inner_paths_list;
List *inner_target_list;
List *groupby_keys;
Expand Down Expand Up @@ -1378,7 +1379,7 @@ __buildXpuPreAggCustomPath(xpugroupby_build_path_context *con)
Query *parse = con->root->parse;
CustomPath *cpath = makeNode(CustomPath);
PathTarget *target_partial = con->target_partial;
pgstromPlanInfo *pp_info = con->pp_info;
pgstromPlanInfo *pp_info = copy_pgstrom_plan_info(con->pp_info);
double input_nrows = PP_INFO_NUM_ROWS(pp_info);
double num_group_keys;
double xpu_ratio;
Expand Down Expand Up @@ -1411,6 +1412,7 @@ __buildXpuPreAggCustomPath(xpugroupby_build_path_context *con)
}
pp_info->xpu_task_flags &= ~DEVTASK__MASK;
pp_info->xpu_task_flags |= DEVTASK__PREAGG;
pp_info->sibling_param_id = con->sibling_param_id;

/* No tuples shall be generated until child JOIN/SCAN path completion */
startup_cost = (PP_INFO_STARTUP_COST(pp_info) +
Expand Down Expand Up @@ -1489,6 +1491,7 @@ __try_add_xpupreagg_normal_path(PlannerInfo *root,
con.target_partial = create_empty_pathtarget();
con.target_final = create_empty_pathtarget();
con.pp_info = op_leaf->pp_info;
con.sibling_param_id = -1;
con.inner_paths_list = op_leaf->inner_paths_list;
con.inner_target_list = inner_target_list;
/* construction of the target-list for each level */
Expand Down Expand Up @@ -1518,6 +1521,7 @@ __try_add_xpupreagg_partition_path(PlannerInfo *root,
GroupPathExtraData *gp_extra,
uint32_t xpu_task_flags,
bool try_parallel_path,
int sibling_param_id,
List *op_leaf_list)
{
xpugroupby_build_path_context con;
Expand Down Expand Up @@ -1566,6 +1570,7 @@ __try_add_xpupreagg_partition_path(PlannerInfo *root,
con.target_partial = create_empty_pathtarget();
con.target_final = create_empty_pathtarget();
con.pp_info = op_leaf->pp_info;
con.sibling_param_id = sibling_param_id;
con.inner_paths_list = op_leaf->inner_paths_list;
con.inner_target_list = inner_target_list;
/* construction of the target-list for each level */
Expand Down Expand Up @@ -1600,6 +1605,17 @@ __try_add_xpupreagg_partition_path(PlannerInfo *root,
try_parallel_path,
total_nrows);
part_path->pathtarget = part_target;

if (sibling_param_id >= 0 &&
list_length(preagg_cpath_list) > 1)
{
CustomPath *__cpath = linitial(preagg_cpath_list);
pgstromPlanInfo *__pp_info = linitial(__cpath->custom_private);
Cost discount = (__pp_info->join_inner_cost *
(Cost)(list_length(preagg_cpath_list) - 1));
part_path->startup_cost -= discount;
part_path->total_cost -= discount;
}
}
else
{
Expand Down Expand Up @@ -1663,15 +1679,29 @@ __xpuPreAggAddCustomPathCommon(PlannerInfo *root,
*/
if (consider_partition)
{
List *op_leaf_list = pgstrom_find_op_leafs(input_rel,
(try_parallel > 0));
List *op_leaf_list;
bool identical_inners;
int sibling_param_id = -1;

op_leaf_list = pgstrom_find_op_leafs(input_rel,
(try_parallel > 0),
&identical_inners);
if (identical_inners)
{
PlannerGlobal *glob = root->glob;

sibling_param_id = list_length(glob->paramExecTypes);
glob->paramExecTypes = lappend_oid(glob->paramExecTypes,
INTERNALOID);
}
if (op_leaf_list != NIL)
__try_add_xpupreagg_partition_path(root,
input_rel,
group_rel,
extra,
xpu_task_flags,
(try_parallel > 0),
sibling_param_id,
op_leaf_list);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/gpu_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ __buildSimpleScanPlanInfo(PlannerInfo *root,
pull_varattnos((Node *)pp_info->host_quals, baserel->relid, &outer_refs);
pull_varattnos((Node *)pp_info->scan_quals, baserel->relid, &outer_refs);
pp_info->outer_refs = outer_refs;
pp_info->sibling_param_id = -1;
return pp_info;
}

Expand Down
45 changes: 41 additions & 4 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ typedef struct
List *op_leaf_parallel;
Cost total_cost_single;
Cost total_cost_parallel;
bool identical_inners_single;
bool identical_inners_parallel;
} pgstromPathEntry;

static void
Expand Down Expand Up @@ -411,20 +413,46 @@ pgstrom_remember_op_leafs(RelOptInfo *parent_rel,
bool be_parallel)
{
pgstromPathEntry *pp_entry;
ListCell *lc;
ListCell *cell;
List *inner_paths_list = NIL;
int identical_inners = -1;
Cost total_cost = 0.0;
bool found;

__pgstrom_build_paths_htable();
/* calculation of total cost */
foreach (lc, op_leaf_list)
foreach (cell, op_leaf_list)
{
pgstromOuterPathLeafInfo *op_leaf = lfirst(lc);
pgstromOuterPathLeafInfo *op_leaf = lfirst(cell);

/* sanity checks */
Assert(list_length(op_leaf->inner_paths_list) == op_leaf->pp_info->num_rels);
op_leaf->outer_rel = parent_rel;
total_cost += op_leaf->leaf_cost;

if (cell == list_head(op_leaf_list))
{
inner_paths_list = op_leaf->inner_paths_list;
}
else if (identical_inners != 0)
{
ListCell *lc1, *lc2;

forboth (lc1, inner_paths_list,
lc2, op_leaf->inner_paths_list)
{
Path *__i_path1 = lfirst(lc1);
Path *__i_path2 = lfirst(lc2);

if (!bms_equal(__i_path1->parent->relids,
__i_path2->parent->relids))
break;
}
if (lc1 == NULL && lc2 == NULL)
identical_inners = 1;
else
identical_inners = 0;
}
}
pp_entry = (pgstromPathEntry *)
hash_search(pgstrom_paths_htable,
Expand All @@ -444,6 +472,7 @@ pgstrom_remember_op_leafs(RelOptInfo *parent_rel,
{
pp_entry->op_leaf_parallel = op_leaf_list;
pp_entry->total_cost_parallel = total_cost;
pp_entry->identical_inners_parallel = (identical_inners > 0);
}
}
else
Expand All @@ -453,6 +482,7 @@ pgstrom_remember_op_leafs(RelOptInfo *parent_rel,
{
pp_entry->op_leaf_single = op_leaf_list;
pp_entry->total_cost_single = total_cost;
pp_entry->identical_inners_single = (identical_inners > 0);
}
}
}
Expand All @@ -477,7 +507,8 @@ pgstrom_find_op_normal(RelOptInfo *outer_rel,
}

List *
pgstrom_find_op_leafs(RelOptInfo *parent_rel, bool be_parallel)
pgstrom_find_op_leafs(RelOptInfo *parent_rel, bool be_parallel,
bool *p_identical_inners)
{
if (pgstrom_paths_htable)
{
Expand All @@ -487,9 +518,15 @@ pgstrom_find_op_leafs(RelOptInfo *parent_rel, bool be_parallel)
HASH_FIND,
NULL);
if (pp_entry)
{
if (p_identical_inners)
*p_identical_inners = (be_parallel
? pp_entry->identical_inners_parallel
: pp_entry->identical_inners_single);
return (be_parallel
? pp_entry->op_leaf_parallel
: pp_entry->op_leaf_single);
}
}
return NIL;
}
Expand Down
Loading

0 comments on commit c312bba

Please sign in to comment.