Skip to content

Commit

Permalink
add basic 'cube' data type support
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed Nov 26, 2023
1 parent 0d45b42 commit 323340a
Show file tree
Hide file tree
Showing 7 changed files with 742 additions and 74 deletions.
3 changes: 2 additions & 1 deletion arrow-tools/arrow_pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -1813,7 +1813,8 @@ assignArrowTypePgSQL(SQLfield *column,
if (extname != NULL)
{
/* contrib/cube (relocatable) */
if (strcmp(extname, "cube") == 0 &&
if (strcmp(typname, "cube") == 0 &&
strcmp(extname, "cube") == 0 &&
strcmp(extschema, typnamespace) == 0)
{
__assignArrowTypeHint(column, typname, typnamespace);
Expand Down
170 changes: 97 additions & 73 deletions src/arrow_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,13 @@ arrowFieldGetPGTypeHint(const ArrowField *field)
for (int i=0; i < field->_num_custom_metadata; i++)
{
ArrowKeyValue *kv = &field->custom_metadata[i];
char *namebuf, *pos;
Oid extension_oid = InvalidOid;
Oid namespace_oid = PG_CATALOG_NAMESPACE;
HeapTuple tup;
Oid hint_oid;
bool namespace_specified = false;
char *namebuf, *pos;

/* pg_type = NAMESPACE.TYPENAME@EXTENSION */
if (strcmp(kv->key, "pg_type") != 0)
continue;
namebuf = alloca(kv->_value_len + 10);
Expand All @@ -221,17 +224,50 @@ arrowFieldGetPGTypeHint(const ArrowField *field)
if (!OidIsValid(namespace_oid))
continue;
namebuf = pos;
namespace_specified = true;
}
pos = strchr(namebuf, '@');
if (pos)
{
*pos++ = '\0';
extension_oid = get_extension_oid(pos, true);
if (!OidIsValid(extension_oid))
continue;
}
tup = SearchSysCache2(TYPENAMENSP,
PointerGetDatum(namebuf),
ObjectIdGetDatum(namespace_oid));
if (HeapTupleIsValid(tup))
/* 1st try: user specified namespace or 'pg_catalog' */
hint_oid = GetSysCacheOid2(TYPENAMENSP,
Anum_pg_type_oid,
CStringGetDatum(namebuf),
ObjectIdGetDatum(namespace_oid));
if (OidIsValid(hint_oid))
{
Oid hint = ((Form_pg_type) GETSTRUCT(tup))->oid;
if (!OidIsValid(extension_oid) ||
getExtensionOfObject(TypeRelationId,
hint_oid) == extension_oid)
return hint_oid;
}
/* 2nd try: any namespace (if not specified) */
if (!namespace_specified)
{
CatCList *typelist;
HeapTuple htup;

ReleaseSysCache(tup);
typelist = SearchSysCacheList1(TYPENAMENSP,
CStringGetDatum(namebuf));
for (int k=0; k < typelist->n_members; k++)
{
htup = &typelist->members[k]->tuple;
hint_oid = ((Form_pg_type) GETSTRUCT(htup))->oid;

return hint;
if (!OidIsValid(extension_oid) ||
getExtensionOfObject(TypeRelationId,
hint_oid) == extension_oid)
{
ReleaseCatCacheList(typelist);
return hint_oid;
}
}
ReleaseCatCacheList(typelist);
}
}
return InvalidOid;
Expand Down Expand Up @@ -1301,11 +1337,7 @@ __arrowFieldTypeToPGType(const ArrowField *field,
{
case 8:
attopts.unitsz = sizeof(int8_t);
type_oid =
GetSysCacheOid2(TYPENAMENSP,
Anum_pg_type_oid,
CStringGetDatum("int1"),
ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
type_oid = get_int1_type_oid(false);
break;
case 16:
attopts.unitsz = sizeof(int16_t);
Expand Down Expand Up @@ -1333,11 +1365,7 @@ __arrowFieldTypeToPGType(const ArrowField *field,
{
case ArrowPrecision__Half:
attopts.unitsz = sizeof(float2_t);
type_oid =
GetSysCacheOid2(TYPENAMENSP,
Anum_pg_type_oid,
CStringGetDatum("float2"),
ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
type_oid = get_float2_type_oid(false);
break;
case ArrowPrecision__Single:
attopts.unitsz = sizeof(float4_t);
Expand Down Expand Up @@ -1490,13 +1518,21 @@ __arrowFieldTypeToPGType(const ArrowField *field,
case ArrowNodeTag__Binary:
attopts.tag = ArrowType__Binary;
attopts.unitsz = sizeof(uint32_t);
type_oid = BYTEAOID;
if (OidIsValid(hint_oid) &&
hint_oid == get_cube_type_oid(true))
type_oid = hint_oid;
else
type_oid = BYTEAOID;
break;

case ArrowNodeTag__LargeBinary:
attopts.tag = ArrowType__LargeBinary;
attopts.unitsz = sizeof(uint64_t);
type_oid = BYTEAOID;
if (OidIsValid(hint_oid) &&
hint_oid == get_cube_type_oid(true))
type_oid = hint_oid;
else
type_oid = BYTEAOID;
break;

case ArrowNodeTag__List:
Expand Down Expand Up @@ -2786,54 +2822,47 @@ static void pg_datum_arrow_ref(kern_data_store *kds,
size_t index,
Datum *p_datum,
bool *p_isnull);

static Datum
pg_varlena32_arrow_ref(kern_data_store *kds,
kern_colmeta *cmeta, size_t index)
kern_colmeta *cmeta,
size_t index, bool *p_isnull)
{
uint32_t *offset = (uint32_t *)((char *)kds +
__kds_unpack(cmeta->values_offset));
char *extra = (char *)kds + __kds_unpack(cmeta->extra_offset);
uint32_t len;
struct varlena *res;

if (sizeof(uint32_t) * (index+2) > __kds_unpack(cmeta->values_length))
elog(ERROR, "corruption? varlena index out of range");
len = offset[index+1] - offset[index];
if (offset[index] > offset[index+1] ||
offset[index+1] > __kds_unpack(cmeta->extra_length))
elog(ERROR, "corruption? varlena points out of extra buffer");
if (len >= (1UL<<VARLENA_EXTSIZE_BITS) - VARHDRSZ)
elog(ERROR, "variable size too large");
res = palloc(VARHDRSZ + len);
SET_VARSIZE(res, VARHDRSZ + len);
memcpy(VARDATA(res), extra + offset[index], len);
struct varlena *res = NULL;
const void *addr;
int length;

addr = KDS_ARROW_REF_VARLENA32_DATUM(kds, cmeta, index, &length);
if (!addr)
*p_isnull = true;
else
{
*p_isnull = false;
res = palloc(VARHDRSZ + length);
memcpy(res->vl_dat, addr, length);
SET_VARSIZE(res, VARHDRSZ + length);
}
return PointerGetDatum(res);
}

static Datum
pg_varlena64_arrow_ref(kern_data_store *kds,
kern_colmeta *cmeta, size_t index)
kern_colmeta *cmeta,
size_t index, bool *p_isnull)
{
uint64_t *offset = (uint64_t *)((char *)kds +
__kds_unpack(cmeta->values_offset));
char *extra = (char *)kds + __kds_unpack(cmeta->extra_offset);
uint64_t len;
struct varlena *res;

if (sizeof(uint64_t) * (index+2) > __kds_unpack(cmeta->values_length))
elog(ERROR, "corruption? varlena index out of range");
len = offset[index+1] - offset[index];
if (offset[index] > offset[index+1] ||
offset[index+1] > __kds_unpack(cmeta->extra_length))
elog(ERROR, "corruption? varlena points out of extra buffer");
if (len >= (1UL<<VARLENA_EXTSIZE_BITS) - VARHDRSZ)
elog(ERROR, "variable size too large");
res = palloc(VARHDRSZ + len);
SET_VARSIZE(res, VARHDRSZ + len);
memcpy(VARDATA(res), extra + offset[index], len);
struct varlena *res = NULL;
const void *addr;
int length;

addr = KDS_ARROW_REF_VARLENA64_DATUM(kds, cmeta, index, &length);
if (!addr)
*p_isnull = true;
else
{
*p_isnull = false;
res = palloc(VARHDRSZ + length);
memcpy(res->vl_dat, addr, length);
SET_VARSIZE(res, VARHDRSZ + length);
}
return PointerGetDatum(res);
}

Expand Down Expand Up @@ -2863,11 +2892,12 @@ pg_bool_arrow_ref(kern_data_store *kds,
{
uint8_t *bitmap = (uint8_t *)kds + __kds_unpack(cmeta->values_offset);
size_t length = __kds_unpack(cmeta->values_length);
uint8_t mask = (1 << (index & 7));
bool rv;

if (sizeof(uint8_t) * index >= length)
if (sizeof(uint8_t) * (index>>3) >= length)
elog(ERROR, "corruption? bool points out of range");
return BoolGetDatum((bitmap[index>>3] & mask) != 0 ? true : false);
rv = ((bitmap[index>>3] & (1<<(index&7))) != 0);
return BoolGetDatum(rv);
}

static Datum
Expand Down Expand Up @@ -3196,18 +3226,12 @@ pg_datum_arrow_ref(kern_data_store *kds,
Datum datum = 0;
bool isnull = false;

if (cmeta->nullmap_offset != 0)
if (KDS_ARROW_CHECK_ISNULL(kds, cmeta, index))
{
size_t nullmap_offset = __kds_unpack(cmeta->nullmap_offset);
uint8 *nullmap = (uint8 *)kds + nullmap_offset;

if (att_isnull(index, nullmap))
{
isnull = true;
goto out;
}
isnull = true;
goto out;
}

switch (cmeta->attopts.tag)
{
case ArrowType__Int:
Expand All @@ -3234,11 +3258,11 @@ pg_datum_arrow_ref(kern_data_store *kds,
break;
case ArrowType__Utf8:
case ArrowType__Binary:
datum = pg_varlena32_arrow_ref(kds, cmeta, index);
datum = pg_varlena32_arrow_ref(kds, cmeta, index, &isnull);
break;
case ArrowType__LargeUtf8:
case ArrowType__LargeBinary:
datum = pg_varlena64_arrow_ref(kds, cmeta, index);
datum = pg_varlena64_arrow_ref(kds, cmeta, index, &isnull);
break;

case ArrowType__FixedSizeBinary:
Expand Down
Loading

0 comments on commit 323340a

Please sign in to comment.