Skip to content

Commit

Permalink
Interface for physical plan invariant checking. (#13986)
Browse files Browse the repository at this point in the history
* feat(13652): provide interfaces for checking physical plan invariants, and perform check as part of the default physical planner

* feat(13652): define PhysicalOptimizerRule::executable_check interface which allows each optimizer rule to state the  of the output plan

* feat(13652): perform invariant checking on the execution plan, conditionally based upon the expected/stated behavior of the optimizer rule

* test: update tests to reflect updated invariant checking paradigm

* Revert "feat(13652): define PhysicalOptimizerRule::executable_check interface which allows each optimizer rule to state the  of the output plan"

This reverts commit 5760792.

* Revert "test: update tests to reflect updated invariant checking paradigm"

This reverts commit ad15c85.

* refactor: remove vestiges of sanity_check from the InvariantChecker

* refactor: introduce Invariant levels, and make explicit how the post-optimization checker should be run

* feat: provide invariant for UnionExec

* chore: update docs and error messages
  • Loading branch information
wiedld authored Jan 20, 2025
1 parent 163314d commit 2f28327
Show file tree
Hide file tree
Showing 3 changed files with 367 additions and 15 deletions.
343 changes: 330 additions & 13 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::RecordBatch;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue,
Expand All @@ -82,6 +83,7 @@ use datafusion_expr::{
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;
Expand Down Expand Up @@ -1874,33 +1876,35 @@ impl DefaultPhysicalPlanner {
displayable(plan.as_ref()).indent(true)
);

let mut new_plan = plan;
// This runs once before any optimization,
// to verify that the plan fulfills the base requirements.
InvariantChecker(InvariantLevel::Always).check(&plan)?;

let mut new_plan = Arc::clone(&plan);
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
.optimize(new_plan, session_state.config_options())
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
if optimizer.schema_check() && new_plan.schema() != before_schema {
let e = DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
optimizer.name(),
before_schema,
new_plan.schema()
));
return Err(DataFusionError::Context(
optimizer.name().to_string(),
Box::new(e),
));
}

// This only checks the schema in release build, and performs additional checks in debug mode.
OptimizationInvariantChecker::new(optimizer)
.check(&new_plan, before_schema)?;

trace!(
"Optimized physical plan by {}:\n{}\n",
optimizer.name(),
displayable(new_plan.as_ref()).indent(false)
);
observer(new_plan.as_ref(), optimizer.as_ref())
}

// This runs once after all optimizer runs are complete,
// to verify that the plan is executable.
InvariantChecker(InvariantLevel::Executable).check(&new_plan)?;

debug!(
"Optimized physical plan:\n{}\n",
displayable(new_plan.as_ref()).indent(false)
Expand Down Expand Up @@ -2008,6 +2012,83 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}

struct OptimizationInvariantChecker<'a> {
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}

impl<'a> OptimizationInvariantChecker<'a> {
/// Create an [`OptimizationInvariantChecker`] that performs checking per tule.
pub fn new(rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>) -> Self {
Self { rule }
}

/// Checks that the plan change is permitted, returning an Error if not.
///
/// Conditionally performs schema checks per [PhysicalOptimizerRule::schema_check].
/// In debug mode, this recursively walks the entire physical plan
/// and performs [`ExecutionPlan::check_invariants`].
pub fn check(
&mut self,
plan: &Arc<dyn ExecutionPlan>,
previous_schema: Arc<Schema>,
) -> Result<()> {
// if the rule is not permitted to change the schema, confirm that it did not change.
if self.rule.schema_check() && plan.schema() != previous_schema {
internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}",
self.rule.name(),
previous_schema,
plan.schema()
)?
}

// check invariants per each ExecutionPlan node
#[cfg(debug_assertions)]
plan.visit(self)?;

Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
// Checks for the more permissive `InvariantLevel::Always`.
// Plans are not guarenteed to be executable after each physical optimizer run.
node.check_invariants(InvariantLevel::Always).map_err(|e|
e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name()))
)?;
Ok(TreeNodeRecursion::Continue)
}
}

