Skip to content

Commit

Permalink
misc bug fix related to arrow with count(*)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed May 12, 2023
1 parent c2445e2 commit bd1be92
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 25 deletions.
19 changes: 12 additions & 7 deletions src/arrow_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -2306,15 +2306,17 @@ arrowFdwSetupIOvector(RecordBatchState *rb_state,
{
arrowFdwSetupIOContext *con;
strom_io_vector *iovec;
unsigned int nr_chunks = 0;

Assert(kds->ncols <= kds->nr_colmeta &&
Assert(kds->format == KDS_FORMAT_ARROW &&
kds->ncols <= kds->nr_colmeta &&
kds->ncols == rb_state->nfields);
con = alloca(offsetof(arrowFdwSetupIOContext,
ioc[3 * kds->nr_colmeta]));
con->rb_offset = rb_state->rb_offset;
con->f_offset = ~0UL; /* invalid offset */
con->m_offset = 0;
con->kds_head_sz = KDS_HEAD_LENGTH(kds);
con->kds_head_sz = TYPEALIGN(64, KDS_HEAD_LENGTH(kds));
con->depth = 0;
con->io_index = -1; /* invalid index */
for (int j=0; j < kds->ncols; j++)
Expand All @@ -2336,12 +2338,13 @@ arrowFdwSetupIOvector(RecordBatchState *rb_state,

ioc->nr_pages = (TYPEALIGN(PAGE_SIZE, con->f_offset) / PAGE_SIZE -
ioc->fchunk_id);
con->m_offset = ioc->m_offset + PAGE_SIZE * ioc->nr_pages;
con->m_offset += PAGE_SIZE * ioc->nr_pages;
nr_chunks = con->io_index;
}
kds->length = con->m_offset;
kds->length = con->kds_head_sz + con->m_offset;

iovec = palloc0(offsetof(strom_io_vector, ioc[con->io_index]));
iovec->nr_chunks = con->io_index;
iovec = palloc0(offsetof(strom_io_vector, ioc[nr_chunks]));
iovec->nr_chunks = nr_chunks;
if (iovec->nr_chunks > 0)
memcpy(iovec->ioc, con->ioc, sizeof(strom_io_chunk) * con->io_index);
#if 0
Expand Down Expand Up @@ -2432,7 +2435,6 @@ arrowFdwLoadRecordBatch(Relation relation,
setup_kern_data_store(kds, tupdesc, 0, KDS_FORMAT_ARROW);
kds->nitems = rb_state->rb_nitems;
kds->table_oid = RelationGetRelid(relation);
Assert(head_sz == KDS_HEAD_LENGTH(kds));
Assert(kds->ncols == rb_state->nfields);
for (int j=0; j < kds->ncols; j++)
__arrowKdsAssignAttrOptions(kds,
Expand Down Expand Up @@ -3579,7 +3581,10 @@ pgstromScanChunkArrowFdw(pgstromTaskState *pts,

rb_state = __arrowFdwNextRecordBatch(arrow_state);
if (!rb_state)
{
pts->scan_done = true;
return NULL;
}
af_state = rb_state->af_state;

/* XpuCommand header */
Expand Down
25 changes: 17 additions & 8 deletions src/codegen.c
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,18 @@ devtype_inet_hash(bool isnull, Datum value)
return 0;
}

static uint32_t
devtype_geometry_hash(bool isnull, Datum value)
{
elog(ERROR, "geometry type has no device hash function");
}

static uint32_t
devtype_box2df_hash(bool isnull, Datum value)
{
elog(ERROR, "box2df type has no device hash function");
}

/*
* Built-in device functions/operators
*/
Expand Down Expand Up @@ -1401,16 +1413,13 @@ __codegen_build_loadvars_one(codegen_context *context, int depth)
nloads++;
}
}
if (nloads == 0)
if (nloads > 0)
{
pfree(buf.data);
return NULL;
qsort(buf.data + offsetof(kern_expression, u.load.kvars),
nloads,
sizeof(kern_vars_defitem),
kern_vars_defitem_comp);
}
qsort(buf.data + offsetof(kern_expression, u.load.kvars),
nloads,
sizeof(kern_vars_defitem),
kern_vars_defitem_comp);

memset(&kexp, 0, sizeof(kexp));
kexp.exptype = TypeOpCode__int4;
kexp.expflags = context->kexp_flags;
Expand Down
21 changes: 21 additions & 0 deletions src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,7 @@ pgstromExplainTaskState(CustomScanState *node,
char label[100];
char *str;
double ntuples;
devtype_info *dtype;

/* setup deparse context */
dcontext = set_deparse_context_plan(es->deparse_cxt,
Expand Down Expand Up @@ -1855,6 +1856,26 @@ pgstromExplainTaskState(CustomScanState *node,
*/
if (es->verbose)
{
int kvars_nslots = list_length(pp_info->kvars_depth);
size_t kvars_nbytes = (sizeof(kern_variable) * kvars_nslots +
sizeof(int) * kvars_nslots);
foreach (lc, pp_info->kvars_types)
{
Oid type_oid = lfirst_oid(lc);

if (OidIsValid(type_oid) &&
(dtype = pgstrom_devtype_lookup(type_oid)) != NULL)
{
kvars_nbytes = TYPEALIGN(dtype->type_alignof, kvars_nbytes);
kvars_nbytes += dtype->type_sizeof;
}
}
resetStringInfo(&buf);
appendStringInfo(&buf, "nslots: %u, nbytes: %zu",
kvars_nslots,
kvars_nbytes);
ExplainPropertyText("KVars", buf.data, es);

pgstrom_explain_xpucode(&pts->css, es, dcontext,
"Scan VarLoads OpCode",
pp_info->kexp_scan_kvars_load);
Expand Down
4 changes: 3 additions & 1 deletion src/gpu_join.c
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,6 @@ pgstrom_build_groupby_dev(PlannerInfo *root,
}
}
}

__build_explain_tlist_junks(&context);
return context.tlist_dev;
}
Expand Down Expand Up @@ -1392,6 +1391,9 @@ PlanXpuJoinPathCommon(PlannerInfo *root,
input_rels_tlist);
pp_info->kexp_projection = codegen_build_projection(&context);
}
pull_varattnos((Node *)context.tlist_dev,
pp_info->scan_relid,
&outer_refs);
/* assign remaining PlanInfo members */
pp_info->kexp_join_quals_packed
= codegen_build_packed_joinquals(&context,
Expand Down
2 changes: 1 addition & 1 deletion src/gpu_service.c
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ gpuservHandleGpuTaskExec(gpuClient *gclient, XpuCommand *xcmd)
return;
}
/* inner buffer of GpuJoin */
if (gq_buf)
if (gq_buf && gq_buf->m_kmrels)
{
kern_multirels *h_kmrels = (kern_multirels *)gq_buf->h_kmrels;

Expand Down
7 changes: 7 additions & 0 deletions src/relscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ setup_kern_data_store(kern_data_store *kds,
return MAXALIGN(offsetof(kern_data_store, colmeta[kds->nr_colmeta]));
}

/*
* estimate_kern_data_store
*
* NOTE: This function estimates required buffer size for the KDS that
* follows the given TupleDesc, but may not be exact size.
* setup_kern_data_store() shall return exact header size.
*/
size_t
estimate_kern_data_store(TupleDesc tupdesc)
{
Expand Down
17 changes: 9 additions & 8 deletions src/xpu_common.cu
Original file line number Diff line number Diff line change
Expand Up @@ -989,9 +989,10 @@ __arrow_fetch_decimal_datum(kern_context *kcxt,
int128_t *base = (int128_t *)
((char *)kds + __kds_unpack(cmeta->values_offset));

assert((((uintptr_t)num) & 15) == 0);
kvar->xpu.offset = slot_off;
kvar->xpu.type_code = TypeOpCode__numeric;
set_normalized_numeric(num, base[kds_index],
set_normalized_numeric(num, __Fetch(base + kds_index),
cmeta->attopts.decimal.scale);
*vclass = KVAR_CLASS__XPU_DATUM;
}
Expand Down Expand Up @@ -1532,13 +1533,13 @@ kern_extract_arrow_tuple(kern_context *kcxt,
cmeta->nullmap_offset,
cmeta->nullmap_length))
{
if (__kern_extract_arrow_field(kcxt,
kds,
cmeta,
kds_index,
kvars->var_slot_off,
&kcxt->kvars_slot[slot_id],
&kcxt->kvars_class[slot_id]))
if (!__kern_extract_arrow_field(kcxt,
kds,
cmeta,
kds_index,
kvars->var_slot_off,
&kcxt->kvars_slot[slot_id],
&kcxt->kvars_class[slot_id]))
return -1;
}
else
Expand Down

0 comments on commit bd1be92

Please sign in to comment.