From b3b57638811f99de1f885094a0b9ffea98959cd9 Mon Sep 17 00:00:00 2001 From: KaiGai Kohei Date: Sun, 31 Mar 2024 02:08:06 +0900 Subject: [PATCH] add parallel support in pg2arrow issue #723 --- arrow-tools/Makefile | 2 +- arrow-tools/arrow_pgsql.c | 286 +++++++++++++++++++++++++++++++- arrow-tools/pgsql_client.c | 4 +- arrow-tools/sql2arrow.c | 322 ++++++++++++++++++++++++++++++------- arrow-tools/sql2arrow.h | 1 + src/arrow_ipc.h | 29 +++- src/arrow_write.c | 205 ++++++++++++++++++++--- 7 files changed, 765 insertions(+), 84 deletions(-) diff --git a/arrow-tools/Makefile b/arrow-tools/Makefile index 5437dee84..33ed0c366 100644 --- a/arrow-tools/Makefile +++ b/arrow-tools/Makefile @@ -47,7 +47,7 @@ all: $(ALL_PROGS) # ifeq ($(HAS_PG_CONFIG),yes) pg2arrow: $(PG2ARROW_OBJS) - $(CC) -o $@ $(PG2ARROW_OBJS) -lpq \ + $(CC) -o $@ $(PG2ARROW_OBJS) -lpq -lpthread \ $(shell $(PG_CONFIG) --ldflags) \ -L $(shell $(PG_CONFIG) --libdir) diff --git a/arrow-tools/arrow_pgsql.c b/arrow-tools/arrow_pgsql.c index e96bc1394..39e3f3709 100644 --- a/arrow-tools/arrow_pgsql.c +++ b/arrow-tools/arrow_pgsql.c @@ -201,13 +201,35 @@ put_bool_value(SQLfield *column, const char *addr, int sz) value = *((const int8_t *)addr); sql_buffer_setbit(&column->nullmap, row_index); if (value) - sql_buffer_setbit(&column->values, row_index); + sql_buffer_setbit(&column->values, row_index); else - sql_buffer_clrbit(&column->values, row_index); + sql_buffer_clrbit(&column->values, row_index); } return __buffer_usage_inline_type(column); } +static size_t +move_bool_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + size_t dindex = dest->nitems++; + + if (!sql_buffer_getbit(&src->nullmap, sindex)) + { + dest->nullcount++; + sql_buffer_clrbit(&dest->nullmap, dindex); + sql_buffer_clrbit(&dest->values, dindex); + } + else + { + sql_buffer_setbit(&dest->nullmap, dindex); + if (sql_buffer_getbit(&src->values, sindex)) + sql_buffer_setbit(&dest->values, dindex); + else + sql_buffer_clrbit(&dest->values, dindex); + } + return __buffer_usage_inline_type(dest); +} + /* * utility function to set NULL value */ @@ -321,7 +343,6 @@ put_int32_value(SQLfield *column, const char *addr, int sz) value = __fetch_32bit(addr); sql_buffer_setbit(&column->nullmap, row_index); sql_buffer_append(&column->values, &value, sz); - STAT_UPDATES(column,i32,value); } return __buffer_usage_inline_type(column); @@ -425,7 +446,6 @@ write_float16_stat(SQLfield *attr, char *buf, size_t len, return snprintf(buf, len, "%u", (uint32_t)ival); } - static size_t put_float32_value(SQLfield *column, const char *addr, int sz) { @@ -620,6 +640,58 @@ put_decimal_value(SQLfield *column, const char *addr, int sz) return __buffer_usage_inline_type(column); } +#define MOVE_SCALAR_TEMPLATE(NAME,VALUE_TYPE,STAT_NAME) \ + static size_t \ + move_##NAME##_value(SQLfield *dest, const SQLfield *src, long sindex) \ + { \ + size_t dindex = dest->nitems++; \ + \ + if (!sql_buffer_getbit(&src->nullmap, sindex)) \ + __put_inline_null_value(dest, dindex, sizeof(VALUE_TYPE)); \ + else \ + { \ + VALUE_TYPE value; \ + \ + value = ((VALUE_TYPE *)src->values.data)[sindex]; \ + sql_buffer_setbit(&dest->nullmap, dindex); \ + sql_buffer_append(&dest->values, &value, \ + sizeof(VALUE_TYPE)); \ + STAT_UPDATES(dest,STAT_NAME,value); \ + } \ + return __buffer_usage_inline_type(dest); \ + } +MOVE_SCALAR_TEMPLATE(int8, int8_t, i8) +MOVE_SCALAR_TEMPLATE(uint8, uint8_t, u8) +MOVE_SCALAR_TEMPLATE(int16, int32_t, i32) +MOVE_SCALAR_TEMPLATE(uint16, uint32_t, u32) +MOVE_SCALAR_TEMPLATE(int32, int32_t, i32) +MOVE_SCALAR_TEMPLATE(uint32, uint32_t, u32) +MOVE_SCALAR_TEMPLATE(int64, int64_t, i64) +MOVE_SCALAR_TEMPLATE(uint64, uint64_t, u64) +static size_t +move_float16_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + size_t dindex = dest->nitems++; + + if (!sql_buffer_getbit(&src->nullmap, sindex)) + __put_inline_null_value(dest, dindex, sizeof(float2_t)); + else + { + float2_t value; + float4_t fval; + + value = ((float2_t *)src->values.data)[sindex]; + sql_buffer_setbit(&dest->nullmap, dindex); + sql_buffer_append(&dest->values, &value, sizeof(float2_t)); + fval = fp16_to_fp32(value); + STAT_UPDATES(dest, f32, fval); + } + return __buffer_usage_inline_type(dest); +} +MOVE_SCALAR_TEMPLATE(float32, float4_t, f32) +MOVE_SCALAR_TEMPLATE(float64, float8_t, f64) +MOVE_SCALAR_TEMPLATE(decimal, int128_t, i128) + /* * Date */ @@ -688,6 +760,24 @@ put_date_value(SQLfield *column, const char *addr, int sz) return column->put_value(column, addr, sz); } + +static size_t +move_date_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + assert(src->arrow_type.Date.unit == dest->arrow_type.Date.unit); + switch (src->arrow_type.Date.unit) + { + case ArrowDateUnit__Day: + return move_int32_value(dest, src, sindex); + case ArrowDateUnit__MilliSecond: + return move_int64_value(dest, src, sindex); + default: + break; + } + Elog("ArrowTypeDate has unknown unit (%d)", + src->arrow_type.Date.unit); +} + /* * Time */ @@ -813,6 +903,25 @@ put_time_value(SQLfield *column, const char *addr, int sz) return column->put_value(column, addr, sz); } +static size_t +move_time_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + assert(src->arrow_type.Time.unit == dest->arrow_type.Time.unit); + switch (src->arrow_type.Time.unit) + { + case ArrowTimeUnit__Second: + case ArrowTimeUnit__MilliSecond: + return move_int32_value(dest, src, sindex); + case ArrowTimeUnit__MicroSecond: + case ArrowTimeUnit__NanoSecond: + return move_int64_value(dest, src, sindex); + default: + break; + } + Elog("ArrowTypeTime has unknown unit (%d)", + src->arrow_type.Time.unit); +} + /* * Timestamp */ @@ -939,6 +1048,24 @@ put_timestamp_value(SQLfield *column, const char *addr, int sz) return column->put_value(column, addr, sz); } +static size_t +move_timestamp_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + assert(src->arrow_type.Timestamp.unit == dest->arrow_type.Timestamp.unit); + switch (src->arrow_type.Timestamp.unit) + { + case ArrowTimeUnit__Second: + case ArrowTimeUnit__MilliSecond: + case ArrowTimeUnit__MicroSecond: + case ArrowTimeUnit__NanoSecond: + return move_int64_value(dest, src, sindex); + default: + break; + } + Elog("ArrowTypeTimestamp has unknown unit (%d)", + dest->arrow_type.Timestamp.unit); +} + /* * Interval */ @@ -1005,7 +1132,7 @@ put_interval_value(SQLfield *sql_field, const char *addr, int sz) sql_field->put_value = __put_interval_day_time_value; break; default: - Elog("columnibute \"%s\" has unknown Arrow::Interval.unit(%d)", + Elog("column attribute \"%s\" has unknown Arrow::Interval.unit(%d)", sql_field->field_name, sql_field->arrow_type.Interval.unit); break; @@ -1013,6 +1140,23 @@ put_interval_value(SQLfield *sql_field, const char *addr, int sz) return sql_field->put_value(sql_field, addr, sz); } +static size_t +move_interval_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + assert(src->arrow_type.Interval.unit == dest->arrow_type.Interval.unit); + switch (src->arrow_type.Interval.unit) + { + case ArrowIntervalUnit__Year_Month: + return move_uint32_value(dest, src, sindex); + case ArrowIntervalUnit__Day_Time: + return move_uint64_value(dest, src, sindex); + default: + break; + } + Elog("Arrow::Interval.unit is unknown (%d)", + src->arrow_type.Interval.unit); +} + /* * Utf8, Binary */ @@ -1041,6 +1185,25 @@ put_variable_value(SQLfield *column, return __buffer_usage_varlena_type(column); } + +static size_t +move_variable_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + const char *addr = NULL; + int sz = 0; + + if (sql_buffer_getbit(&src->nullmap, sindex)) + { + uint32_t head = ((uint32_t *)src->values.data)[sindex]; + uint32_t tail = ((uint32_t *)src->values.data)[sindex+1]; + + assert(head <= tail && tail <= src->extra.usage); + addr = src->extra.data + head; + sz = tail - head; + } + return put_variable_value(dest, addr, sz); +} + /* * FixedSizeBinary */ @@ -1069,6 +1232,19 @@ put_bpchar_value(SQLfield *column, return __buffer_usage_inline_type(column); } +static size_t +move_bpchar_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + const char *addr = NULL; + int unitsz = src->arrow_type.FixedSizeBinary.byteWidth; + + if (sql_buffer_getbit(&src->nullmap, sindex)) + { + addr = src->values.data + unitsz * sindex; + } + return put_bpchar_value(dest, addr, unitsz); +} + /* * List:: type */ @@ -1182,6 +1358,37 @@ put_array_value(SQLfield *column, return __buffer_usage_inline_type(column) + element->__curr_usage__; } +static size_t +move_array_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + SQLfield *d_elem = dest->element; + long dindex = dest->nitems++; + + if (!sql_buffer_getbit(&src->nullmap, sindex)) + { + /* add NULL */ + dest->nullcount++; + sql_buffer_clrbit(&dest->nullmap, sindex); + sql_buffer_append(&dest->values, &d_elem->nitems, sizeof(int32_t)); + } + else + { + SQLfield *s_elem = src->element; + uint32_t head = ((uint32_t *)src->values.data)[sindex]; + uint32_t tail = ((uint32_t *)src->values.data)[sindex+1]; + uint32_t curr; + + assert(head <= tail); + assert(IsSQLfieldCompatible(d_elem, s_elem)); + for (curr = head; curr < tail; curr++) + sql_field_move_value(d_elem, s_elem, curr); + + sql_buffer_setbit(&dest->nullmap, dindex); + sql_buffer_append(&dest->values, &d_elem->nitems, sizeof(int32_t)); + } + return __buffer_usage_inline_type(dest) + d_elem->__curr_usage__; +} + /* * Arrow::Struct */ @@ -1301,6 +1508,38 @@ put_composite_value(SQLfield *column, return usage; } +static size_t +move_composite_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + long dindex = dest->nitems++; + size_t usage = 0; + + if (!sql_buffer_getbit(&src->nullmap, sindex)) + { + dest->nullcount++; + sql_buffer_clrbit(&dest->nullmap, dindex); + for (int j=0; j < dest->nfields; j++) + { + usage += sql_field_put_value(&dest->subfields[j], NULL, 0); + } + } + else + { + for (int j=0; j < dest->nfields; j++) + { + usage += sql_field_move_value(&dest->subfields[j], + &src->subfields[j], sindex); + } + sql_buffer_setbit(&dest->nullmap, dindex); + } + if (dest->nullcount > 0) + usage += ARROWALIGN(dest->nullmap.usage); + return usage; +} + +/* + * Enum values + */ static size_t put_dictionary_value(SQLfield *column, const char *addr, int sz) @@ -1317,7 +1556,7 @@ put_dictionary_value(SQLfield *column, { SQLdictionary *enumdict = column->enumdict; hashItem *hitem; - uint32_t hash; + uint32_t hash; hash = hash_any((const unsigned char *)addr, sz); for (hitem = enumdict->hslots[hash % enumdict->nslots]; @@ -1337,6 +1576,20 @@ put_dictionary_value(SQLfield *column, return __buffer_usage_inline_type(column); } +static size_t +move_dictionary_value(SQLfield *dest, const SQLfield *src, long sindex) +{ + if (!sql_buffer_getbit(&src->nullmap, sindex)) + return put_dictionary_value(dest, NULL, 0); + if (dest->enumdict == src->enumdict) + { + uint32_t enum_id = ((uint32_t *)src->values.data)[sindex]; + + return put_uint32_value(dest, (char *)&enum_id, sizeof(uint32_t)); + } + Elog("Different Enum dictionary is not compatible"); +} + /* * put_value handler for contrib/cube module */ @@ -1396,21 +1649,25 @@ assignArrowTypeInt(SQLfield *column, bool is_signed, case sizeof(char): column->arrow_type.Int.bitWidth = 8; column->put_value = (is_signed ? put_int8_value : put_uint8_value); + column->move_value = (is_signed ? move_int8_value : move_uint8_value); column->write_stat = write_int8_stat; break; case sizeof(short): column->arrow_type.Int.bitWidth = 16; column->put_value = (is_signed ? put_int16_value : put_uint16_value); + column->move_value = (is_signed ? move_int16_value : move_uint16_value); column->write_stat = write_int16_stat; break; case sizeof(int): column->arrow_type.Int.bitWidth = 32; column->put_value = (is_signed ? put_int32_value : put_uint32_value); + column->move_value = (is_signed ? move_int32_value : move_uint32_value); column->write_stat = write_int32_stat; break; case sizeof(long): column->arrow_type.Int.bitWidth = 64; column->put_value = (is_signed ? put_int64_value : put_uint64_value); + column->move_value = (is_signed ? move_int64_value : move_uint64_value); column->write_stat = write_int64_stat; break; default: @@ -1441,18 +1698,21 @@ assignArrowTypeFloatingPoint(SQLfield *column, ArrowField *arrow_field) column->arrow_type.FloatingPoint.precision = ArrowPrecision__Half; column->put_value = put_float16_value; + column->move_value = move_float16_value; column->write_stat = write_float16_stat; break; case sizeof(float): column->arrow_type.FloatingPoint.precision = ArrowPrecision__Single; column->put_value = put_float32_value; + column->move_value = move_float32_value; column->write_stat = write_int32_stat; break; case sizeof(double): column->arrow_type.FloatingPoint.precision = ArrowPrecision__Double; column->put_value = put_float64_value; + column->move_value = move_float64_value; column->write_stat = write_int64_stat; break; default: @@ -1480,6 +1740,7 @@ assignArrowTypeBinary(SQLfield *column, ArrowField *arrow_field) Elog("attribute '%s' is not compatible", column->field_name); initArrowNode(&column->arrow_type, Binary); column->put_value = put_variable_value; + column->move_value = move_variable_value; return 3; /* nullmap + index + extra */ } @@ -1491,6 +1752,7 @@ assignArrowTypeUtf8(SQLfield *column, ArrowField *arrow_field) Elog("attribute '%s' is not compatible", column->field_name); initArrowNode(&column->arrow_type, Utf8); column->put_value = put_variable_value; + column->move_value = move_variable_value; return 3; /* nullmap + index + extra */ } @@ -1511,7 +1773,7 @@ assignArrowTypeBpchar(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, FixedSizeBinary); column->arrow_type.FixedSizeBinary.byteWidth = byteWidth; column->put_value = put_bpchar_value; - + column->move_value = move_bpchar_value; return 2; /* nullmap + values */ } @@ -1524,6 +1786,7 @@ assignArrowTypeBool(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, Bool); column->put_value = put_bool_value; + column->move_value = move_bool_value; return 2; /* nullmap + values */ } @@ -1553,6 +1816,7 @@ assignArrowTypeDecimal(SQLfield *column, ArrowField *arrow_field) column->arrow_type.Decimal.scale = scale; column->arrow_type.Decimal.bitWidth = 128; column->put_value = put_decimal_value; + column->move_value = move_decimal_value; column->write_stat = write_int128_stat; return 2; /* nullmap + values */ @@ -1572,6 +1836,7 @@ assignArrowTypeDate(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, Date); column->arrow_type.Date.unit = unit; column->put_value = put_date_value; + column->move_value = move_date_value; column->write_stat = write_null_stat; return 2; /* nullmap + values */ @@ -1592,6 +1857,7 @@ assignArrowTypeTime(SQLfield *column, ArrowField *arrow_field) column->arrow_type.Time.unit = unit; column->arrow_type.Time.bitWidth = 64; column->put_value = put_time_value; + column->move_value = move_time_value; column->write_stat = write_null_stat; return 2; /* nullmap + values */ @@ -1617,6 +1883,7 @@ assignArrowTypeTimestamp(SQLfield *column, const char *tz_name, column->arrow_type.Timestamp._timezone_len = strlen(tz_name); } column->put_value = put_timestamp_value; + column->move_value = move_timestamp_value; column->write_stat = write_null_stat; return 2; /* nullmap + values */ @@ -1636,6 +1903,7 @@ assignArrowTypeInterval(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, Interval); column->arrow_type.Interval.unit = unit; column->put_value = put_interval_value; + column->move_value = move_interval_value; return 2; /* nullmap + values */ } @@ -1649,6 +1917,7 @@ assignArrowTypeList(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, List); column->put_value = put_array_value; + column->move_value = move_array_value; return 2; /* nullmap + offset vector */ } @@ -1662,6 +1931,7 @@ assignArrowTypeStruct(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, Struct); column->put_value = put_composite_value; + column->move_value = move_composite_value; return 1; /* only nullmap */ } @@ -1686,6 +1956,7 @@ assignArrowTypeDictionary(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, Utf8); column->put_value = put_dictionary_value; + column->move_value = move_dictionary_value; return 2; /* nullmap + values */ } @@ -1699,6 +1970,7 @@ assignArrowTypeExtraCube(SQLfield *column, ArrowField *arrow_field) initArrowNode(&column->arrow_type, Binary); column->put_value = put_extra_cube_value; + column->move_value = move_variable_value; return 3; /* nullmap + index + extra */ } diff --git a/arrow-tools/pgsql_client.c b/arrow-tools/pgsql_client.c index 42eb3efc1..64ab67bc7 100644 --- a/arrow-tools/pgsql_client.c +++ b/arrow-tools/pgsql_client.c @@ -1029,7 +1029,9 @@ sqldb_begin_query(void *sqldb_state, /* move to the first tuple(-set) */ if (!pgsql_move_next(pgstate, NULL)) return NULL; - return pgsql_create_buffer(pgstate, af_info, dictionary_list); + return pgsql_create_buffer(pgstate, + af_info, + dictionary_list); } /* diff --git a/arrow-tools/sql2arrow.c b/arrow-tools/sql2arrow.c index 7cb0a2ac1..c13414a2e 100644 --- a/arrow-tools/sql2arrow.c +++ b/arrow-tools/sql2arrow.c @@ -35,6 +35,16 @@ static int shows_progress = 0; static userConfigOption *sqldb_session_configs = NULL; static nestLoopOption *sqldb_nestloop_options = NULL; +/* + * Per-worker state variables + */ +static volatile bool worker_setup_done = false; +static pthread_mutex_t worker_setup_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t worker_setup_cond = PTHREAD_COND_INITIALIZER; +static pthread_t *worker_threads; +static SQLtable **worker_tables; +static pthread_mutex_t main_table_mutex = PTHREAD_MUTEX_INITIALIZER; + /* * __trim */ @@ -329,37 +339,6 @@ setup_output_file(SQLtable *table, const char *output_filename) writeArrowSchema(table); } -static void -shows_record_batch_progress(SQLtable *table, size_t nitems) -{ - if (shows_progress) - { - ArrowBlock *block; - int index = table->numRecordBatches - 1; - time_t tv = time(NULL); - struct tm tm; - - localtime_r(&tv, &tm); - assert(index >= 0); - block = &table->recordBatches[index]; - printf("%04d-%02d-%02d %02d:%02d:%02d " - "RecordBatch[%d]: " - "offset=%lu length=%lu (meta=%u, body=%lu) nitems=%zu\n", - tm.tm_year + 1900, - tm.tm_mon + 1, - tm.tm_mday, - tm.tm_hour, - tm.tm_min, - tm.tm_sec, - index, - block->offset, - block->metaDataLength + block->bodyLength, - block->metaDataLength, - block->bodyLength, - nitems); - } -} - static int dumpArrowFile(const char *filename) { @@ -936,6 +915,221 @@ parse_options(int argc, char * const argv[]) batch_segment_sz = (1UL << 28); /* 256MB in default */ } +/* + * sqldb_command_apply_worker_id + */ +const char * +sqldb_command_apply_worker_id(const char *command, int worker_id) +{ + const char *src = command; + char *buf = palloc(strlen(command) + 100); + char *dst = buf; + int c; + + while ((c = *src++) != '\0') + { + if (c == '$') + { + if (strncmp(src, "(WORKER_ID)", 11) == 0) + { + dst += sprintf(dst, "%d", worker_id); + src += 11; + continue; + } + if (strncmp(src, "(N_WORKERS)", 11) == 0) + { + dst += sprintf(dst, "%d", num_worker_threads); + src += 11; + continue; + } + } + *dst++ = c; + } + *dst = '\0'; + + return buf; +} + +/* + * sql_table_merge_one_row + */ +static void +mergeArrowChunkOneRow(SQLtable *dst_table, + SQLtable *src_table, size_t src_index) +{ + size_t usage = 0; + + for (int j=0; j < src_table->nfields; j++) + { + usage += sql_field_move_value(&dst_table->columns[j], + &src_table->columns[j], src_index); + } + dst_table->nitems++; + dst_table->usage = usage; +} + +/* + * shows_record_batch_progress + */ +static void +shows_record_batch_progress(const ArrowBlock *block, + int rb_index, size_t nitems, + int worker_id) +{ + time_t tv = time(NULL); + struct tm tm; + char namebuf[100]; + + if (num_worker_threads == 1) + namebuf[0] = '\0'; + else + sprintf(namebuf, " by worker:%d", worker_id); + + localtime_r(&tv, &tm); + printf("%04d-%02d-%02d %02d:%02d:%02d " + "RecordBatch[%d]: " + "offset=%lu length=%lu (meta=%u, body=%lu) nitems=%zu%s\n", + tm.tm_year + 1900, + tm.tm_mon + 1, + tm.tm_mday, + tm.tm_hour, + tm.tm_min, + tm.tm_sec, + rb_index, + block->offset, + block->metaDataLength + block->bodyLength, + block->metaDataLength, + block->bodyLength, + nitems, + namebuf); +} + +/* + * execute_sql2arrow + */ +static void +sql2arrow_common(void *sqldb_conn, uint32_t worker_id) +{ + SQLtable *main_table = worker_tables[0]; + SQLtable *data_table = worker_tables[worker_id]; + ArrowBlock __block; + int __rb_index; + + /* fetch results and write record batches */ + while (sqldb_fetch_results(sqldb_conn, data_table)) + { + if (data_table->usage >= batch_segment_sz) + { + __rb_index = writeArrowRecordBatchMT(main_table, + data_table, + &main_table_mutex, + &__block); + if (shows_progress) + shows_record_batch_progress(&__block, + __rb_index, + data_table->nitems, + worker_id); + sql_table_clear(data_table); + } + } + /* wait and merge results */ + for (uint32_t k=1; (worker_id & k) == 0; k <<= 1) + { + uint32_t buddy = (worker_id | k); + SQLtable *buddy_table; + + if (buddy >= num_worker_threads) + break; + if ((errno = pthread_join(worker_threads[buddy], NULL)) != 0) + Elog("failed on pthread_join[%u]: %m", buddy); + + buddy_table = worker_tables[buddy]; + for (size_t i=0; i < buddy_table->nitems; i++) + { + /* merge one row */ + mergeArrowChunkOneRow(data_table, buddy_table, i); + /* write out buffer */ + if (data_table->usage >= batch_segment_sz) + { + __rb_index = writeArrowRecordBatchMT(main_table, + data_table, + &main_table_mutex, + &__block); + if (shows_progress) + shows_record_batch_progress(&__block, + __rb_index, + data_table->nitems, + worker_id); + sql_table_clear(data_table); + } + } + if (shows_progress && buddy_table->nitems > 0) + printf("worker:%u merged pending results by worker:%u\n", + worker_id, buddy); + } +} + +/* + * worker_main + */ +static void * +worker_main(void *__worker_id) +{ + uintptr_t worker_id = (uintptr_t)__worker_id; + SQLtable *main_table; + SQLtable *data_table; + void *sqldb_conn; + const char *worker_command; + + /* wait for the initial setup */ + pthread_mutex_lock(&worker_setup_mutex); + while (!worker_setup_done) + { + pthread_cond_wait(&worker_setup_cond, + &worker_setup_mutex); + } + pthread_mutex_unlock(&worker_setup_mutex); + + /* + * OK, now worker:0 already has started. + */ + main_table = worker_tables[0]; + sqldb_conn = sqldb_server_connect(sqldb_hostname, + sqldb_port_num, + sqldb_username, + sqldb_password, + sqldb_database, + sqldb_session_configs, + sqldb_nestloop_options); + worker_command = sqldb_command_apply_worker_id(sqldb_command, worker_id); + data_table = sqldb_begin_query(sqldb_conn, + worker_command, + NULL, + main_table->sql_dict_list); + if (!data_table) + Elog("Empty results by the query: %s", sqldb_command); + data_table->segment_sz = batch_segment_sz; + /* enables embedded min/max statistics, if any */ + enable_embedded_stats(data_table); + /* print SQL */ + if (shows_progress) + printf("worker:%lu SQL=[%s]\n", worker_id, worker_command); + /* check compatibility */ + if (!IsSQLtableCompatible(main_table, data_table)) + Elog("Schema definition by the query in worker:%lu is not compatible: %s", + worker_id, worker_command); + worker_tables[worker_id] = data_table; + /* main loop to fetch and write results */ + sql2arrow_common(sqldb_conn, worker_id); + /* close the connection */ + sqldb_close_connection(sqldb_conn); + + if (shows_progress) + printf("worker:%lu terminated\n", worker_id); + + return NULL; +} + /* * Entrypoint of pg2arrow / mysql2arrow */ @@ -943,7 +1137,8 @@ int main(int argc, char * const argv[]) { int append_fdesc = -1; ArrowFileInfo af_info; - void *sqldb_state; + void *sqldb_conn; + const char *prime_command; SQLtable *table; ArrowKeyValue *kv; SQLdictionary *sql_dict_list = NULL; @@ -955,14 +1150,26 @@ int main(int argc, char * const argv[]) if (dump_arrow_filename) return dumpArrowFile(dump_arrow_filename); + /* setup workers */ + assert(num_worker_threads > 0); + worker_threads = palloc0(sizeof(pthread_t) * num_worker_threads); + worker_tables = palloc0(sizeof(SQLtable *) * num_worker_threads); + for (uintptr_t i = 1; i < num_worker_threads; i++) + { + if ((errno = pthread_create(&worker_threads[i], + NULL, + worker_main, + (void *)i)) != 0) + Elog("failed on pthread_create: %m"); + } /* open connection */ - sqldb_state = sqldb_server_connect(sqldb_hostname, - sqldb_port_num, - sqldb_username, - sqldb_password, - sqldb_database, - sqldb_session_configs, - sqldb_nestloop_options); + sqldb_conn = sqldb_server_connect(sqldb_hostname, + sqldb_port_num, + sqldb_username, + sqldb_password, + sqldb_database, + sqldb_session_configs, + sqldb_nestloop_options); /* read the original arrow file, if --append mode */ if (append_filename) { @@ -973,8 +1180,9 @@ int main(int argc, char * const argv[]) sql_dict_list = loadArrowDictionaryBatches(append_fdesc, &af_info); } /* begin SQL command execution */ - table = sqldb_begin_query(sqldb_state, - sqldb_command, + prime_command = sqldb_command_apply_worker_id(sqldb_command, 0); + table = sqldb_begin_query(sqldb_conn, + prime_command, append_filename ? &af_info : NULL, sql_dict_list); if (!table) @@ -1004,28 +1212,32 @@ int main(int argc, char * const argv[]) } /* write out dictionary batch, if any */ writeArrowDictionaryBatches(table); - - /* main loop to fetch and write result */ - while (sqldb_fetch_results(sqldb_state, table)) - { - if (table->usage > batch_segment_sz) - { - writeArrowRecordBatch(table); - shows_record_batch_progress(table, table->nitems); - sql_table_clear(table); - } - } + /* the primary SQLtable become visible to other workers */ + pthread_mutex_lock(&worker_setup_mutex); + worker_tables[0] = table; + worker_setup_done = true; + pthread_cond_broadcast(&worker_setup_cond); + pthread_mutex_unlock(&worker_setup_mutex); + /* main loop to fetch and write results*/ + sql2arrow_common(sqldb_conn, 0); if (table->nitems > 0) { - writeArrowRecordBatch(table); - shows_record_batch_progress(table, table->nitems); + ArrowBlock __block; + int __rb_index; + + __rb_index = writeArrowRecordBatch(table, &__block); + if (shows_progress) + shows_record_batch_progress(&__block, + __rb_index, + table->nitems, + 0); sql_table_clear(table); } /* write out footer portion */ writeArrowFooter(table); /* cleanup */ - sqldb_close_connection(sqldb_state); + sqldb_close_connection(sqldb_conn); close(table->fdesc); if (shows_progress) diff --git a/arrow-tools/sql2arrow.h b/arrow-tools/sql2arrow.h index 909c49631..a216115d7 100644 --- a/arrow-tools/sql2arrow.h +++ b/arrow-tools/sql2arrow.h @@ -1,6 +1,7 @@ #ifndef SQL2ARROW_H #define SQL2ARROW_H #include "arrow_ipc.h" +#include typedef struct userConfigOption userConfigOption; struct userConfigOption diff --git a/src/arrow_ipc.h b/src/arrow_ipc.h index ea0d9f50d..c43c621bd 100644 --- a/src/arrow_ipc.h +++ b/src/arrow_ipc.h @@ -148,6 +148,7 @@ struct SQLfield ArrowType arrow_type; /* type in apache arrow */ /* data save as Apache Arrow datum */ size_t (*put_value)(SQLfield *attr, const char *addr, int sz); + size_t (*move_value)(SQLfield *dest, const SQLfield *src, long index); int (*write_stat)(SQLfield *attr, char *buf, size_t len, const SQLstat__datum *stat_datum); /* data buffers of the field */ @@ -172,6 +173,12 @@ sql_field_put_value(SQLfield *column, const char *addr, int sz) return (column->__curr_usage__ = column->put_value(column, addr, sz)); } +static inline size_t +sql_field_move_value(SQLfield *dest, const SQLfield *src, long index) +{ + return (dest->__curr_usage__ = dest->move_value(dest, src, index)); +} + #ifndef FLEXIBLE_ARRAY_MEMBER #define FLEXIBLE_ARRAY_MEMBER #endif @@ -225,13 +232,22 @@ struct SQLdictionary }; /* arrow_write.c */ +extern bool IsSQLfieldCompatible(const SQLfield *field1, + const SQLfield *field2); +extern bool IsSQLtableCompatible(const SQLtable *table1, + const SQLtable *table2); extern void arrowFileWrite(SQLtable *table, const char *buffer, ssize_t length); extern void arrowFileWriteIOV(SQLtable *table); extern void writeArrowSchema(SQLtable *table); extern void writeArrowDictionaryBatches(SQLtable *table); -extern int writeArrowRecordBatch(SQLtable *table); +extern int writeArrowRecordBatch(SQLtable *table, + ArrowBlock *p_arrow_block); +extern int writeArrowRecordBatchMT(SQLtable *main_table, + SQLtable *data_table, + pthread_mutex_t *main_table_mutex, + ArrowBlock *p_arrow_block); extern void writeArrowFooter(SQLtable *table); extern size_t setupArrowRecordBatchIOV(SQLtable *table); @@ -388,6 +404,17 @@ sql_buffer_append_char(SQLbuffer *buf, int c, size_t len) assert(buf->usage <= buf->length); } +static inline bool +sql_buffer_getbit(const SQLbuffer *buf, size_t __index) +{ + size_t index = __index >> 3; + int mask = (1 << (__index & 7)); + + if (index < buf->length && (buf->data[index] & mask) != 0) + return true; + return false; +} + static inline void sql_buffer_setbit(SQLbuffer *buf, size_t __index) { diff --git a/src/arrow_write.c b/src/arrow_write.c index 5a9cdd27f..5efafc23d 100644 --- a/src/arrow_write.c +++ b/src/arrow_write.c @@ -13,6 +13,7 @@ #include "postgres.h" #endif #include +#include #include "arrow_ipc.h" /* alignment macros, if not */ @@ -886,6 +887,97 @@ createArrowFooter(ArrowFooter *node) return makeBufferFlatten(buf); } +/* ---------------------------------------------------------------- + * Compatibility Checks for SQLtable / SQLfield + * ---------------------------------------------------------------- */ +bool +IsSQLfieldCompatible(const SQLfield *field1, + const SQLfield *field2) +{ + if (field1->arrow_type.node.tag != field2->arrow_type.node.tag) + return false; +#define __COMPARE(__TYPE,__ATTR) \ + (field1->arrow_type.__TYPE.__ATTR != field2->arrow_type.__TYPE.__ATTR) + + switch (field1->arrow_type.node.tag) + { + case ArrowNodeTag__Int: + if (__COMPARE(Int, bitWidth) || + (field1->arrow_type.Int.is_signed && !field2->arrow_type.Int.is_signed)|| + (!field1->arrow_type.Int.is_signed && field2->arrow_type.Int.is_signed)) + return false; + break; + case ArrowNodeTag__FloatingPoint: + if (__COMPARE(FloatingPoint, precision)) + return false; + break; + case ArrowNodeTag__Utf8: + case ArrowNodeTag__Binary: + case ArrowNodeTag__Bool: + break; + case ArrowNodeTag__Decimal: + if (__COMPARE(Decimal, precision) || + __COMPARE(Decimal, scale) || + __COMPARE(Decimal, bitWidth)) + return false; + break; + case ArrowNodeTag__Date: + if (__COMPARE(Date, unit)) + return false; + break; + case ArrowNodeTag__Time: + if (__COMPARE(Time, unit)) + return false; + break; + case ArrowNodeTag__Timestamp: + if (__COMPARE(Timestamp, unit)) + return false; + break; + case ArrowNodeTag__Interval: + if (__COMPARE(Interval, unit)) + return false; + break; + case ArrowNodeTag__List: + case ArrowNodeTag__Struct: + break; + case ArrowNodeTag__FixedSizeBinary: + if (__COMPARE(FixedSizeBinary, byteWidth)) + return false; + break; + case ArrowNodeTag__FixedSizeList: + if (__COMPARE(FixedSizeList, listSize)) + return false; + break; + case ArrowNodeTag__LargeBinary: + case ArrowNodeTag__LargeUtf8: + case ArrowNodeTag__LargeList: + break; + default: + Elog("unknown / unsupported Arrow Type (%d)", + field1->arrow_type.node.tag); + } +#undef __COMPARE + return true; +} + +bool +IsSQLtableCompatible(const SQLtable *table1, + const SQLtable *table2) +{ + if (table1->nfields != table2->nfields) + return false; + if ((table1->has_statistics && !table2->has_statistics) || + (!table1->has_statistics && table2->has_statistics)) + return false; + for (int j=0; j < table1->nfields; j++) + { + if (!IsSQLfieldCompatible(&table1->columns[j], + &table2->columns[j])) + return false; + } + return true; +} + /* ---------------------------------------------------------------- * Routines for File I/O * ---------------------------------------------------------------- */ @@ -912,20 +1004,24 @@ arrowFileWrite(SQLtable *table, const char *buffer, ssize_t length) table->f_pos += length; } -void -arrowFileWriteIOV(SQLtable *table) +static size_t +__arrowFileWriteIOV(const char *filename, + int fdesc, + off_t f_pos, + SQLtable *table) { - int index = 0; - ssize_t nbytes; + int index = 0; + off_t f_pos_saved = f_pos; while (index < table->__iov_cnt) { - int __iov_cnt = table->__iov_cnt - index; + int __iov_cnt = table->__iov_cnt - index; + ssize_t nbytes; - nbytes = pwritev(table->fdesc, + nbytes = pwritev(fdesc, table->__iov + index, __iov_cnt < IOV_MAX ? __iov_cnt : IOV_MAX, - table->f_pos); + f_pos); if (nbytes < 0) { if (errno == EINTR) @@ -937,7 +1033,7 @@ arrowFileWriteIOV(SQLtable *table) Elog("unable to write on '%s' any more", table->filename); } - table->f_pos += nbytes; + f_pos += nbytes; while (index < table->__iov_cnt && nbytes >= table->__iov[index].iov_len) { @@ -952,6 +1048,17 @@ arrowFileWriteIOV(SQLtable *table) } } table->__iov_cnt = 0; + + return (f_pos - f_pos_saved); +} + +void +arrowFileWriteIOV(SQLtable *table) +{ + table->f_pos += __arrowFileWriteIOV(table->filename, + table->fdesc, + table->f_pos, + table); } static void @@ -1636,25 +1743,25 @@ setupArrowRecordBatchIOV(SQLtable *table) } static void -__saveArrowRecordBatchStats(int rb_index, SQLfield *field) +__saveArrowRecordBatchStats(SQLfield *main_field, + SQLfield *data_field, int rb_index) { - if (field->stat_datum.is_valid) + if (data_field->stat_datum.is_valid) { SQLstat *item = palloc(sizeof(SQLstat)); /* save the SQLstat item for the record-batch */ - memcpy(item, &field->stat_datum, sizeof(SQLstat)); + memcpy(item, &data_field->stat_datum, sizeof(SQLstat)); item->rb_index = rb_index; - item->next = field->stat_list; - field->stat_list = item; - + item->next = main_field->stat_list; + main_field->stat_list = item; /* reset statistics */ - memset(&field->stat_datum, 0, sizeof(SQLstat)); + memset(&data_field->stat_datum, 0, sizeof(SQLstat)); } } int -writeArrowRecordBatch(SQLtable *table) +writeArrowRecordBatch(SQLtable *table, ArrowBlock *p_arrow_block) { ArrowBlock block; size_t length; @@ -1678,12 +1785,72 @@ writeArrowRecordBatch(SQLtable *table) { for (j=0; j < table->nfields; j++) { - SQLfield *field = &table->columns[j]; + if (table->columns[j].stat_enabled) + __saveArrowRecordBatchStats(&table->columns[j], + &table->columns[j], rb_index); + } + } + if (p_arrow_block) + memcpy(p_arrow_block, &block, sizeof(ArrowBlock)); + return rb_index; +} - if (field->stat_enabled) - __saveArrowRecordBatchStats(rb_index, field); +int +writeArrowRecordBatchMT(SQLtable *main_table, + SQLtable *data_table, + pthread_mutex_t *main_table_mutex, + ArrowBlock *p_arrow_block) +{ + ArrowBlock block; + size_t length; + size_t meta_sz; + int rb_index; + int fdesc; + off_t f_pos; + const char *filename; + + data_table->__iov_cnt = 0; /* reset iov */ + length = setupArrowRecordBatchIOV(data_table); + assert(data_table->__iov_cnt > 0 && + data_table->__iov[0].iov_len <= length); + meta_sz = data_table->__iov[0].iov_len; /* metadata chunk */ + + /* critical section */ + if ((errno = pthread_mutex_lock(main_table_mutex)) != 0) + Elog("failed on pthread_mutex_lock: %m"); + + /* move forward the destination file pointer */ + filename = main_table->filename; + fdesc = main_table->fdesc; + f_pos = main_table->f_pos; + main_table->f_pos += length; + + initArrowNode(&block, Block); + block.offset = f_pos; + block.metaDataLength = meta_sz; + block.bodyLength = length - meta_sz; + + /* register the record batch and statistics */ + rb_index = sql_table_append_record_batch(main_table, &block); + if (main_table->has_statistics) + { + assert(data_table->has_statistics); + for (int j=0; j < data_table->nfields; j++) + { + if (main_table->columns[j].stat_enabled) + __saveArrowRecordBatchStats(&main_table->columns[j], + &data_table->columns[j], rb_index); } } + if ((errno = pthread_mutex_unlock(main_table_mutex)) != 0) + Elog("failed on pthread_mutex_unlock: %m"); + + /* write file i/o */ + __arrowFileWriteIOV(filename, fdesc, f_pos, data_table); + + /* result back */ + if (p_arrow_block) + memcpy(p_arrow_block, &block, sizeof(ArrowBlock)); return rb_index; }