Skip to content

Commit

Permalink
feat(13652): perform invariant checking on the execution plan, condit…
Browse files Browse the repository at this point in the history
…ionally based upon the expected/stated behavior of the optimizer rule
  • Loading branch information
wiedld committed Jan 14, 2025
1 parent 5760792 commit 94482d1
Showing 1 changed file with 48 additions and 16 deletions.
64 changes: 48 additions & 16 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::logical_expr::{
UserDefinedLogicalNode,
};
use crate::physical_expr::{create_physical_expr, create_physical_exprs};
use crate::physical_optimizer::sanity_checker::check_plan_sanity;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::analyze::AnalyzeExec;
use crate::physical_plan::empty::EmptyExec;
Expand Down Expand Up @@ -64,6 +65,7 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::RecordBatch;
use datafusion_common::config::OptimizerOptions;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
Expand Down Expand Up @@ -1873,7 +1875,8 @@ impl DefaultPhysicalPlanner {
displayable(plan.as_ref()).indent(true)
);

let mut new_plan = plan;
let mut new_plan = Arc::clone(&plan);
let mut input_plan_is_valid = true;
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
Expand All @@ -1883,9 +1886,12 @@ impl DefaultPhysicalPlanner {
})?;

// confirm optimizer change did not violate invariants
InvariantChecker
.check(&new_plan, optimizer, before_schema)
.map_err(|e| e.context(optimizer.name().to_string()))?;
let mut validator = InvariantChecker::new(
&session_state.config_options().optimizer,
optimizer,
);
validator.check(&new_plan, before_schema, input_plan_is_valid)?;
input_plan_is_valid = optimizer.executable_check(input_plan_is_valid);

trace!(
"Optimized physical plan by {}:\n{}\n",
Expand Down Expand Up @@ -2001,41 +2007,67 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}

#[derive(Default)]
struct InvariantChecker;
/// Confirms that a given [`PhysicalOptimizerRule`] run conforms
/// to the invariants per rule, and per [`ExecutionPlan`] invariants.
struct InvariantChecker<'a> {
options: &'a OptimizerOptions,
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}

impl<'a> InvariantChecker<'a> {
/// Create an [`InvariantChecker`].
pub fn new(
options: &'a OptimizerOptions,
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
) -> Self {
Self { options, rule }
}

impl InvariantChecker {
/// Checks that the plan change is permitted, returning an Error if not.
///
/// In debug mode, this recursively walks the entire physical plan and
/// performs additional checks using [`ExecutionPlan::check_node_invariants`].
/// performs additional checks using Datafusions's [`check_plan_sanity`]
/// and any user defined [`ExecutionPlan::check_node_invariants`] extensions.
pub fn check(
&mut self,
plan: &Arc<dyn ExecutionPlan>,
rule: &Arc<dyn PhysicalOptimizerRule + Send + Sync>,
previous_schema: Arc<Schema>,
input_plan_is_valid: bool,
) -> Result<()> {
// Invariant: in most cases, the schema cannot be changed
// since the plan's output cannot change after the optimizer pass.
if rule.schema_check() && plan.schema() != previous_schema {
// if the rule is not permitted to change the schema, confirm that it did not change.
if self.rule.schema_check() && plan.schema() != previous_schema {
internal_err!("PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
rule.name(),
self.rule.name(),
previous_schema,
plan.schema()
)?
}

// if the rule requires that the new plan is executable, confirm that it is.
#[cfg(debug_assertions)]
plan.visit(self)?;
if self.rule.executable_check(input_plan_is_valid) {
plan.visit(self)?;
}

Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
impl<'n> TreeNodeVisitor<'n> for InvariantChecker<'_> {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
node.check_node_invariants()?;
// Datafusion's defined physical plan invariants
check_plan_sanity(Arc::clone(node), self.options).map_err(|e| {
e.context(format!(
"SanityCheckPlan failed for PhysicalOptimizer rule '{}'",
self.rule.name()
))
})?;

// user defined invariants per ExecutionPlan extension
node.check_node_invariants().map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?;

Ok(TreeNodeRecursion::Continue)
}
}
Expand Down

0 comments on commit 94482d1

Please sign in to comment.