diff --git a/src/arrow_defs.h b/src/arrow_defs.h index b78ca1ae4..ffcb88006 100644 --- a/src/arrow_defs.h +++ b/src/arrow_defs.h @@ -197,6 +197,7 @@ typedef struct ArrowTypeOptions { ArrowTypeTag tag; short unitsz; + unsigned short align; /* alignment of the values */ union { struct { unsigned short bitWidth; diff --git a/src/arrow_fdw.c b/src/arrow_fdw.c index 950f70aa4..f7eeaaa94 100644 --- a/src/arrow_fdw.c +++ b/src/arrow_fdw.c @@ -1297,6 +1297,7 @@ __arrowFieldTypeToPGType(const ArrowField *field, ArrowTypeOptions attopts; memset(&attopts, 0, sizeof(ArrowTypeOptions)); + attopts.align = ALIGNOF_LONG; /* some data types expand the alignment */ switch (t->node.tag) { case ArrowNodeTag__Int: @@ -1369,6 +1370,7 @@ __arrowFieldTypeToPGType(const ArrowField *field, elog(ERROR, "Arrow::Decimal%u is not supported", t->Decimal.bitWidth); attopts.tag = ArrowType__Decimal; attopts.unitsz = sizeof(int128_t); + attopts.align = sizeof(int128_t); attopts.decimal.precision = t->Decimal.precision; attopts.decimal.scale = t->Decimal.scale; attopts.decimal.bitWidth = t->Decimal.bitWidth; @@ -2155,7 +2157,7 @@ typedef struct off_t rb_offset; off_t f_offset; off_t m_offset; - size_t kds_head_sz; + off_t kds_head_sz; int32_t depth; int32_t io_index; strom_io_chunk ioc[FLEXIBLE_ARRAY_MEMBER]; @@ -2163,82 +2165,75 @@ typedef struct static void __setupIOvectorField(arrowFdwSetupIOContext *con, - off_t chunk_offset, - size_t chunk_length, + uint32_t chunk_align, + off_t chunk_offset, + size_t chunk_length, uint32_t *p_cmeta_offset, uint32_t *p_cmeta_length) { off_t f_pos = con->rb_offset + chunk_offset; - size_t __length = MAXALIGN(chunk_length); - - Assert(con->m_offset == MAXALIGN(con->m_offset)); + off_t f_gap; + off_t f_base; + off_t m_offset; + strom_io_chunk *ioc; - if (f_pos == con->f_offset) - { - /* good, buffer is fully continuous */ - *p_cmeta_offset = __kds_packed(con->kds_head_sz + - con->m_offset); - *p_cmeta_length = __kds_packed(__length); + if (chunk_length != MAXALIGN(chunk_length)) + elog(ERROR, "Arrow format corruption? chunk length is not aligned to 64bit"); - con->m_offset += __length; - con->f_offset += __length; - } - else if (f_pos > con->f_offset && - (f_pos & ~PAGE_MASK) == (con->f_offset & ~PAGE_MASK) && - (f_pos - con->f_offset) == MAXALIGN(f_pos - con->f_offset)) + if (f_pos >= con->f_offset && + (f_pos & ~PAGE_MASK) == (con->f_offset & ~PAGE_MASK)) { /* - * we can also consolidate the i/o of two chunks, if file position - * of the next chunk (f_pos) and the current file tail position - * (con->f_offset) locate within the same file page, and if gap bytes - * on the file does not break alignment. + * we can consolidate the two i/o chunks, if file position of the next + * chunk (f_pos) and the current file tail position (con->f_offset) locate + * within the same file page, and gap bytes does not break alignment. */ - size_t __gap = (f_pos - con->f_offset); - - /* put gap bytes */ - Assert(__gap < PAGE_SIZE); - con->m_offset += __gap; - con->f_offset += __gap; - - *p_cmeta_offset = __kds_packed(con->kds_head_sz + - con->m_offset); - *p_cmeta_length = __kds_packed(__length); + f_gap = f_pos - con->f_offset; + m_offset = con->m_offset + f_gap; - con->m_offset += __length; - con->f_offset += __length; + if (m_offset == TYPEALIGN(chunk_align, con->m_offset)) + { + /* put the gap bytes, if any */ + if (f_gap > 0) + { + con->m_offset += f_gap; + con->f_offset += f_gap; + } + *p_cmeta_offset = __kds_packed(con->kds_head_sz + + con->m_offset); + *p_cmeta_length = __kds_packed(chunk_length); + con->m_offset += chunk_length; + con->f_offset += chunk_length; + return; + } } + /* + * Elsewhere, we have to close the current i/o chunk once, then + * restart a new i/o chunk to load the disjoin chunks. + */ + if (con->io_index < 0) + con->io_index = 0; /* no current active i/o chunk */ else { - /* - * Elsewhere, we have no chance to consolidate this chunk to - * the previous i/o-chunk. So, make a new i/o-chunk. - */ - off_t f_base = TYPEALIGN_DOWN(PAGE_SIZE, f_pos); - off_t gap = f_pos - f_base; - strom_io_chunk *ioc; + off_t f_tail = PAGE_ALIGN(con->f_offset); - if (con->io_index < 0) - con->io_index = 0; /* no previous i/o chunks */ - else - { - off_t f_tail = PAGE_ALIGN(con->f_offset); - - ioc = &con->ioc[con->io_index++]; - ioc->nr_pages = f_tail / PAGE_SIZE - ioc->fchunk_id; - con->m_offset += (f_tail - con->f_offset); /* margin for alignment */ - } - Assert(con->m_offset == PAGE_ALIGN(con->m_offset)); - ioc = &con->ioc[con->io_index]; - ioc->m_offset = con->m_offset; - ioc->fchunk_id = f_base / PAGE_SIZE; - - con->m_offset += gap; - *p_cmeta_offset = __kds_packed(con->kds_head_sz + - con->m_offset); - *p_cmeta_length = __kds_packed(__length); - con->m_offset += __length; - con->f_offset = f_pos + __length; + ioc = &con->ioc[con->io_index++]; + ioc->nr_pages = (f_tail / PAGE_SIZE) - ioc->fchunk_id; + con->m_offset += (f_tail - con->f_offset); /* padding bytes */ } + + f_base = PAGE_ALIGN_DOWN(f_pos); + f_gap = f_pos - f_base; + m_offset = TYPEALIGN(chunk_align, con->m_offset + f_gap); + + ioc = &con->ioc[con->io_index]; + ioc->m_offset = m_offset - f_gap; + ioc->fchunk_id = f_base / PAGE_SIZE; + + *p_cmeta_offset = __kds_packed(con->kds_head_sz + m_offset); + *p_cmeta_length = __kds_packed(chunk_length); + con->m_offset = m_offset + chunk_length; + con->f_offset = f_pos + chunk_length; } static void @@ -2253,6 +2248,7 @@ arrowFdwSetupIOvectorField(arrowFdwSetupIOContext *con, { Assert(rb_field->null_count > 0); __setupIOvectorField(con, + sizeof(int64_t), /* 64bit alignment */ rb_field->nullmap_offset, rb_field->nullmap_length, &cmeta->nullmap_offset, @@ -2262,6 +2258,7 @@ arrowFdwSetupIOvectorField(arrowFdwSetupIOContext *con, if (rb_field->values_length > 0) { __setupIOvectorField(con, + rb_field->attopts.align, rb_field->values_offset, rb_field->values_length, &cmeta->values_offset, @@ -2271,6 +2268,7 @@ arrowFdwSetupIOvectorField(arrowFdwSetupIOContext *con, if (rb_field->extra_length > 0) { __setupIOvectorField(con, + sizeof(int64_t), /* 64bit alignment */ rb_field->extra_offset, rb_field->extra_length, &cmeta->extra_offset, @@ -2316,7 +2314,7 @@ arrowFdwSetupIOvector(RecordBatchState *rb_state, con->rb_offset = rb_state->rb_offset; con->f_offset = ~0UL; /* invalid offset */ con->m_offset = 0; - con->kds_head_sz = TYPEALIGN(64, KDS_HEAD_LENGTH(kds)); + con->kds_head_sz = KDS_HEAD_LENGTH(kds); con->depth = 0; con->io_index = -1; /* invalid index */ for (int j=0; j < kds->ncols; j++) @@ -2336,8 +2334,7 @@ arrowFdwSetupIOvector(RecordBatchState *rb_state, /* close the last I/O chunks */ strom_io_chunk *ioc = &con->ioc[con->io_index++]; - ioc->nr_pages = (TYPEALIGN(PAGE_SIZE, con->f_offset) / PAGE_SIZE - - ioc->fchunk_id); + ioc->nr_pages = (PAGE_ALIGN(con->f_offset) / PAGE_SIZE - ioc->fchunk_id); con->m_offset += PAGE_SIZE * ioc->nr_pages; nr_chunks = con->io_index; } diff --git a/src/pg_strom.h b/src/pg_strom.h index a22c60282..3a03cfb8f 100644 --- a/src/pg_strom.h +++ b/src/pg_strom.h @@ -449,6 +449,7 @@ extern int PAGE_SHIFT; extern long PHYS_PAGES; extern long PAGES_PER_BLOCK; /* (BLCKSZ / PAGE_SIZE) */ #define PAGE_ALIGN(x) TYPEALIGN(PAGE_SIZE,(x)) +#define PAGE_ALIGN_DOWN(x) TYPEALIGN_DOWN(PAGE_SIZE,(x)) #define PGSTROM_CHUNK_SIZE ((size_t)(65534UL << 10)) /* diff --git a/src/xpu_common.cu b/src/xpu_common.cu index e9b52a508..8d576ecf9 100644 --- a/src/xpu_common.cu +++ b/src/xpu_common.cu @@ -989,10 +989,10 @@ __arrow_fetch_decimal_datum(kern_context *kcxt, int128_t *base = (int128_t *) ((char *)kds + __kds_unpack(cmeta->values_offset)); - assert((((uintptr_t)num) & 15) == 0); + assert((((uintptr_t)base) & (sizeof(int128_t)-1)) == 0); kvar->xpu.offset = slot_off; kvar->xpu.type_code = TypeOpCode__numeric; - set_normalized_numeric(num, __Fetch(base + kds_index), + set_normalized_numeric(num, base[kds_index], cmeta->attopts.decimal.scale); *vclass = KVAR_CLASS__XPU_DATUM; }