Commit f550a5e: improvements

akuzm committed Jan 31, 2025
Parent: ff48d97
Showing 6 changed files with 346 additions and 26 deletions.
sql/sparse_index.sql: 2 changes (1 addition, 1 deletion)
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION _timescaledb_functions.ts_bloom1_matches(bytea, anyelement)
RETURNS bool
AS '@MODULE_PATHNAME@', 'ts_bloom1_matches'
-LANGUAGE C IMMUTABLE STRICT;
+LANGUAGE C IMMUTABLE PARALLEL SAFE;
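
Dropping STRICT here is load-bearing, not cosmetic: the executor evaluates a STRICT function to NULL without calling it when any argument is NULL, and a NULL qual result discards the row, which is wrong when the bloom filter column itself is NULL (no filter stored means the batch may still match). PARALLEL SAFE additionally allows the qual to run in parallel plans. The shape of the resulting non-strict function, mirroring the checks added in src/utils/ts_bloom1_matches.c below:

Datum
ts_bloom1_matches(PG_FUNCTION_ARGS)
{
	/* No filter stored for this batch: the value may still be present. */
	if (PG_ARGISNULL(0))
		PG_RETURN_BOOL(true);

	/* A NULL probe value can never satisfy the equality condition. */
	if (PG_ARGISNULL(1))
		PG_RETURN_BOOL(false);

	/* ... the actual bloom filter probe follows ... */
}
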
src/utils/bloom1_sparse_index_params.h: 24 changes (24 additions, 0 deletions)
@@ -5,6 +5,8 @@
*/
#pragma once

#include <postgres.h>

#include <common/hashfn.h>

#define BLOOM1_HASHES 4
@@ -18,3 +20,25 @@ bloom1_get_one_hash(uint32 value_hash, uint32 index)
const uint32 h2 = hash_combine(value_hash, BLOOM1_SEED_2);
return h1 + index * h2 + index * index;
}

static inline int
bloom1_bytea_alloc_size(int num_bits)
{
const int words = (num_bits + 63) / 64;
const int header = TYPEALIGN(8, VARHDRSZ);
return header + words * 8;
}

static inline uint64 *
bloom1_words(bytea *bloom)
{
uint64 *ptr = (uint64 *) TYPEALIGN(sizeof(ptr), VARDATA(bloom));
return ptr;
}

static inline int
bloom1_num_bits(const bytea *bloom)
{
const uint64 *words = bloom1_words((bytea *) bloom);
return 8 * (VARSIZE_ANY(bloom) + (char *) bloom - (char *) words);
}
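
The helpers above pin down the bytea layout: bloom1_bytea_alloc_size rounds the bit count up to whole 64-bit words and pads the 4-byte varlena header to 8 bytes, bloom1_words hands out the resulting uint64-aligned data pointer, and bloom1_num_bits recovers the bit count from the total size. bloom1_get_one_hash at the top of the file is the usual double-hashing construction (h1 + i*h2, with an extra i*i term), so all BLOOM1_HASHES probe positions are derived from a single hash of the value. A standalone sketch of the sizing math, assuming the stock definitions of VARHDRSZ (4) and TYPEALIGN (round up to a multiple of the alignment); an illustration, not the extension's code:

#include <assert.h>
#include <stdio.h>

#define VARHDRSZ 4
#define TYPEALIGN(a, len) ((((long) (len)) + ((a) -1)) & ~((long) ((a) -1)))

static int
bloom1_bytea_alloc_size(int num_bits)
{
	const int words = (num_bits + 63) / 64; /* round up to whole uint64 words */
	const int header = TYPEALIGN(8, VARHDRSZ); /* 4-byte header padded to 8 */
	return header + words * 8;
}

int
main(void)
{
	/* 1024 bits: 16 words, so 8 (padded header) + 128 = 136 bytes. */
	assert(bloom1_bytea_alloc_size(1024) == 136);
	/* Bit counts round up to whole words: 65 bits already take 2 words. */
	assert(bloom1_bytea_alloc_size(65) == 8 + 2 * 8);
	printf("ok\n");
	return 0;
}
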
src/utils/ts_bloom1_matches.c: 22 changes (20 additions, 2 deletions)
@@ -20,6 +20,24 @@ TS_FUNCTION_INFO_V1(ts_bloom1_matches);
Datum
ts_bloom1_matches(PG_FUNCTION_ARGS)
{
/*
* This function is not strict: if there is no bloom filter, the condition
* can potentially be true.
*/
if (PG_ARGISNULL(0))
{
PG_RETURN_BOOL(true);
}

/*
* A null value cannot match the equality condition, although this probably
* should be optimized away by the planner.
*/
if (PG_ARGISNULL(1))
{
PG_RETURN_BOOL(false);
}

Oid val_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
Ensure(OidIsValid(val_type), "cannot determine argument type");
TypeCacheEntry *val_entry = lookup_type_cache(val_type, TYPECACHE_HASH_PROC);
@@ -33,8 +51,8 @@ ts_bloom1_matches(PG_FUNCTION_ARGS)

/* compute the requested number of hashes */
bytea *bloom = PG_GETARG_VARLENA_PP(0);
-const int nbits = VARSIZE_ANY_EXHDR(bloom) * 8;
-const uint64 *words = (const uint64 *) VARDATA_ANY(bloom);
+const int nbits = bloom1_num_bits(bloom);
+const uint64 *words = bloom1_words(bloom);
const int word_bits = sizeof(*words) * 8;
bool match = true;
for (int i = 0; i < BLOOM1_HASHES; i++)
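
The hunk is cut off above, but the context lines show what feeds the probe loop: the filter words, the bit count, and word_bits. A hedged sketch of such a membership test, factored as a plain function; bloom1_probe and datum_hash (the uint32 hash of the probe value obtained through the type cache) are invented names, and the actual loop lives inline in ts_bloom1_matches():

#include "utils/bloom1_sparse_index_params.h"

static bool
bloom1_probe(const uint64 *words, int nbits, uint32 datum_hash)
{
	const int word_bits = sizeof(*words) * 8;
	for (int i = 0; i < BLOOM1_HASHES; i++)
	{
		/* Map the i-th derived hash onto a bit position in the filter. */
		const uint32 bit = bloom1_get_one_hash(datum_hash, i) % nbits;
		/* One clear bit proves the value was never inserted. */
		if ((words[bit / word_bits] & (UINT64CONST(1) << (bit % word_bits))) == 0)
			return false;
	}
	return true; /* all probed bits set: present, or a false positive */
}
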
tsl/src/compression/batch_metadata_builder_bloom1.c: 61 changes (50 additions, 11 deletions)
@@ -14,6 +14,8 @@

#include "batch_metadata_builder.h"

#include "arrow_c_data_interface.h"

#include "utils/bloom1_sparse_index_params.h"

typedef struct Bloom1MetadataBuilder
@@ -22,7 +24,6 @@ typedef struct Bloom1MetadataBuilder

Oid type_oid;
bool empty;
-bool has_null;

bool type_by_val;
int16 type_len;
@@ -65,7 +66,6 @@ batch_metadata_builder_bloom1_create(Oid type_oid, int bloom_attr_offset)
},
.type_oid = type_oid,
.empty = true,
-.has_null = false,
.type_by_val = type->typbyval,
.type_len = type->typlen,
.bloom_attr_offset = bloom_attr_offset,
@@ -77,7 +77,7 @@ batch_metadata_builder_bloom1_create(Oid type_oid, int bloom_attr_offset)
};

Assert(builder->nbits % 64 == 0);
-const int bytea_size = VARHDRSZ + builder->nbits / 8;
+const int bytea_size = bloom1_bytea_alloc_size(builder->nbits);
builder->bloom_bytea = palloc0(bytea_size);
SET_VARSIZE(builder->bloom_bytea, bytea_size);

@@ -96,8 +96,8 @@ bloom1_update_val(void *builder_, Datum val)
DatumGetUInt32(OidFunctionCall1Coll(hash_proc_oid, C_COLLATION_OID, val));

/* compute the requested number of hashes */
-const int nbits = builder->nbits;
-uint64 *restrict words = (uint64 *restrict) VARDATA(builder->bloom_bytea);
+const int nbits = bloom1_num_bits(builder->bloom_bytea);
+uint64 *restrict words = bloom1_words(builder->bloom_bytea);
const int word_bits = sizeof(*words) * 8;
for (int i = 0; i < BLOOM1_HASHES; i++)
{
@@ -111,18 +111,58 @@ bloom1_update_val(void *builder_, Datum val)
void
bloom1_update_null(void *builder_)
{
-Bloom1MetadataBuilder *builder = (Bloom1MetadataBuilder *) builder_;
-builder->has_null = true;
/*
* Null values cannot match the equality conditions that we optimize
* with bloom filters, so we don't need to consider them here.
*/
}

PG_USED_FOR_ASSERTS_ONLY static int
bloom1_estimate_ndistinct(bytea *bloom)
{
const int nbits = bloom1_num_bits(bloom);
const uint64 *words = bloom1_words(bloom);
const int nset = arrow_num_valid(words, nbits);
return -(nbits / BLOOM1_HASHES) * log(1 - nset / (double) nbits);
}
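
The estimate inverts the expected occupancy of a bloom filter: with m bits, k hashes and n distinct inserted values, the fraction of set bits X/m approaches 1 - e^(-kn/m), which solved for n gives the formula above. A worked instance with assumed numbers:

    n ~= -(m / k) * ln(1 - X / m)
    with m = 1024 bits, k = 4 hashes, X = 256 set bits:
    n ~= -(1024 / 4) * ln(1 - 256/1024) = -256 * ln(0.75) ~= 73.7 distinct values
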

static void
bloom1_insert_to_compressed_row(void *builder_, RowCompressor *compressor)
{
Bloom1MetadataBuilder *builder = (Bloom1MetadataBuilder *) builder_;

-compressor->compressed_is_null[builder->bloom_attr_offset] = !builder->empty;
-compressor->compressed_values[builder->bloom_attr_offset] =
-PointerGetDatum(builder->bloom_bytea);
const int bits_set =
arrow_num_valid(bloom1_words(builder->bloom_bytea), bloom1_num_bits(builder->bloom_bytea));

if (bits_set == 0)
{
/*
* All elements turned out to be null; don't save the empty filter in
* that case.
*/
compressor->compressed_is_null[builder->bloom_attr_offset] = true;
compressor->compressed_values[builder->bloom_attr_offset] = NULL;
}
else
{
/*
* There is a simple compression technique for filters that turn out
* very sparse: you split the filter in half and bitwise OR the halves.
* Repeat this until you reach the occupancy that gives the desired
* false positive ratio; e.g. in our case with 4 hashes, 1/3 occupancy
* would give about 1% false positives. We don't apply it at the moment; the
* TOAST compression should help somewhat for sparse filters.
*/
compressor->compressed_is_null[builder->bloom_attr_offset] = false;
compressor->compressed_values[builder->bloom_attr_offset] =
PointerGetDatum(builder->bloom_bytea);
}

fprintf(stderr,
"bloom filter %d bits %d set %d estimate\n",
builder->nbits,
bits_set,
bloom1_estimate_ndistinct(builder->bloom_bytea));
}

static void
@@ -131,7 +171,6 @@ bloom1_reset(void *builder_, RowCompressor *compressor)
Bloom1MetadataBuilder *builder = (Bloom1MetadataBuilder *) builder_;

builder->empty = true;
-builder->has_null = false;

builder->nbits_set = 0;
memset(VARDATA(builder->bloom_bytea), 0, VARSIZE_ANY_EXHDR(builder->bloom_bytea));
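
The long comment in bloom1_insert_to_compressed_row describes, but deliberately does not implement, a folding scheme for very sparse filters. A hypothetical sketch of that technique, outside this commit: bloom1_fold_to_occupancy is an invented name, __builtin_popcountll assumes GCC or Clang, and a reader of a folded filter would have to take probe positions modulo the reduced bit count.

#include <stdint.h>

/*
 * Fold the filter in half by ORing the upper half onto the lower half:
 * bits i and i + m/2 collapse into one, so membership is preserved while
 * the filter halves in size. With BLOOM1_HASHES = 4, a 1/3 occupancy
 * target keeps the false positive rate near (1/3)^4, about 1.2%.
 */
static int
bloom1_fold_to_occupancy(uint64_t *words, int num_words, double max_occupancy)
{
	while (num_words > 1 && num_words % 2 == 0)
	{
		int set_bits = 0;
		for (int i = 0; i < num_words; i++)
			set_bits += __builtin_popcountll(words[i]);

		/* Folding at most doubles occupancy; stop before overshooting. */
		if (set_bits > max_occupancy * (num_words / 2) * 64)
			break;

		for (int i = 0; i < num_words / 2; i++)
			words[i] |= words[i + num_words / 2];
		num_words /= 2;
	}
	return num_words; /* remaining filter length in 64-bit words */
}
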
tsl/test/expected/compress_bloom_sparse.out: 180 changes (175 additions, 5 deletions)
@@ -40,11 +40,6 @@
"compress_hyper_2_2_chunk__ts_meta_min_1__ts_meta_max_1_idx" btree (_ts_meta_min_1, _ts_meta_max_1)
Options: toast_tuple_target=128

---\set VERBOSITY verbose
---select * from :chunk;
---CREATE OR REPLACE FUNCTION ts_bloom1_matches(anyelement, bytea) RETURNS bloom
---AS :TSL_MODULE_PATHNAME, 'ts_bloom1_matches'
---LANGUAGE C IMMUTABLE STRICT;
explain (analyze, verbose, costs off, timing off, summary off)
select count(*) from bloom where value = md5(7248::text);
QUERY PLAN
@@ -68,3 +63,178 @@ select count(*) from bloom where value = md5(7248::text);
1
(1 row)

-- The join condition is not pushed down to the compressed scan for some reason.
set enable_mergejoin to off;
set enable_hashjoin to off;
explain (analyze, verbose, costs off, timing off, summary off)
with query(value) as materialized (values (md5(3516::text)), (md5(9347::text)),
(md5(5773::text)))
select count(*) from bloom natural join query;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (actual rows=1 loops=1)
Output: count(*)
CTE query
-> Values Scan on "*VALUES*" (actual rows=3 loops=1)
Output: "*VALUES*".column1
-> Nested Loop (actual rows=3 loops=1)
Join Filter: (_hyper_1_1_chunk.value = query.value)
Rows Removed by Join Filter: 29997
-> CTE Scan on query (actual rows=3 loops=1)
Output: query.value
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=10000 loops=3)
Output: _hyper_1_1_chunk.value
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10 loops=3)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.ts, compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, compress_hyper_2_2_chunk.value
(15 rows)

;
with query(value) as materialized (values (md5(3516::text)), (md5(9347::text)),
(md5(5773::text)))
select count(*) from bloom natural join query;
count
-------
3
(1 row)

;
reset enable_mergejoin;
reset enable_hashjoin;
-- Stable expression that yields null
set timescaledb.enable_chunk_append to off;
explain (analyze, verbose, costs off, timing off, summary off)
select count(*) from bloom where value =
case when now() < '1970-01-01' then md5(2345::text) else null end
;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=0 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.value = CASE WHEN (now() < 'Thu Jan 01 00:00:00 1970 PST'::timestamp with time zone) THEN '81b073de9370ea873f548e31b8adc081'::text ELSE NULL::text END)
Rows Removed by Filter: 10000
Batches Removed by Filter: 10
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.ts, compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, compress_hyper_2_2_chunk.value
(9 rows)

reset timescaledb.enable_chunk_append;
-- Stable expression that yields not null
explain (analyze, verbose, costs off, timing off, summary off)
select count(*) from bloom where value =
case when now() < '1970-01-01' then md5(2345::text) else md5(5837::text) end
;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Custom Scan (ChunkAppend) on public.bloom (actual rows=1 loops=1)
Startup Exclusion: true
Runtime Exclusion: false
Chunks excluded during startup: 0
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=1 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.value = CASE WHEN (now() < 'Thu Jan 01 00:00:00 1970 PST'::timestamp with time zone) THEN '81b073de9370ea873f548e31b8adc081'::text ELSE 'd1e39c9bda5c80ac3d8ea9d658163967'::text END)
Rows Removed by Filter: 9999
Batches Removed by Filter: 9
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.ts, compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, compress_hyper_2_2_chunk.value
(13 rows)

-- Stable expression on minmax index
explain (analyze, verbose, costs off, timing off, summary off)
select count(*) from bloom where ts <
case when now() < '1970-01-01' then 1 else 1000 end
;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Custom Scan (ChunkAppend) on public.bloom (actual rows=999 loops=1)
Startup Exclusion: true
Runtime Exclusion: false
Chunks excluded during startup: 0
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=999 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.ts < CASE WHEN (now() < 'Thu Jan 01 00:00:00 1970 PST'::timestamp with time zone) THEN 1 ELSE 1000 END)
Rows Removed by Filter: 9001
Batches Removed by Filter: 9
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.ts, compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, compress_hyper_2_2_chunk.value
(13 rows)

-- Parameter on minmax index
set plan_cache_mode to 'force_generic_plan';
prepare p as
select count(*) from bloom where ts < $1;
explain (analyze, verbose, costs off, timing off, summary off)
execute p(1000);
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Custom Scan (ChunkAppend) on public.bloom (actual rows=999 loops=1)
Startup Exclusion: true
Runtime Exclusion: false
Chunks excluded during startup: 0
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=999 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.ts < $1)
Rows Removed by Filter: 1
Bulk Decompression: true
-> Index Scan using compress_hyper_2_2_chunk__ts_meta_min_1__ts_meta_max_1_idx on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=1 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.ts, compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, compress_hyper_2_2_chunk.value
Index Cond: (compress_hyper_2_2_chunk._ts_meta_min_1 < $1)
(13 rows)

deallocate p;
-- Parameter on bloom index
prepare p as
select count(*) from bloom where value = $1;
explain (analyze, verbose, costs off, timing off, summary off)
execute p(md5('2345'));
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Custom Scan (ChunkAppend) on public.bloom (actual rows=1 loops=1)
Startup Exclusion: true
Runtime Exclusion: false
Chunks excluded during startup: 0
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=1 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.value = $1)
Rows Removed by Filter: 1999
Batches Removed by Filter: 1
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=2 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.ts, compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, compress_hyper_2_2_chunk.value
Filter: _timescaledb_functions.ts_bloom1_matches(compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, $1)
Rows Removed by Filter: 8
(15 rows)

deallocate p;
-- Function of parameter on bloom index
prepare p as
select count(*) from bloom where value = md5($1);
explain (analyze, verbose, costs off, timing off, summary off)
execute p('2345');
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Custom Scan (ChunkAppend) on public.bloom (actual rows=1 loops=1)
Startup Exclusion: true
Runtime Exclusion: false
Chunks excluded during startup: 0
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=1 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.value = md5($1))
Rows Removed by Filter: 9999
Batches Removed by Filter: 9
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.ts, compress_hyper_2_2_chunk._ts_meta_v2_bloom1_value, compress_hyper_2_2_chunk.value
(13 rows)

deallocate p;
reset plan_cache_mode;