Skip to content

Commit

Permalink
fix: distinct cache panic on projection pushdown (#25988)
Browse files Browse the repository at this point in the history
Fixed a bug in the distinct cache where a projection that skipped a column
in the cache hierarchy caused a panic.

This also simplifies how the projection is displayed for the DistinctCacheExec
in EXPLAIN output: only the column name is shown, without the column index.
  • Loading branch information
hiltontj authored Feb 11, 2025
1 parent dca5350 commit 04f10ad
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 30 deletions.
8 changes: 8 additions & 0 deletions influxdb3_cache/src/distinct_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::{ColumnId, TableId};
use influxdb3_wal::{DistinctCacheDefinition, FieldData, Row};
use iox_time::TimeProvider;
use observability_deps::tracing::debug;
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -229,6 +230,13 @@ impl DistinctCache {
projection: Option<&[usize]>,
limit: Option<usize>,
) -> Result<RecordBatch, ArrowError> {
debug!(
?schema,
?predicates,
?projection,
?limit,
">>> distinct cache record batches"
);
let n_columns = projection
.as_ref()
.and_then(|p| p.iter().max().copied())
Expand Down
117 changes: 98 additions & 19 deletions influxdb3_cache/src/distinct_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ mod tests {
/// EXPLAIN on the same query. The EXPLAIN output contains a line for the DistinctCacheExec, which
/// is the custom execution plan impl for the distinct value cache that captures the predicates that
/// are pushed down to the underlying [`DistinctCache::to_record_batch`] method, if any.
#[tokio::test]
#[test_log::test(tokio::test)]
async fn test_datafusion_distinct_cache_udtf() {
// create a test writer and do a write in to populate the catalog with a db/table:
let writer = TestWriter::new();
Expand Down Expand Up @@ -508,7 +508,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -522,7 +522,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -536,7 +536,7 @@ mod tests {
"| us-east | a |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -551,7 +551,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -565,7 +565,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -584,7 +584,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -603,7 +603,7 @@ mod tests {
"| eu-west | l |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -619,7 +619,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -634,7 +634,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -649,7 +649,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -664,7 +664,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -683,7 +683,7 @@ mod tests {
"| us-west |",
"+---------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -702,7 +702,7 @@ mod tests {
"| us-west |",
"+---------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1",
explain_contains: "DistinctCacheExec: projection=[region] inner=MemoryExec: partitions=1, partition_sizes=[1",
// it seems that DISTINCT changes around the order of results
use_sorted_assert: true,
},
Expand All @@ -727,7 +727,7 @@ mod tests {
"| l |", // commenting for no new line
"+------+", // commenting for no new line
],
explain_contains: "DistinctCacheExec: projection=[host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[host] inner=MemoryExec: partitions=1, partition_sizes=[1]",
// this column will not be sorted since the order of elements depends on the next level
// up in the cache, so the `region` column is iterated over in order, but the nested
// `host` values, although sorted within `region`s, will not be globally sorted.
Expand All @@ -743,7 +743,7 @@ mod tests {
"| f |", // commenting for no new line
"+------+", // commenting for no new line
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -763,7 +763,7 @@ mod tests {
"| eu-west | l |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -779,7 +779,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -796,7 +796,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region, host] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
];
Expand Down Expand Up @@ -838,4 +838,83 @@ mod tests {
);
}
}

/// Regression test for projection pushdown on a distinct cache when the projection skips a
/// column in the middle of the cache hierarchy.
///
/// The cache is created on `country`, `county`, `city` (in that order) and the query selects
/// only `country` and `city`, so the projected column indices are not contiguous. The test
/// passes if the query completes and returns the expected rows.
#[test_log::test(tokio::test)]
async fn test_projection_pushdown_indexing() {
    let writer = TestWriter::new();
    let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
    // The rows from this first write are discarded; presumably it is only done so that the
    // `wind_data` table definition can be looked up from the writer's schema below.
    let _ = writer.write_lp_to_rows(
        "\
wind_data,city=Berlin,country=Germany,county=Berlin wind_speed=14.63,wind_direction=270i\n\
",
        0,
    );
    let table_def = writer.db_schema().table_definition("wind_data").unwrap();
    // Cache column order is country -> county -> city; the query below skips `county`.
    let column_ids: Vec<ColumnId> = ["country", "county", "city"]
        .into_iter()
        .map(|name| table_def.column_name_to_id_unchecked(name))
        .collect();

    let distinct_provider =
        DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap();
    distinct_provider
        .create_cache(
            writer.db_schema().id,
            None,
            CreateDistinctCacheArgs {
                table_def,
                max_cardinality: Default::default(),
                max_age: Default::default(),
                column_ids,
            },
        )
        .unwrap();

    // Write the data set through WAL contents so that it lands in the cache created above.
    let write_batch = writer.write_lp_to_write_batch(
        "\
wind_data,city=Berlin,country=Germany,county=Berlin wind_speed=14.63,wind_direction=270i\n\
wind_data,city=Hamburg,country=Germany,county=Hamburg wind_speed=19.8,wind_direction=26i\n\
wind_data,city=Munich,country=Germany,county=Bavaria wind_speed=11.77,wind_direction=227i\n\
wind_data,city=Cologne,country=Germany,county=North\\ Rhine-Westphalia wind_speed=12.44,wind_direction=339i\n\
wind_data,city=Frankfurt,country=Germany,county=Hesse wind_speed=18.97,wind_direction=96i\n\
wind_data,city=Stuttgart,country=Germany,county=Baden-Württemberg wind_speed=12.75,wind_direction=332i\n\
wind_data,city=Dortmund,country=Germany,county=North\\ Rhine-Westphalia wind_speed=12.03,wind_direction=146i\n\
wind_data,city=Paris,country=France,county=Île-de-France wind_speed=10.3,wind_direction=302i\n\
wind_data,city=Marseille,country=France,county=Provence-Alpes-Côte\\ d'Azur wind_speed=24.65,wind_direction=288i\n\
wind_data,city=Lyon,country=France,county=Auvergne-Rhône-Alpes wind_speed=17.83,wind_direction=288i\n\
wind_data,city=Toulouse,country=France,county=Occitanie wind_speed=20.34,wind_direction=157i\n\
wind_data,city=Madrid,country=Spain,county=Community\\ of\\ Madrid wind_speed=9.36,wind_direction=348i\n\
wind_data,city=Barcelona,country=Spain,county=Catalonia wind_speed=16.52,wind_direction=14i\n\
", 100);
    let wal_contents = influxdb3_wal::create::wal_contents(
        (0, 100, 1),
        [influxdb3_wal::create::write_batch_op(write_batch)],
    );
    distinct_provider.write_wal_contents_to_cache(&wal_contents);

    // Expose the cache to SQL through the `distinct_cache(..)` table function.
    let ctx = SessionContext::new();
    let distinct_func =
        DistinctCacheFunction::new(writer.db_schema().id, Arc::clone(&distinct_provider));
    ctx.register_udtf(DISTINCT_CACHE_UDTF_NAME, Arc::new(distinct_func));

    // Project the first and third cache columns while filtering on the first.
    let results = ctx
        .sql("select country, city from distinct_cache('wind_data') where country = 'Spain'")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    // Cells are padded to the column width given by the border lines, as the Arrow pretty
    // printer emits them.
    assert_batches_eq!(
        [
            "+---------+-----------+",
            "| country | city      |",
            "+---------+-----------+",
            "| Spain   | Barcelona |",
            "| Spain   | Madrid    |",
            "+---------+-----------+",
        ],
        &results
    );
}
}
21 changes: 10 additions & 11 deletions influxdb3_cache/src/distinct_cache/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ impl TableProvider for DistinctCacheFunctionProvider {
predicates,
Arc::clone(&self.table_def),
&[batches],
self.schema(),
projection,
schema,
projection.is_some(),
limit,
)?;

Expand Down Expand Up @@ -292,7 +292,7 @@ struct DistinctCacheExec {
inner: MemoryExec,
table_def: Arc<TableDefinition>,
predicates: Option<IndexMap<ColumnId, Predicate>>,
projection: Option<Vec<usize>>,
is_projected: bool,
limit: Option<usize>,
}

Expand All @@ -302,15 +302,15 @@ impl DistinctCacheExec {
table_def: Arc<TableDefinition>,
partitions: &[Vec<RecordBatch>],
schema: SchemaRef,
projection: Option<&Vec<usize>>,
is_projected: bool,
limit: Option<usize>,
) -> Result<Self> {
Ok(Self {
// projection is handled prior, so we don't forward it down to the MemoryExec:
inner: MemoryExec::try_new(partitions, schema, None)?,
predicates,
table_def,
projection: projection.cloned(),
is_projected,
limit,
})
}
Expand All @@ -328,14 +328,13 @@ impl DisplayAs for DistinctCacheExec {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "DistinctCacheExec:")?;
if let Some(projection) = &self.projection {
if self.is_projected {
write!(f, " projection=[")?;
let schema = self.schema();
let mut p_iter = projection.iter();
while let Some(i) = p_iter.next() {
let name = schema.fields().get(*i).ok_or(std::fmt::Error)?.name();
write!(f, "{name}@{i}")?;
if p_iter.size_hint().0 > 0 {
let mut field_iter = schema.fields().iter().peekable();
while let (Some(field), next) = (field_iter.next(), field_iter.peek()) {
write!(f, "{name}", name = field.name())?;
if next.is_some() {
write!(f, ", ")?;
}
}
Expand Down

0 comments on commit 04f10ad

Please sign in to comment.