Skip to content

Commit

Permalink
pg2arrow: launch worker threads more rapidly
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed Mar 31, 2024
1 parent 24b79b6 commit fa42d67
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
22 changes: 15 additions & 7 deletions arrow-tools/pgsql_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1034,17 +1034,16 @@ sqldb_begin_query(void *sqldb_state,
if (PQntuples(res) != 1 || PQnfields(res) != 1)
Elog("unexpected result for pg_export_snapshot()");
snapshot_identifier = pstrdup(PQgetvalue(res, 0, 0));
printf("snapshot_identifier = [%s]\n", snapshot_identifier);
PQclear(res);
}
else
{
char query[200];
char temp[200];

snprintf(query, sizeof(query),
snprintf(temp, sizeof(temp),
"SET TRANSACTION SNAPSHOT '%s'",
snapshot_identifier);
res = PQexec(conn, query);
res = PQexec(conn, temp);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
Elog("unable to import transaction shapshot: %s",
PQresultErrorMessage(res));
Expand All @@ -1059,9 +1058,18 @@ sqldb_begin_query(void *sqldb_state,
Elog("unable to declare a SQL cursor: %s", PQresultErrorMessage(res));
PQclear(res);

/* move to the first tuple(-set) */
if (!pgsql_move_next(pgstate, NULL))
return NULL;
/* fetch schema definition */
res = PQexecParams(conn,
"FETCH FORWARD 0 FROM " CURSOR_NAME,
0, NULL, NULL, NULL, NULL,
1); /* results in binary mode */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
Elog("SQL execution failed: %s", PQresultErrorMessage(res));
pgstate->res = res;
pgstate->nitems = PQntuples(res);
pgstate->index = 0;
assert(pgstate->nitems == 0);

return pgsql_create_buffer(pgstate,
af_info,
dictionary_list);
Expand Down
1 change: 0 additions & 1 deletion arrow-tools/sql2arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,6 @@ parse_options(int argc, char * const argv[])
worker_dist_keys = repalloc(worker_dist_keys,
sizeof(const char *) * nrooms);
}
puts(tok);
worker_dist_keys[nitems++] = tok;
}
num_worker_threads = nitems;
Expand Down

0 comments on commit fa42d67

Please sign in to comment.