Skip to content

Commit

Permalink
adjust alignment when loading arrow files for types wider than 64bit
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed May 13, 2023
1 parent bd1be92 commit 721ed2b
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 67 deletions.
1 change: 1 addition & 0 deletions src/arrow_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ typedef struct ArrowTypeOptions
{
ArrowTypeTag tag;
short unitsz;
unsigned short align; /* alignment of the values */
union {
struct {
unsigned short bitWidth;
Expand Down
127 changes: 62 additions & 65 deletions src/arrow_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,7 @@ __arrowFieldTypeToPGType(const ArrowField *field,
ArrowTypeOptions attopts;

memset(&attopts, 0, sizeof(ArrowTypeOptions));
attopts.align = ALIGNOF_LONG; /* some data types expand the alignment */
switch (t->node.tag)
{
case ArrowNodeTag__Int:
Expand Down Expand Up @@ -1369,6 +1370,7 @@ __arrowFieldTypeToPGType(const ArrowField *field,
elog(ERROR, "Arrow::Decimal%u is not supported", t->Decimal.bitWidth);
attopts.tag = ArrowType__Decimal;
attopts.unitsz = sizeof(int128_t);
attopts.align = sizeof(int128_t);
attopts.decimal.precision = t->Decimal.precision;
attopts.decimal.scale = t->Decimal.scale;
attopts.decimal.bitWidth = t->Decimal.bitWidth;
Expand Down Expand Up @@ -2155,90 +2157,83 @@ typedef struct
off_t rb_offset;
off_t f_offset;
off_t m_offset;
size_t kds_head_sz;
off_t kds_head_sz;
int32_t depth;
int32_t io_index;
strom_io_chunk ioc[FLEXIBLE_ARRAY_MEMBER];
} arrowFdwSetupIOContext;

/*
 * __setupIOvectorField
 *
 * Appends one buffer of an Arrow record-batch field (nullmap, values or
 * extra buffer) onto the i/o vector under construction.
 *
 * If the file position of this buffer (f_pos) follows the current file
 * tail (con->f_offset) within the same file page, and the gap bytes keep
 * the requested alignment of the destination buffer, the buffer is merged
 * into the current i/o chunk. Otherwise, the current i/o chunk is closed
 * and a new one is opened on the page boundary that covers the buffer.
 *
 * con            - state of the i/o vector under construction
 * chunk_align    - required alignment of the buffer on the destination
 *                  (e.g. sizeof(int128_t) for Arrow::Decimal values;
 *                  callers pass sizeof(int64_t) for nullmap/extra buffers)
 * chunk_offset   - offset of the buffer from the record-batch head
 * chunk_length   - length of the buffer on the file
 * p_cmeta_offset - receives the packed destination offset of the buffer
 * p_cmeta_length - receives the packed destination length of the buffer
 */
static void
__setupIOvectorField(arrowFdwSetupIOContext *con,
					 uint32_t chunk_align,
					 off_t chunk_offset,
					 size_t chunk_length,
					 uint32_t *p_cmeta_offset,
					 uint32_t *p_cmeta_length)
{
	off_t		f_pos = con->rb_offset + chunk_offset;
	off_t		f_gap;
	off_t		f_base;
	off_t		m_offset;
	strom_io_chunk *ioc;

	/* Arrow buffers should have 64bit aligned length on the file */
	if (chunk_length != MAXALIGN(chunk_length))
		elog(ERROR, "Arrow format corruption? chunk length is not aligned to 64bit");

	if (f_pos >= con->f_offset &&
		(f_pos & ~PAGE_MASK) == (con->f_offset & ~PAGE_MASK))
	{
		/*
		 * we can consolidate the two i/o chunks, if file position of the next
		 * chunk (f_pos) and the current file tail position (con->f_offset)
		 * locate within the same file page, and the gap bytes do not break
		 * the alignment requirement.
		 */
		f_gap = f_pos - con->f_offset;
		m_offset = con->m_offset + f_gap;

		/*
		 * NOTE(review): this accepts the gap only when it lands exactly on
		 * the next chunk_align boundary from the current tail; a larger but
		 * still aligned gap falls through to open a new i/o chunk — confirm
		 * this conservative behavior is intended.
		 */
		if (m_offset == TYPEALIGN(chunk_align, con->m_offset))
		{
			/* put the gap bytes, if any */
			if (f_gap > 0)
			{
				con->m_offset += f_gap;
				con->f_offset += f_gap;
			}
			*p_cmeta_offset = __kds_packed(con->kds_head_sz +
										   con->m_offset);
			*p_cmeta_length = __kds_packed(chunk_length);
			con->m_offset += chunk_length;
			con->f_offset += chunk_length;
			return;
		}
	}
	/*
	 * Elsewhere, we have to close the current i/o chunk once, then
	 * restart a new i/o chunk to load the disjoin chunks.
	 */
	if (con->io_index < 0)
		con->io_index = 0;		/* no current active i/o chunk */
	else
	{
		off_t	f_tail = PAGE_ALIGN(con->f_offset);

		ioc = &con->ioc[con->io_index++];
		ioc->nr_pages = (f_tail / PAGE_SIZE) - ioc->fchunk_id;
		con->m_offset += (f_tail - con->f_offset);	/* padding bytes */
	}

	f_base = PAGE_ALIGN_DOWN(f_pos);
	f_gap = f_pos - f_base;
	/* destination offset that keeps the requested alignment */
	m_offset = TYPEALIGN(chunk_align, con->m_offset + f_gap);

	ioc = &con->ioc[con->io_index];
	ioc->m_offset = m_offset - f_gap;	/* chunk begins on the page head */
	ioc->fchunk_id = f_base / PAGE_SIZE;

	*p_cmeta_offset = __kds_packed(con->kds_head_sz + m_offset);
	*p_cmeta_length = __kds_packed(chunk_length);
	con->m_offset = m_offset + chunk_length;
	con->f_offset = f_pos + chunk_length;
}

static void
Expand All @@ -2253,6 +2248,7 @@ arrowFdwSetupIOvectorField(arrowFdwSetupIOContext *con,
{
Assert(rb_field->null_count > 0);
__setupIOvectorField(con,
sizeof(int64_t), /* 64bit alignment */
rb_field->nullmap_offset,
rb_field->nullmap_length,
&cmeta->nullmap_offset,
Expand All @@ -2262,6 +2258,7 @@ arrowFdwSetupIOvectorField(arrowFdwSetupIOContext *con,
if (rb_field->values_length > 0)
{
__setupIOvectorField(con,
rb_field->attopts.align,
rb_field->values_offset,
rb_field->values_length,
&cmeta->values_offset,
Expand All @@ -2271,6 +2268,7 @@ arrowFdwSetupIOvectorField(arrowFdwSetupIOContext *con,
if (rb_field->extra_length > 0)
{
__setupIOvectorField(con,
sizeof(int64_t), /* 64bit alignment */
rb_field->extra_offset,
rb_field->extra_length,
&cmeta->extra_offset,
Expand Down Expand Up @@ -2316,7 +2314,7 @@ arrowFdwSetupIOvector(RecordBatchState *rb_state,
con->rb_offset = rb_state->rb_offset;
con->f_offset = ~0UL; /* invalid offset */
con->m_offset = 0;
con->kds_head_sz = TYPEALIGN(64, KDS_HEAD_LENGTH(kds));
con->kds_head_sz = KDS_HEAD_LENGTH(kds);
con->depth = 0;
con->io_index = -1; /* invalid index */
for (int j=0; j < kds->ncols; j++)
Expand All @@ -2336,8 +2334,7 @@ arrowFdwSetupIOvector(RecordBatchState *rb_state,
/* close the last I/O chunks */
strom_io_chunk *ioc = &con->ioc[con->io_index++];

ioc->nr_pages = (TYPEALIGN(PAGE_SIZE, con->f_offset) / PAGE_SIZE -
ioc->fchunk_id);
ioc->nr_pages = (PAGE_ALIGN(con->f_offset) / PAGE_SIZE - ioc->fchunk_id);
con->m_offset += PAGE_SIZE * ioc->nr_pages;
nr_chunks = con->io_index;
}
Expand Down
1 change: 1 addition & 0 deletions src/pg_strom.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ extern int PAGE_SHIFT;
extern long PHYS_PAGES;
extern long PAGES_PER_BLOCK; /* (BLCKSZ / PAGE_SIZE) */
#define PAGE_ALIGN(x) TYPEALIGN(PAGE_SIZE,(x))
#define PAGE_ALIGN_DOWN(x) TYPEALIGN_DOWN(PAGE_SIZE,(x))
#define PGSTROM_CHUNK_SIZE ((size_t)(65534UL << 10))

/*
Expand Down
4 changes: 2 additions & 2 deletions src/xpu_common.cu
Original file line number Diff line number Diff line change
Expand Up @@ -989,10 +989,10 @@ __arrow_fetch_decimal_datum(kern_context *kcxt,
int128_t *base = (int128_t *)
((char *)kds + __kds_unpack(cmeta->values_offset));

assert((((uintptr_t)num) & 15) == 0);
assert((((uintptr_t)base) & (sizeof(int128_t)-1)) == 0);
kvar->xpu.offset = slot_off;
kvar->xpu.type_code = TypeOpCode__numeric;
set_normalized_numeric(num, __Fetch(base + kds_index),
set_normalized_numeric(num, base[kds_index],
cmeta->attopts.decimal.scale);
*vclass = KVAR_CLASS__XPU_DATUM;
}
Expand Down

0 comments on commit 721ed2b

Please sign in to comment.