/// Check [`ExecutionPlan`] invariants per [`InvariantLevel`].
struct InvariantChecker(InvariantLevel);

impl InvariantChecker {
/// Checks that the plan is executable, returning an Error if not.
pub fn check(&mut self, plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
// check invariants per each ExecutionPlan node
plan.visit(self)?;

Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
node.check_invariants(self.0).map_err(|e| {
e.context(format!(
"Invariant for ExecutionPlan node '{}' failed",
node.name()
))
})?;
Ok(TreeNodeRecursion::Continue)
}
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand All @@ -2028,6 +2109,7 @@ mod tests {
use crate::execution::session_state::SessionStateBuilder;
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, DFSchemaRef, TableReference};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -2782,4 +2864,239 @@ digraph {

assert_contains!(generated_graph, expected_tooltip);
}

/// Extension Node which passes invariant checks
#[derive(Debug)]
struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
impl ExecutionPlan for OkExtensionNode {
fn name(&self) -> &str {
"always ok"
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self(children)))
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for OkExtensionNode {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

/// Extension Node which fails the [`OptimizationInvariantChecker`].
#[derive(Debug)]
struct InvariantFailsExtensionNode;
impl ExecutionPlan for InvariantFailsExtensionNode {
fn name(&self) -> &str {
"InvariantFailsExtensionNode"
}
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => plan_err!("extension node failed it's user-defined always-invariant check"),
InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"),
}
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for InvariantFailsExtensionNode {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

/// Extension Optimizer rule that requires the schema check
#[derive(Debug)]
struct OptimizerRuleWithSchemaCheck;
impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(plan)
}
fn name(&self) -> &str {
"OptimizerRuleWithSchemaCheck"
}
fn schema_check(&self) -> bool {
true
}
}

#[test]
fn test_optimization_invariant_checker() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(OptimizerRuleWithSchemaCheck);

// ok plan
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
let child = Arc::clone(&ok_node);
let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
Arc::clone(&child),
])?;

// Test: check should pass with same schema
let equal_schema = ok_plan.schema();
OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;

// Test: should fail with schema changed
let different_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&ok_plan, different_schema)
.unwrap_err();
assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed. Schema mismatch. Expected original schema"));

// Test: should fail when extension node fails it's own invariant check
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&failing_node, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined always-invariant check"));

// Test: should fail when descendent extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let invalid_plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&invalid_plan, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined always-invariant check"));

Ok(())
}

/// Extension Node which fails the [`InvariantChecker`]
/// if, and only if, [`InvariantLevel::Executable`]
#[derive(Debug)]
struct ExecutableInvariantFails;
impl ExecutionPlan for ExecutableInvariantFails {
fn name(&self) -> &str {
"ExecutableInvariantFails"
}
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => Ok(()),
InvariantLevel::Executable => plan_err!(
"extension node failed it's user-defined executable-invariant check"
),
}
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for ExecutableInvariantFails {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

#[test]
fn test_invariant_checker_levels() -> Result<()> {
// plan that passes the always-invariant, but fails the executable check
let plan: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);

// Test: check should pass with less stringent Always check
InvariantChecker(InvariantLevel::Always).check(&plan)?;

// Test: should fail the executable check
let expected_err = InvariantChecker(InvariantLevel::Executable)
.check(&plan)
.unwrap_err();
assert!(expected_err.to_string().contains(
"extension node failed it's user-defined executable-invariant check"
));

// Test: should fail when descendent extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
let child = Arc::clone(&ok_node);
let plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = InvariantChecker(InvariantLevel::Executable)
.check(&plan)
.unwrap_err();
assert!(expected_err.to_string().contains(
"extension node failed it's user-defined executable-invariant check"
));

Ok(())
}
}
Loading

0 comments on commit 2f28327

Please sign in to comment.