Skip to content

Commit

Permalink
refactor: introduce Invariant levels, and make explicit how the post-…
Browse files Browse the repository at this point in the history
…optimization checker should be run
  • Loading branch information
wiedld committed Jan 19, 2025
1 parent e71ef9f commit 9d854a6
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 24 deletions.
167 changes: 145 additions & 22 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use datafusion_expr::{
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;
Expand Down Expand Up @@ -1875,6 +1876,10 @@ impl DefaultPhysicalPlanner {
displayable(plan.as_ref()).indent(true)
);

// This runs once before any optimization,
// to verify that the plan fulfills the base requirements.
InvariantChecker(InvariantLevel::Always).check(&plan)?;

let mut new_plan = Arc::clone(&plan);
for optimizer in optimizers {
let before_schema = new_plan.schema();
Expand All @@ -1884,9 +1889,9 @@ impl DefaultPhysicalPlanner {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;

// confirm optimizer change did not violate invariants
let mut validator = InvariantChecker::new(optimizer);
validator.check(&new_plan, before_schema)?;
// This only checks the schema in release build, and performs additional checks in debug mode.
OptimizationInvariantChecker::new(optimizer)
.check(&new_plan, before_schema)?;

trace!(
"Optimized physical plan by {}:\n{}\n",
Expand All @@ -1895,6 +1900,11 @@ impl DefaultPhysicalPlanner {
);
observer(new_plan.as_ref(), optimizer.as_ref())
}

// This runs once after all optimizer runs are complete,
// to verify that the plan is executable.
InvariantChecker(InvariantLevel::Executable).check(&new_plan)?;

debug!(
"Optimized physical plan:\n{}\n",
displayable(new_plan.as_ref()).indent(false)
Expand Down Expand Up @@ -2002,22 +2012,21 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}

/// Confirms that a given [`PhysicalOptimizerRule`] run
/// did not violate the [`ExecutionPlan`] invariants.
struct InvariantChecker<'a> {
struct OptimizationInvariantChecker<'a> {
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}

impl<'a> InvariantChecker<'a> {
/// Create an [`InvariantChecker`].
impl<'a> OptimizationInvariantChecker<'a> {
/// Create an [`OptimizationInvariantChecker`] that performs checking per tule.
pub fn new(rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>) -> Self {
Self { rule }
}

/// Checks that the plan change is permitted, returning an Error if not.
///
/// Conditionally performs schema checks per [PhysicalOptimizerRule::schema_check].
/// In debug mode, this recursively walks the entire physical plan
/// and performs [`ExecutionPlan::check_node_invariants`].
/// and performs [`ExecutionPlan::check_invariants`].
pub fn check(
&mut self,
plan: &Arc<dyn ExecutionPlan>,
Expand All @@ -2032,19 +2041,48 @@ impl<'a> InvariantChecker<'a> {
)?
}

// check invariants per ExecutionPlan extension
// check invariants per each ExecutionPlan node
#[cfg(debug_assertions)]
plan.visit(self)?;

Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for InvariantChecker<'_> {
impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
node.check_node_invariants().map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?;
// Checks for the more permissive `InvariantLevel::Always`.
// Plans are not guarenteed to be executable after each physical optimizer run.
node.check_invariants(InvariantLevel::Always).map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?;
Ok(TreeNodeRecursion::Continue)
}
}

/// Check [`ExecutionPlan`] invariants per [`InvariantLevel`].
struct InvariantChecker(InvariantLevel);

impl InvariantChecker {
/// Checks that the plan is executable, returning an Error if not.
pub fn check(&mut self, plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
// check invariants per each ExecutionPlan node
plan.visit(self)?;

Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
node.check_invariants(self.0).map_err(|e| {
e.context(format!(
"Invariant for ExecutionPlan node '{}' failed",
node.name()
))
})?;
Ok(TreeNodeRecursion::Continue)
}
}
Expand Down Expand Up @@ -2864,15 +2902,18 @@ digraph {
}
}

/// Extension Node which fails invariant checks
/// Extension Node which fails the [`OptimizationInvariantChecker`].
#[derive(Debug)]
struct InvariantFailsExtensionNode;
impl ExecutionPlan for InvariantFailsExtensionNode {
fn name(&self) -> &str {
"InvariantFailsExtensionNode"
}
fn check_node_invariants(&self) -> Result<()> {
plan_err!("extension node failed it's user-defined invariant check")
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => plan_err!("extension node failed it's user-defined always-invariant check"),
InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"),
}
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
Expand Down Expand Up @@ -2926,7 +2967,7 @@ digraph {
}

#[test]
fn test_invariant_checker() -> Result<()> {
fn test_optimization_invariant_checker() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(OptimizerRuleWithSchemaCheck);

Expand All @@ -2940,37 +2981,119 @@ digraph {

// Test: check should pass with same schema
let equal_schema = ok_plan.schema();
InvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;
OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;

// Test: should fail with schema changed
let different_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let expected_err = InvariantChecker::new(&rule)
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&ok_plan, different_schema)
.unwrap_err();
assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema"));

// Test: should fail when extension node fails it's own invariant check
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let expected_err = InvariantChecker::new(&rule)
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&failing_node, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined invariant check"));
.contains("extension node failed it's user-defined always-invariant check"));

// Test: should fail when descendent extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let invalid_plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = InvariantChecker::new(&rule)
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&invalid_plan, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined invariant check"));
.contains("extension node failed it's user-defined always-invariant check"));

Ok(())
}

/// Extension Node which fails the [`InvariantChecker`]
/// if, and only if, [`InvariantLevel::Executable`]
#[derive(Debug)]
struct ExecutableInvariantFails;
impl ExecutionPlan for ExecutableInvariantFails {
fn name(&self) -> &str {
"ExecutableInvariantFails"
}
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => Ok(()),
InvariantLevel::Executable => plan_err!(
"extension node failed it's user-defined executable-invariant check"
),
}
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for ExecutableInvariantFails {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

#[test]
fn test_invariant_checker_levels() -> Result<()> {
// plan that passes the always-invariant, but fails the executable check
let plan: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);

// Test: check should pass with less stringent Always check
InvariantChecker(InvariantLevel::Always).check(&plan)?;

// Test: should fail the executable check
let expected_err = InvariantChecker(InvariantLevel::Executable)
.check(&plan)
.unwrap_err();
assert!(expected_err.to_string().contains(
"extension node failed it's user-defined executable-invariant check"
));

// Test: should fail when descendent extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
let child = Arc::clone(&ok_node);
let plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = InvariantChecker(InvariantLevel::Executable)
.check(&plan)
.unwrap_err();
assert!(expected_err.to_string().contains(
"extension node failed it's user-defined executable-invariant check"
));

Ok(())
}
Expand Down
14 changes: 12 additions & 2 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
///
/// A default set of invariants is provided in the default implementation.
/// Extension nodes can provide their own invariants.
fn check_node_invariants(&self) -> Result<()> {
// TODO
fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
Ok(())
}

Expand Down Expand Up @@ -434,6 +433,17 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}

#[derive(Clone, Copy)]
pub enum InvariantLevel {
/// Invariants that are always true for the [`ExecutionPlan`] node
/// such as the number of expected children.
Always,
/// Invariants that must hold true for the [`ExecutionPlan`] node
/// to be "executable", such as ordering and/or distribution requirements
/// being fulfilled.
Executable,
}

/// Extension trait provides an easy API to fetch various properties of
/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
pub trait ExecutionPlanProperties {
Expand Down

0 comments on commit 9d854a6

Please sign in to comment.