add statistics for EXPLAIN ANALYZE
feature for the issue heterodb#602
kaigai committed Jun 20, 2023
1 parent 245061c commit fccb4b9
Showing 5 changed files with 105 additions and 20 deletions.
29 changes: 26 additions & 3 deletions src/cuda_gpujoin.cu
@@ -859,14 +859,37 @@ kern_gpujoin_main(kern_session_info *session,
                break;
    }
    __syncthreads();
    /* suspend the execution context */

    if (LaneId() == 0)
    {
        /* update the statistics */
        if (depth < 0 && WARP_READ_POS(wp,n_rels) >= WARP_WRITE_POS(wp,n_rels))
        {
            /* number of raw-tuples fetched from the heap block */
            atomicAdd(&kgtask->nitems_raw, wp->lp_wr_pos);
            atomicAdd(&kgtask->nitems_in, WARP_WRITE_POS(wp, 0));
            for (int i=0; i < n_rels; i++)
            {
                const kern_expression *kexp_gist
                    = SESSION_KEXP_GIST_EVALS(session, i);
                if (kexp_gist)
                {
                    int gist_depth = kexp_gist->u.gist.gist_depth;

                    assert(gist_depth > n_rels &&
                           gist_depth < kgtask->kvars_ndims);
                    atomicAdd(&kgtask->stats[i].nitems_gist,
                              WARP_WRITE_POS(wp, gist_depth));
                }
                atomicAdd(&kgtask->stats[i].nitems_out,
                          WARP_WRITE_POS(wp,i+1));
            }
            atomicAdd(&kgtask->nitems_out, WARP_WRITE_POS(wp, n_rels));
        }
        /* suspend the execution context */
        wp->depth = depth;
        wp->smx_row_count = smx_row_count;
        memcpy(wp_saved, wp, wp_base_sz);

        //printf("wp gid=%u {scan_done=%d depth=%d of %d pos[%u %u] [%u %u] [%u %u]}\n", get_global_id(), wp->scan_done, depth, n_rels, wp->pos[0].read, wp->pos[0].write, wp->pos[1].read, wp->pos[1].write, wp->pos[2].read, wp->pos[2].write);
    }
    STROM_WRITEBACK_ERROR_STATUS(&kgtask->kerror, kcxt);
}
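
The block added above follows a common pattern: once a warp has drained its pending tuples (WARP_READ_POS has caught up with WARP_WRITE_POS at the final depth), a single lane folds the per-depth write positions into the task-wide counters with atomicAdd, so the statistics survive the suspend/resume of the execution context. Below is a minimal standalone CUDA sketch of that one-lane-per-warp flush; all names (task_stats, flush_warp_stats, N_DEPTH) and numbers are illustrative, not PG-Strom symbols.

```cuda
// Minimal sketch: one lane per warp flushes per-depth counters with atomicAdd.
#include <cstdio>
#include <cstring>
#include <cuda_runtime.h>

#define N_DEPTH 3

struct task_stats
{
    unsigned long long nitems[N_DEPTH + 1];     // per-depth tuple counters
};

__global__ void
flush_warp_stats(task_stats *stats)
{
    unsigned int warp_id = (blockIdx.x * blockDim.x + threadIdx.x) / warpSize;
    unsigned int lane_id = threadIdx.x & (warpSize - 1);

    // pretend this warp produced (warp_id + depth) tuples at each depth
    __syncwarp();
    if (lane_id == 0)
    {
        // only the first lane touches global memory, like LaneId() == 0 above
        for (int depth = 0; depth <= N_DEPTH; depth++)
            atomicAdd(&stats->nitems[depth],
                      (unsigned long long)(warp_id + depth));
    }
}

int main(void)
{
    task_stats *stats;

    cudaMallocManaged(&stats, sizeof(task_stats));
    memset(stats, 0, sizeof(task_stats));

    flush_warp_stats<<<2, 128>>>(stats);        // 2 blocks x 4 warps each
    cudaDeviceSynchronize();

    for (int depth = 0; depth <= N_DEPTH; depth++)
        printf("depth=%d nitems=%llu\n", depth, stats->nitems[depth]);
    cudaFree(stats);
    return 0;
}
```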
2 changes: 2 additions & 0 deletions src/cuda_gpuscan.cu
@@ -394,6 +394,7 @@ execGpuScanLoadSource(kern_context *kcxt,
        return -1;
    }

#if 0
/*
 * kern_gpuscan_main
 */
@@ -499,3 +500,4 @@ kern_gpuscan_main(kern_session_info *session,
    }
    STROM_WRITEBACK_ERROR_STATUS(&kgtask->kerror, kcxt);
}
#endif
85 changes: 72 additions & 13 deletions src/executor.c
@@ -691,6 +691,30 @@ pgstromTaskStateResetScan(pgstromTaskState *pts)
    memset(pts->rjoin_devs_count, 0, sizeof(pg_atomic_uint32) * num_devs);
}

/*
 * __updateStatsXpuCommand
 */
static void
__updateStatsXpuCommand(pgstromTaskState *pts, const XpuCommand *xcmd)
{
    if (xcmd->tag == XpuCommandTag__Success)
    {
        pgstromSharedState *ps_state = pts->ps_state;
        int n_rels = Min(pts->num_rels, xcmd->u.results.num_rels);

        pg_atomic_fetch_add_u64(&ps_state->source_ntuples, xcmd->u.results.nitems_raw);
        pg_atomic_fetch_add_u64(&ps_state->source_nvalids, xcmd->u.results.nitems_in);
        for (int i=0; i < n_rels; i++)
        {
            pg_atomic_fetch_add_u64(&ps_state->inners[i].stats_gist,
                                    xcmd->u.results.stats[i].nitems_gist);
            pg_atomic_fetch_add_u64(&ps_state->inners[i].stats_join,
                                    xcmd->u.results.stats[i].nitems_out);
        }
        pg_atomic_fetch_add_u64(&ps_state->result_ntuples, xcmd->u.results.nitems_out);
    }
}
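
The new __updateStatsXpuCommand() folds the raw counters carried back in each successful XpuCommand into the shared state with lock-free 64-bit fetch-and-add, so every backend and parallel worker can accumulate concurrently. The standalone analogue below shows the same accumulation pattern with plain C11 atomics standing in for pg_atomic_fetch_add_u64(); all names and figures here are illustrative, not PG-Strom APIs.

```c
/* Standalone analogue of the accumulation pattern above (illustrative names only). */
#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define NUM_RELS 2

typedef struct
{
    uint64_t    nitems_raw;             /* tuples read from storage */
    uint64_t    nitems_in;              /* tuples passing scan quals */
    uint64_t    nitems_out;             /* tuples finally emitted */
    uint64_t    join_out[NUM_RELS];     /* tuples emitted per join depth */
} result_counters;

typedef struct
{
    _Atomic uint64_t source_ntuples;
    _Atomic uint64_t source_nvalids;
    _Atomic uint64_t result_ntuples;
    _Atomic uint64_t stats_join[NUM_RELS];
} shared_state;

static void
update_stats(shared_state *ss, const result_counters *rc)
{
    /* lock-free fetch-and-add, like pg_atomic_fetch_add_u64() above */
    atomic_fetch_add(&ss->source_ntuples, rc->nitems_raw);
    atomic_fetch_add(&ss->source_nvalids, rc->nitems_in);
    for (int i = 0; i < NUM_RELS; i++)
        atomic_fetch_add(&ss->stats_join[i], rc->join_out[i]);
    atomic_fetch_add(&ss->result_ntuples, rc->nitems_out);
}

int main(void)
{
    shared_state    ss = {0};
    result_counters rc = { 1000, 800, 120, { 400, 120 } };

    /* fold two identical results, as two completed GPU tasks would */
    update_stats(&ss, &rc);
    update_stats(&ss, &rc);

    printf("raw=%" PRIu64 " valid=%" PRIu64 " out=%" PRIu64 "\n",
           atomic_load(&ss.source_ntuples),
           atomic_load(&ss.source_nvalids),
           atomic_load(&ss.result_ntuples));
    return 0;
}
```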

/*
 * __pickupNextXpuCommand
 *
@@ -788,7 +812,7 @@ __waitAndFetchNextXpuCommand(pgstromTaskState *pts, bool try_final_callback)
    }
    xcmd = __pickupNextXpuCommand(conn);
    pthreadMutexUnlock(&conn->mutex);

    __updateStatsXpuCommand(pts, xcmd);
    return xcmd;
}

@@ -845,7 +869,7 @@ __fetchNextXpuCommand(pgstromTaskState *pts)
        {
            xcmd = __pickupNextXpuCommand(conn);
            pthreadMutexUnlock(&conn->mutex);

            __updateStatsXpuCommand(pts, xcmd);
            return xcmd;
        }
        else if (conn->num_running_cmds > 0)
@@ -1649,6 +1673,7 @@ pgstromExplainTaskState(CustomScanState *node,
                        ExplainState *es)
{
    pgstromTaskState *pts = (pgstromTaskState *)node;
    pgstromSharedState *ps_state = pts->ps_state;
    pgstromPlanInfo *pp_info = pts->pp_info;
    CustomScan *cscan = (CustomScan *)node->ss.ps.plan;
    List       *dcontext;
@@ -1658,6 +1683,7 @@ pgstromExplainTaskState(CustomScanState *node,
    char        label[100];
    char       *str;
    double      ntuples;
    uint64_t    stat_ntuples = 0;
    devtype_info *dtype;

    /* setup deparse context */
@@ -1689,6 +1715,7 @@ pgstromExplainTaskState(CustomScanState *node,
        ExplainPropertyText(label, buf.data, es);

    /* xPU Scan Quals */
    stat_ntuples = pg_atomic_read_u64(&ps_state->source_nvalids);
    if (pp_info->scan_quals)
    {
        List       *scan_quals = pp_info->scan_quals;
@@ -1700,10 +1727,21 @@ pgstromExplainTaskState(CustomScanState *node,
        else
            expr = linitial(scan_quals);
        str = deparse_expression((Node *)expr, dcontext, false, true);
        appendStringInfo(&buf, "%s [rows: %.0f -> %.0f]",
                         str,
                         pp_info->scan_tuples,
                         pp_info->scan_rows);
        appendStringInfoString(&buf, str);
        if (!es->analyze)
        {
            appendStringInfo(&buf, " [rows: %.0f -> %.0f]",
                             pp_info->scan_tuples,
                             pp_info->scan_rows);
        }
        else
        {
            appendStringInfo(&buf, " [plan: %.0f -> %.0f, exec: %lu -> %lu]",
                             pp_info->scan_tuples,
                             pp_info->scan_rows,
                             pg_atomic_read_u64(&ps_state->source_ntuples),
                             stat_ntuples);
        }
        snprintf(label, sizeof(label), "%s Scan Quals", xpu_label);
        ExplainPropertyText(label, buf.data, es);
    }
@@ -1740,8 +1778,22 @@ pgstromExplainTaskState(CustomScanState *node,
                appendStringInfo(&buf, "[%s]", str);
            }
        }
        appendStringInfo(&buf, " ... [nrows: %.0f -> %.0f]",
                         ntuples, pp_inner->join_nrows);
        if (!es->analyze)
        {
            appendStringInfo(&buf, " ... [nrows: %.0f -> %.0f]",
                             ntuples, pp_inner->join_nrows);
        }
        else
        {
            uint64_t    next_ntuples;

            next_ntuples = pg_atomic_read_u64(&ps_state->inners[i].stats_join);
            appendStringInfo(&buf, " ... [plan: %.0f -> %.0f, exec: %lu -> %lu]",
                             ntuples, pp_inner->join_nrows,
                             stat_ntuples,
                             next_ntuples);
            stat_ntuples = next_ntuples;
        }
        switch (pp_inner->join_type)
        {
            case JOIN_INNER: join_label = "Join"; break;
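
In the es->analyze branch above, the executed counts chain across depths: stat_ntuples starts at source_nvalids and, after each depth is printed, is replaced by that depth's stats_join counter, so the next depth's "exec: in -> out" pair picks up where the previous one left off. The sketch below shows just that chaining; the label shape and every number are made up for illustration.

```c
/* Sketch of how the exec: figures chain across join depths (illustrative values). */
#include <stdint.h>
#include <stdio.h>

#define NUM_RELS 3

int main(void)
{
    /* planner estimates (join_nrows) and measured counters (stats_join) per depth */
    double   plan_rows[NUM_RELS] = { 52000.0, 4100.0, 390.0 };
    uint64_t exec_rows[NUM_RELS] = { 49873,   3988,   371 };

    double   ntuples = 995000.0;        /* planned rows entering depth 1 (scan_rows) */
    uint64_t stat_ntuples = 987654;     /* measured rows entering depth 1 (source_nvalids) */

    for (int i = 0; i < NUM_RELS; i++)
    {
        printf("GPU Join [%d]: ... [plan: %.0f -> %.0f, exec: %lu -> %lu]\n",
               i + 1, ntuples, plan_rows[i],
               (unsigned long)stat_ntuples, (unsigned long)exec_rows[i]);
        ntuples = plan_rows[i];         /* planned output feeds the next depth */
        stat_ntuples = exec_rows[i];    /* measured output feeds the next depth */
    }
    return 0;
}
```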
@@ -1802,6 +1854,11 @@ pgstromExplainTaskState(CustomScanState *node,
            appendStringInfoString(&buf, str);
            if (idxname && colname)
                appendStringInfo(&buf, " on %s (%s)", idxname, colname);
            if (es->analyze)
            {
                appendStringInfo(&buf, " [fetched: %lu]",
                                 pg_atomic_read_u64(&ps_state->inners[i].stats_gist));
            }
            snprintf(label, sizeof(label),
                     "%s GiST Join [%d]", xpu_label, i+1);
            ExplainPropertyText(label, buf.data, es);
@@ -1844,18 +1901,20 @@ pgstromExplainTaskState(CustomScanState *node,
    }
    else
    {
        pgstromSharedState *ps_state = pts->ps_state;
        XpuConnection *conn = pts->conn;
        uint32      count;
        uint64      count;

        count = pg_atomic_read_u32(&ps_state->heap_direct_nblocks);
        appendStringInfo(&buf, "enabled (%s; direct=%u", conn->devname, count);
        appendStringInfo(&buf, "enabled (%s; direct=%lu", conn->devname, count);
        count = pg_atomic_read_u32(&ps_state->heap_normal_nblocks);
        if (count > 0)
            appendStringInfo(&buf, ", buffer=%u", count);
            appendStringInfo(&buf, ", buffer=%lu", count);
        count = pg_atomic_read_u32(&ps_state->heap_fallback_nblocks);
        if (count > 0)
            appendStringInfo(&buf, ", fallback=%u", count);
            appendStringInfo(&buf, ", fallback=%lu", count);
        count = pg_atomic_read_u64(&ps_state->source_ntuples);
        if (count > 0)
            appendStringInfo(&buf, ", ntuples=%lu", count);
        appendStringInfo(&buf, ")");
    }
    ExplainPropertyText("GPU-Direct SQL", buf.data, es);
1 change: 0 additions & 1 deletion src/gpu_service.c
@@ -1753,7 +1753,6 @@ gpuservHandleGpuTaskFinal(gpuClient *gclient, XpuCommand *xcmd)
        }
    }
    //fprintf(stderr, "gpuservHandleGpuTaskFinal: kfin => {final_this_device=%d final_plan_node=%d} resp => {final_this_device=%d final_plan_node=%d}\n", kfin->final_this_device, kfin->final_plan_node, resp.u.results.final_this_device, resp.u.results.final_plan_node);

    gpuClientWriteBack(gclient, &resp,
                       resp.u.results.chunks_offset,
                       resp.u.results.chunks_nitems,
8 changes: 5 additions & 3 deletions src/pg_strom.h
@@ -308,6 +308,8 @@ typedef struct
{
    pg_atomic_uint64    inner_nitems;
    pg_atomic_uint64    inner_usage;
    pg_atomic_uint64    stats_gist;     /* only GiST-index */
    pg_atomic_uint64    stats_join;     /* # of tuples by this join */
} pgstromSharedInnerState;

typedef struct
@@ -321,9 +323,9 @@ typedef struct
    pg_atomic_uint32    parallel_task_control;
    pg_atomic_uint32    __rjoin_exit_count;
    /* statistics */
    pg_atomic_uint64    source_ntuples;
    pg_atomic_uint64    source_nvalids;
    pg_atomic_uint32    source_nblocks;     /* only KDS_FORMAT_BLOCK */
    pg_atomic_uint64    source_ntuples;     /* only KDS_FORMAT_BLOCK */
    pg_atomic_uint64    source_nvalids;     /* # of tuples scan'ed */
    pg_atomic_uint64    result_ntuples;     /* # of tuples generated */
    /* for arrow_fdw */
    pg_atomic_uint32    arrow_rbatch_index;
    pg_atomic_uint32    arrow_rbatch_nload; /* # of loaded record-batches */
