From 3b03f6c642abec816f4f55e53b2beca3b2292245 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 2 Jan 2025 12:28:31 -0800 Subject: [PATCH 01/10] feat(13652): provide interfaces for checking physical plan invariants, and perform check as part of the default physical planner --- datafusion/core/src/physical_planner.rs | 209 +++++++++++++++++- .../physical-plan/src/execution_plan.rs | 10 + 2 files changed, 207 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 47b31d2f4e2d..38db4c6a92c7 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -66,6 +66,7 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow_array::builder::StringBuilder; use arrow_array::RecordBatch; use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, ScalarValue, @@ -1880,18 +1881,12 @@ impl DefaultPhysicalPlanner { .map_err(|e| { DataFusionError::Context(optimizer.name().to_string(), Box::new(e)) })?; - if optimizer.schema_check() && new_plan.schema() != before_schema { - let e = DataFusionError::Internal(format!( - "PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}", - optimizer.name(), - before_schema, - new_plan.schema() - )); - return Err(DataFusionError::Context( - optimizer.name().to_string(), - Box::new(e), - )); - } + + // confirm optimizer change did not violate invariants + InvariantChecker + .check(&new_plan, optimizer, before_schema) + .map_err(|e| e.context(optimizer.name().to_string()))?; + trace!( "Optimized physical plan by {}:\n{}\n", optimizer.name(), @@ -2006,6 +2001,45 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { } } +#[derive(Default)] +struct InvariantChecker; + +impl InvariantChecker { + /// Checks that the plan change is permitted, returning an Error if not. + /// + /// In debug mode, this recursively walks the entire physical plan and + /// performs additional checks using [`ExecutionPlan::check_node_invariants`]. + pub fn check( + &mut self, + plan: &Arc, + rule: &Arc, + previous_schema: Arc, + ) -> Result<()> { + // Invariant: in most cases, the schema cannot be changed + // since the plan's output cannot change after the optimizer pass. + if rule.schema_check() && plan.schema() != previous_schema { + internal_err!("PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}", + rule.name(), + previous_schema, + plan.schema() + )? + } + + #[cfg(debug_assertions)] + plan.visit(self)?; + Ok(()) + } +} + +impl<'n> TreeNodeVisitor<'n> for InvariantChecker { + type Node = Arc; + + fn f_down(&mut self, node: &'n Self::Node) -> Result { + node.check_node_invariants()?; + Ok(TreeNodeRecursion::Continue) + } +} + #[cfg(test)] mod tests { use std::any::Any; @@ -2026,6 +2060,7 @@ mod tests { use crate::execution::session_state::SessionStateBuilder; use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type}; + use datafusion_common::config::ConfigOptions; use datafusion_common::{assert_contains, DFSchemaRef, TableReference}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; @@ -2780,4 +2815,154 @@ digraph { assert_contains!(generated_graph, expected_tooltip); } + + /// Extension Node which passes invariant checks + #[derive(Debug)] + struct OkExtensionNode(Vec>); + impl ExecutionPlan for OkExtensionNode { + fn name(&self) -> &str { + "always ok" + } + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self(children))) + } + fn schema(&self) -> SchemaRef { + Arc::new(Schema::empty()) + } + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + fn children(&self) -> Vec<&Arc> { + self.0.iter().collect::>() + } + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + } + impl DisplayAs for OkExtensionNode { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.name()) + } + } + + /// Extension Node which fails invariant checks + #[derive(Debug)] + struct InvariantFailsExtensionNode; + impl ExecutionPlan for InvariantFailsExtensionNode { + fn name(&self) -> &str { + "InvariantFailsExtensionNode" + } + fn check_node_invariants(&self) -> Result<()> { + plan_err!("extension node failed it's user-defined invariant check") + } + fn schema(&self) -> SchemaRef { + Arc::new(Schema::empty()) + } + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + fn children(&self) -> Vec<&Arc> { + unimplemented!() + } + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + } + impl DisplayAs for InvariantFailsExtensionNode { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.name()) + } + } + + /// Extension Optimizer rule that requires the schema check + #[derive(Debug)] + struct OptimizerRuleWithSchemaCheck; + impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + Ok(plan) + } + fn name(&self) -> &str { + "OptimizerRuleWithSchemaCheck" + } + fn schema_check(&self) -> bool { + true + } + } + + #[test] + fn test_invariant_checker() -> Result<()> { + let rule: Arc = + Arc::new(OptimizerRuleWithSchemaCheck); + + // ok plan + let ok_node: Arc = Arc::new(OkExtensionNode(vec![])); + let child = Arc::clone(&ok_node); + let ok_plan = Arc::clone(&ok_node).with_new_children(vec![ + Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?, + Arc::clone(&child), + ])?; + + // Test: check should pass with same schema + let equal_schema = ok_plan.schema(); + InvariantChecker.check(&ok_plan, &rule, equal_schema)?; + + // Test: should fail with schema changed + let different_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let expected_err = InvariantChecker + .check(&ok_plan, &rule, different_schema) + .unwrap_err(); + assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema")); + + // Test: should fail when extension node fails it's own invariant check + let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); + let expected_err = InvariantChecker + .check(&failing_node, &rule, ok_plan.schema()) + .unwrap_err(); + assert!(expected_err + .to_string() + .contains("extension node failed it's user-defined invariant check")); + + // Test: should fail when descendent extension node fails + let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); + let invalid_plan = ok_node.with_new_children(vec![ + Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, + Arc::clone(&child), + ])?; + let expected_err = InvariantChecker + .check(&invalid_plan, &rule, ok_plan.schema()) + .unwrap_err(); + assert!(expected_err + .to_string() + .contains("extension node failed it's user-defined invariant check")); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 5f0b229ce92a..e736c468cd2f 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -110,6 +110,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// trait, which is implemented for all `ExecutionPlan`s. fn properties(&self) -> &PlanProperties; + /// Returns an error if this individual node does not conform to its invariants. + /// These invariants are typically only checked in debug mode. + /// + /// A default set of invariants is provided in the default implementation. + /// Extension nodes can provide their own invariants. + fn check_node_invariants(&self) -> Result<()> { + // TODO + Ok(()) + } + /// Specifies the data distribution requirements for all the /// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child, fn required_input_distribution(&self) -> Vec { From 5760792a7a4e051305adbeb2e84fd4d5c265e5ea Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 13 Jan 2025 21:10:20 -0800 Subject: [PATCH 02/10] feat(13652): define PhysicalOptimizerRule::executable_check interface which allows each optimizer rule to state the of the output plan --- .../src/physical_optimizer/join_selection.rs | 5 +++++ .../src/physical_optimizer/sanity_checker.rs | 5 +++++ .../physical-optimizer/src/optimizer.rs | 19 +++++++++++++++++++ .../src/output_requirements.rs | 5 +++++ 4 files changed, 34 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 736c3fbd0184..dc1f13f9290e 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -200,6 +200,11 @@ impl PhysicalOptimizerRule for JoinSelection { fn schema_check(&self) -> bool { true } + + /// [`JoinSelection`] is expected to produce invalid plans. + fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { + false + } } /// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when possible. diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs index 8e8787aec96b..35ef4e275c61 100644 --- a/datafusion/core/src/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs @@ -70,6 +70,11 @@ impl PhysicalOptimizerRule for SanityCheckPlan { fn schema_check(&self) -> bool { true } + + /// [`SanityCheckPlan`] confirms the plan is executable. + fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { + true + } } /// This function propagates finiteness information and rejects any plan with diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 609890e2d43f..ad1422ed1703 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -46,4 +46,23 @@ pub trait PhysicalOptimizerRule: Debug { /// Some of the optimization rules might change the nullable properties of the schema /// and should disable the schema check. fn schema_check(&self) -> bool; + + /// A flag to indicate whether the PhysicalOptimizerRule is + /// expected to produce an executable physical plan as output, + /// dependent upon whether the input plan is executable. + /// + /// For example, some rules may mutate a plan and produce a non-executable + /// output. For those [`PhysicalOptimizerRule`]s it is important to + /// perform additional enforcement runs (e.g. to enforce distribution and sorting) + /// prior to execution. + /// + /// Whereas for other [`PhysicalOptimizerRule`]s, it is guarenteed that the output + /// plan is executable. + fn executable_check(&self, previous_plan_is_valid: bool) -> bool { + // Default is that the optimizer run does not impact the current level of executable. + // + // Non-executable plans remain non-executable, + // executable plans remain executable. + previous_plan_is_valid + } } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index e107bb85d7b8..cb70189d7e24 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -223,6 +223,11 @@ impl PhysicalOptimizerRule for OutputRequirements { fn schema_check(&self) -> bool { true } + + /// [`OutputRequirements`] is expected to produce invalid plans. + fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { + false + } } /// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that From 94482d1ab9091bfe1395e8e02c14bb07c59f40e4 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 13 Jan 2025 21:11:45 -0800 Subject: [PATCH 03/10] feat(13652): perform invariant checking on the execution plan, conditionally based upon the expected/stated behavior of the optimizer rule --- datafusion/core/src/physical_planner.rs | 64 ++++++++++++++++++------- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ca9e0714a582..eeddd769cbba 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -36,6 +36,7 @@ use crate::logical_expr::{ UserDefinedLogicalNode, }; use crate::physical_expr::{create_physical_expr, create_physical_exprs}; +use crate::physical_optimizer::sanity_checker::check_plan_sanity; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::empty::EmptyExec; @@ -64,6 +65,7 @@ use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use arrow_array::builder::StringBuilder; use arrow_array::RecordBatch; +use datafusion_common::config::OptimizerOptions; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ @@ -1873,7 +1875,8 @@ impl DefaultPhysicalPlanner { displayable(plan.as_ref()).indent(true) ); - let mut new_plan = plan; + let mut new_plan = Arc::clone(&plan); + let mut input_plan_is_valid = true; for optimizer in optimizers { let before_schema = new_plan.schema(); new_plan = optimizer @@ -1883,9 +1886,12 @@ impl DefaultPhysicalPlanner { })?; // confirm optimizer change did not violate invariants - InvariantChecker - .check(&new_plan, optimizer, before_schema) - .map_err(|e| e.context(optimizer.name().to_string()))?; + let mut validator = InvariantChecker::new( + &session_state.config_options().optimizer, + optimizer, + ); + validator.check(&new_plan, before_schema, input_plan_is_valid)?; + input_plan_is_valid = optimizer.executable_check(input_plan_is_valid); trace!( "Optimized physical plan by {}:\n{}\n", @@ -2001,41 +2007,67 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { } } -#[derive(Default)] -struct InvariantChecker; +/// Confirms that a given [`PhysicalOptimizerRule`] run conforms +/// to the invariants per rule, and per [`ExecutionPlan`] invariants. +struct InvariantChecker<'a> { + options: &'a OptimizerOptions, + rule: &'a Arc, +} + +impl<'a> InvariantChecker<'a> { + /// Create an [`InvariantChecker`]. + pub fn new( + options: &'a OptimizerOptions, + rule: &'a Arc, + ) -> Self { + Self { options, rule } + } -impl InvariantChecker { /// Checks that the plan change is permitted, returning an Error if not. /// /// In debug mode, this recursively walks the entire physical plan and - /// performs additional checks using [`ExecutionPlan::check_node_invariants`]. + /// performs additional checks using Datafusions's [`check_plan_sanity`] + /// and any user defined [`ExecutionPlan::check_node_invariants`] extensions. pub fn check( &mut self, plan: &Arc, - rule: &Arc, previous_schema: Arc, + input_plan_is_valid: bool, ) -> Result<()> { - // Invariant: in most cases, the schema cannot be changed - // since the plan's output cannot change after the optimizer pass. - if rule.schema_check() && plan.schema() != previous_schema { + // if the rule is not permitted to change the schema, confirm that it did not change. + if self.rule.schema_check() && plan.schema() != previous_schema { internal_err!("PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}", - rule.name(), + self.rule.name(), previous_schema, plan.schema() )? } + // if the rule requires that the new plan is executable, confirm that it is. #[cfg(debug_assertions)] - plan.visit(self)?; + if self.rule.executable_check(input_plan_is_valid) { + plan.visit(self)?; + } + Ok(()) } } -impl<'n> TreeNodeVisitor<'n> for InvariantChecker { +impl<'n> TreeNodeVisitor<'n> for InvariantChecker<'_> { type Node = Arc; fn f_down(&mut self, node: &'n Self::Node) -> Result { - node.check_node_invariants()?; + // Datafusion's defined physical plan invariants + check_plan_sanity(Arc::clone(node), self.options).map_err(|e| { + e.context(format!( + "SanityCheckPlan failed for PhysicalOptimizer rule '{}'", + self.rule.name() + )) + })?; + + // user defined invariants per ExecutionPlan extension + node.check_node_invariants().map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?; + Ok(TreeNodeRecursion::Continue) } } From ad15c85e229e2e1da9e54d3f2732e237c4760b0f Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 13 Jan 2025 21:15:57 -0800 Subject: [PATCH 04/10] test: update tests to reflect updated invariant checking paradigm --- datafusion/core/src/physical_planner.rs | 217 ++++++++++++++++-- .../physical-plan/src/execution_plan.rs | 1 - 2 files changed, 196 insertions(+), 22 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index eeddd769cbba..67258ef6c95c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2082,6 +2082,8 @@ mod tests { use super::*; use crate::datasource::file_format::options::CsvReadOptions; use crate::datasource::MemTable; + use crate::physical_optimizer::enforce_sorting::EnforceSorting; + use crate::physical_optimizer::sanity_checker::SanityCheckPlan; use crate::physical_plan::{ expressions, DisplayAs, DisplayFormatType, PlanProperties, SendableRecordBatchStream, @@ -2098,7 +2100,11 @@ mod tests { use datafusion_execution::TaskContext; use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore}; use datafusion_functions_aggregate::expr_fn::sum; - use datafusion_physical_expr::EquivalenceProperties; + use datafusion_physical_expr::expressions::{ + col as physical_expr_col, lit as physical_expr_lit, + }; + use datafusion_physical_expr::{Distribution, EquivalenceProperties, LexRequirement}; + use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; fn make_session_state() -> SessionState { @@ -2848,9 +2854,29 @@ digraph { assert_contains!(generated_graph, expected_tooltip); } + fn default_plan_props() -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(Arc::new(Schema::empty())), + Partitioning::RoundRobinBatch(1), + EmissionType::Final, + Boundedness::Bounded, + ) + } + /// Extension Node which passes invariant checks #[derive(Debug)] - struct OkExtensionNode(Vec>); + struct OkExtensionNode { + children: Vec>, + properties: PlanProperties, + } + impl OkExtensionNode { + fn new(children: Vec>) -> Self { + Self { + children, + properties: default_plan_props(), + } + } + } impl ExecutionPlan for OkExtensionNode { fn name(&self) -> &str { "always ok" @@ -2859,19 +2885,19 @@ digraph { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(Self(children))) + Ok(Arc::new(Self::new(children))) } fn schema(&self) -> SchemaRef { Arc::new(Schema::empty()) } fn as_any(&self) -> &dyn Any { - unimplemented!() + self } fn children(&self) -> Vec<&Arc> { - self.0.iter().collect::>() + self.children.iter().collect::>() } fn properties(&self) -> &PlanProperties { - unimplemented!() + &self.properties } fn execute( &self, @@ -2889,7 +2915,16 @@ digraph { /// Extension Node which fails invariant checks #[derive(Debug)] - struct InvariantFailsExtensionNode; + struct InvariantFailsExtensionNode { + properties: PlanProperties, + } + impl InvariantFailsExtensionNode { + fn new() -> Self { + Self { + properties: default_plan_props(), + } + } + } impl ExecutionPlan for InvariantFailsExtensionNode { fn name(&self) -> &str { "InvariantFailsExtensionNode" @@ -2907,13 +2942,13 @@ digraph { unimplemented!() } fn as_any(&self) -> &dyn Any { - unimplemented!() + self } fn children(&self) -> Vec<&Arc> { - unimplemented!() + vec![] } fn properties(&self) -> &PlanProperties { - unimplemented!() + &self.properties } fn execute( &self, @@ -2946,15 +2981,18 @@ digraph { fn schema_check(&self) -> bool { true } + fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { + true + } } #[test] - fn test_invariant_checker() -> Result<()> { + fn test_invariant_checker_with_execution_plan_extensions() -> Result<()> { let rule: Arc = Arc::new(OptimizerRuleWithSchemaCheck); // ok plan - let ok_node: Arc = Arc::new(OkExtensionNode(vec![])); + let ok_node: Arc = Arc::new(OkExtensionNode::new(vec![])); let child = Arc::clone(&ok_node); let ok_plan = Arc::clone(&ok_node).with_new_children(vec![ Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?, @@ -2963,38 +3001,175 @@ digraph { // Test: check should pass with same schema let equal_schema = ok_plan.schema(); - InvariantChecker.check(&ok_plan, &rule, equal_schema)?; + InvariantChecker::new(&Default::default(), &rule).check( + &ok_plan, + equal_schema, + true, + )?; // Test: should fail with schema changed let different_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); - let expected_err = InvariantChecker - .check(&ok_plan, &rule, different_schema) + let expected_err = InvariantChecker::new(&Default::default(), &rule) + .check(&ok_plan, different_schema, true) .unwrap_err(); assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema")); // Test: should fail when extension node fails it's own invariant check - let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); - let expected_err = InvariantChecker - .check(&failing_node, &rule, ok_plan.schema()) + let failing_node: Arc = + Arc::new(InvariantFailsExtensionNode::new()); + let expected_err = InvariantChecker::new(&Default::default(), &rule) + .check(&failing_node, ok_plan.schema(), true) .unwrap_err(); assert!(expected_err .to_string() .contains("extension node failed it's user-defined invariant check")); // Test: should fail when descendent extension node fails - let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); + let failing_node: Arc = + Arc::new(InvariantFailsExtensionNode::new()); let invalid_plan = ok_node.with_new_children(vec![ Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, Arc::clone(&child), ])?; - let expected_err = InvariantChecker - .check(&invalid_plan, &rule, ok_plan.schema()) + let expected_err = InvariantChecker::new(&Default::default(), &rule) + .check(&invalid_plan, ok_plan.schema(), true) .unwrap_err(); assert!(expected_err .to_string() .contains("extension node failed it's user-defined invariant check")); + // Test: confirm error message contains both the user-defined extension name and the optimizer rule name + assert!(expected_err + .to_string() + .contains("Invariant for ExecutionPlan node 'InvariantFailsExtensionNode' failed for PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck'")); + + Ok(()) + } + + fn wrap_in_nonexecutable( + plan: Arc, + ) -> Result> { + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)])); + let col_a = physical_expr_col("a", &schema)?; + + // mutate the tree is such a way that is NOT yet executable + Ok(Arc::new(OutputRequirementExec::new( + plan, + Some(LexRequirement::from_lex_ordering( + vec![PhysicalSortExpr::new_default(col_a)].into(), + )), + Distribution::UnspecifiedDistribution, + ))) + } + + /// Extension where the physical plan mutation creates a non-executable plan. + /// + /// This is a "failing" extension, since it doesn't implement [`PhysicalOptimizerRule::executable_check`] + /// as false. + #[derive(Debug)] + struct ExtensionRuleDoesBadMutation; + impl PhysicalOptimizerRule for ExtensionRuleDoesBadMutation { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + wrap_in_nonexecutable(plan) + } + fn name(&self) -> &str { + "ExtensionRuleDoesBadMutation" + } + fn schema_check(&self) -> bool { + true + } + } + + /// Extension where the physical plan mutation creates a non-executable plan. + /// + /// This extension properly implements [`PhysicalOptimizerRule::executable_check`] => false. + /// And then the follow up optimizer runs may be performed. + #[derive(Debug)] + struct ExtensionRuleNeedsMoreRuns; + impl PhysicalOptimizerRule for ExtensionRuleNeedsMoreRuns { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + wrap_in_nonexecutable(plan) + } + fn name(&self) -> &str { + "ExtensionRuleNeedsMoreRuns" + } + fn schema_check(&self) -> bool { + true + } + fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { + false + } + } + + #[test] + fn test_invariant_checker_with_optimization_extension() -> Result<()> { + let planner = DefaultPhysicalPlanner { + extension_planners: vec![], + }; + + // ok plan + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])); + let ok_plan = Arc::new(MemoryExec::try_new_as_values( + schema, + vec![vec![physical_expr_lit(ScalarValue::UInt32(None))]], + )?); + + // Test: check should pass with valid OpimizerRule mutation + let session = SessionStateBuilder::new() + .with_physical_optimizer_rules(vec![Arc::new(OptimizerRuleWithSchemaCheck)]) + .build(); + assert_eq!( + session.physical_optimizers().len(), + 1, + "should have the 1 valid optimizer rule" + ); + planner.optimize_physical_plan(ok_plan.clone(), &session, |_, _| {})?; + + // Test: should fail with invalid OpimizerRule mutation that leaves plan not executable + let session = SessionStateBuilder::new() + .with_physical_optimizer_rules(vec![ + Arc::new(SanityCheckPlan::new()), // should produce executable plan + Arc::new(ExtensionRuleDoesBadMutation), // will fail executable check + ]) + .build(); + assert_eq!( + session.physical_optimizers().len(), + 2, + "should have 2 optimizer rules" + ); + let expected_err = planner + .optimize_physical_plan(ok_plan.clone(), &session, |_, _| {}) + .unwrap_err(); + assert!(expected_err + .to_string() + .contains("SanityCheckPlan failed for PhysicalOptimizer rule 'ExtensionRuleDoesBadMutation'")); + + // Test: should pass once the proper additional optimizer rules are applied after the Extension rule + let session = SessionStateBuilder::new() + .with_physical_optimizer_rules(vec![ + Arc::new(SanityCheckPlan::new()), // should produce executable plan + Arc::new(ExtensionRuleNeedsMoreRuns), // Extension states that the returned plan is not executable + Arc::new(EnforceSorting::new()), // should mutate plan + Arc::new(SanityCheckPlan::new()), // should produce executable plan + ]) + .build(); + assert_eq!( + session.physical_optimizers().len(), + 4, + "should have 4 optimizer rules" + ); + planner.optimize_physical_plan(ok_plan, &session, |_, _| {})?; + Ok(()) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index e736c468cd2f..4f05667df4b4 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -116,7 +116,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// A default set of invariants is provided in the default implementation. /// Extension nodes can provide their own invariants. fn check_node_invariants(&self) -> Result<()> { - // TODO Ok(()) } From 5e065e1a9993dfb2d9cea79cc5b445a22172afcd Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 18 Jan 2025 19:27:12 -0800 Subject: [PATCH 05/10] Revert "feat(13652): define PhysicalOptimizerRule::executable_check interface which allows each optimizer rule to state the of the output plan" This reverts commit 5760792a7a4e051305adbeb2e84fd4d5c265e5ea. --- .../src/physical_optimizer/join_selection.rs | 5 ----- .../src/physical_optimizer/sanity_checker.rs | 5 ----- .../physical-optimizer/src/optimizer.rs | 19 ------------------- .../src/output_requirements.rs | 5 ----- 4 files changed, 34 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index dc1f13f9290e..736c3fbd0184 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -200,11 +200,6 @@ impl PhysicalOptimizerRule for JoinSelection { fn schema_check(&self) -> bool { true } - - /// [`JoinSelection`] is expected to produce invalid plans. - fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { - false - } } /// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when possible. diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs index 35ef4e275c61..8e8787aec96b 100644 --- a/datafusion/core/src/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs @@ -70,11 +70,6 @@ impl PhysicalOptimizerRule for SanityCheckPlan { fn schema_check(&self) -> bool { true } - - /// [`SanityCheckPlan`] confirms the plan is executable. - fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { - true - } } /// This function propagates finiteness information and rejects any plan with diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index ad1422ed1703..609890e2d43f 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -46,23 +46,4 @@ pub trait PhysicalOptimizerRule: Debug { /// Some of the optimization rules might change the nullable properties of the schema /// and should disable the schema check. fn schema_check(&self) -> bool; - - /// A flag to indicate whether the PhysicalOptimizerRule is - /// expected to produce an executable physical plan as output, - /// dependent upon whether the input plan is executable. - /// - /// For example, some rules may mutate a plan and produce a non-executable - /// output. For those [`PhysicalOptimizerRule`]s it is important to - /// perform additional enforcement runs (e.g. to enforce distribution and sorting) - /// prior to execution. - /// - /// Whereas for other [`PhysicalOptimizerRule`]s, it is guarenteed that the output - /// plan is executable. - fn executable_check(&self, previous_plan_is_valid: bool) -> bool { - // Default is that the optimizer run does not impact the current level of executable. - // - // Non-executable plans remain non-executable, - // executable plans remain executable. - previous_plan_is_valid - } } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index cb70189d7e24..e107bb85d7b8 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -223,11 +223,6 @@ impl PhysicalOptimizerRule for OutputRequirements { fn schema_check(&self) -> bool { true } - - /// [`OutputRequirements`] is expected to produce invalid plans. - fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { - false - } } /// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that From 10af0bd51c46eb5f56b3da4e154b4d797dc2ac1a Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 18 Jan 2025 19:29:26 -0800 Subject: [PATCH 06/10] Revert "test: update tests to reflect updated invariant checking paradigm" This reverts commit ad15c85e229e2e1da9e54d3f2732e237c4760b0f. --- datafusion/core/src/physical_planner.rs | 217 ++---------------- .../physical-plan/src/execution_plan.rs | 1 + 2 files changed, 22 insertions(+), 196 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0bf94a5ae946..a6e372a2de1c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2084,8 +2084,6 @@ mod tests { use super::*; use crate::datasource::file_format::options::CsvReadOptions; use crate::datasource::MemTable; - use crate::physical_optimizer::enforce_sorting::EnforceSorting; - use crate::physical_optimizer::sanity_checker::SanityCheckPlan; use crate::physical_plan::{ expressions, DisplayAs, DisplayFormatType, PlanProperties, SendableRecordBatchStream, @@ -2102,11 +2100,7 @@ mod tests { use datafusion_execution::TaskContext; use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore}; use datafusion_functions_aggregate::expr_fn::sum; - use datafusion_physical_expr::expressions::{ - col as physical_expr_col, lit as physical_expr_lit, - }; - use datafusion_physical_expr::{Distribution, EquivalenceProperties, LexRequirement}; - use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; + use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; fn make_session_state() -> SessionState { @@ -2856,29 +2850,9 @@ digraph { assert_contains!(generated_graph, expected_tooltip); } - fn default_plan_props() -> PlanProperties { - PlanProperties::new( - EquivalenceProperties::new(Arc::new(Schema::empty())), - Partitioning::RoundRobinBatch(1), - EmissionType::Final, - Boundedness::Bounded, - ) - } - /// Extension Node which passes invariant checks #[derive(Debug)] - struct OkExtensionNode { - children: Vec>, - properties: PlanProperties, - } - impl OkExtensionNode { - fn new(children: Vec>) -> Self { - Self { - children, - properties: default_plan_props(), - } - } - } + struct OkExtensionNode(Vec>); impl ExecutionPlan for OkExtensionNode { fn name(&self) -> &str { "always ok" @@ -2887,19 +2861,19 @@ digraph { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(Self::new(children))) + Ok(Arc::new(Self(children))) } fn schema(&self) -> SchemaRef { Arc::new(Schema::empty()) } fn as_any(&self) -> &dyn Any { - self + unimplemented!() } fn children(&self) -> Vec<&Arc> { - self.children.iter().collect::>() + self.0.iter().collect::>() } fn properties(&self) -> &PlanProperties { - &self.properties + unimplemented!() } fn execute( &self, @@ -2917,16 +2891,7 @@ digraph { /// Extension Node which fails invariant checks #[derive(Debug)] - struct InvariantFailsExtensionNode { - properties: PlanProperties, - } - impl InvariantFailsExtensionNode { - fn new() -> Self { - Self { - properties: default_plan_props(), - } - } - } + struct InvariantFailsExtensionNode; impl ExecutionPlan for InvariantFailsExtensionNode { fn name(&self) -> &str { "InvariantFailsExtensionNode" @@ -2944,13 +2909,13 @@ digraph { unimplemented!() } fn as_any(&self) -> &dyn Any { - self + unimplemented!() } fn children(&self) -> Vec<&Arc> { - vec![] + unimplemented!() } fn properties(&self) -> &PlanProperties { - &self.properties + unimplemented!() } fn execute( &self, @@ -2983,18 +2948,15 @@ digraph { fn schema_check(&self) -> bool { true } - fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { - true - } } #[test] - fn test_invariant_checker_with_execution_plan_extensions() -> Result<()> { + fn test_invariant_checker() -> Result<()> { let rule: Arc = Arc::new(OptimizerRuleWithSchemaCheck); // ok plan - let ok_node: Arc = Arc::new(OkExtensionNode::new(vec![])); + let ok_node: Arc = Arc::new(OkExtensionNode(vec![])); let child = Arc::clone(&ok_node); let ok_plan = Arc::clone(&ok_node).with_new_children(vec![ Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?, @@ -3003,175 +2965,38 @@ digraph { // Test: check should pass with same schema let equal_schema = ok_plan.schema(); - InvariantChecker::new(&Default::default(), &rule).check( - &ok_plan, - equal_schema, - true, - )?; + InvariantChecker.check(&ok_plan, &rule, equal_schema)?; // Test: should fail with schema changed let different_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); - let expected_err = InvariantChecker::new(&Default::default(), &rule) - .check(&ok_plan, different_schema, true) + let expected_err = InvariantChecker + .check(&ok_plan, &rule, different_schema) .unwrap_err(); assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema")); // Test: should fail when extension node fails it's own invariant check - let failing_node: Arc = - Arc::new(InvariantFailsExtensionNode::new()); - let expected_err = InvariantChecker::new(&Default::default(), &rule) - .check(&failing_node, ok_plan.schema(), true) + let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); + let expected_err = InvariantChecker + .check(&failing_node, &rule, ok_plan.schema()) .unwrap_err(); assert!(expected_err .to_string() .contains("extension node failed it's user-defined invariant check")); // Test: should fail when descendent extension node fails - let failing_node: Arc = - Arc::new(InvariantFailsExtensionNode::new()); + let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); let invalid_plan = ok_node.with_new_children(vec![ Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, Arc::clone(&child), ])?; - let expected_err = InvariantChecker::new(&Default::default(), &rule) - .check(&invalid_plan, ok_plan.schema(), true) + let expected_err = InvariantChecker + .check(&invalid_plan, &rule, ok_plan.schema()) .unwrap_err(); assert!(expected_err .to_string() .contains("extension node failed it's user-defined invariant check")); - // Test: confirm error message contains both the user-defined extension name and the optimizer rule name - assert!(expected_err - .to_string() - .contains("Invariant for ExecutionPlan node 'InvariantFailsExtensionNode' failed for PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck'")); - - Ok(()) - } - - fn wrap_in_nonexecutable( - plan: Arc, - ) -> Result> { - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)])); - let col_a = physical_expr_col("a", &schema)?; - - // mutate the tree is such a way that is NOT yet executable - Ok(Arc::new(OutputRequirementExec::new( - plan, - Some(LexRequirement::from_lex_ordering( - vec![PhysicalSortExpr::new_default(col_a)].into(), - )), - Distribution::UnspecifiedDistribution, - ))) - } - - /// Extension where the physical plan mutation creates a non-executable plan. - /// - /// This is a "failing" extension, since it doesn't implement [`PhysicalOptimizerRule::executable_check`] - /// as false. - #[derive(Debug)] - struct ExtensionRuleDoesBadMutation; - impl PhysicalOptimizerRule for ExtensionRuleDoesBadMutation { - fn optimize( - &self, - plan: Arc, - _config: &ConfigOptions, - ) -> Result> { - wrap_in_nonexecutable(plan) - } - fn name(&self) -> &str { - "ExtensionRuleDoesBadMutation" - } - fn schema_check(&self) -> bool { - true - } - } - - /// Extension where the physical plan mutation creates a non-executable plan. - /// - /// This extension properly implements [`PhysicalOptimizerRule::executable_check`] => false. - /// And then the follow up optimizer runs may be performed. - #[derive(Debug)] - struct ExtensionRuleNeedsMoreRuns; - impl PhysicalOptimizerRule for ExtensionRuleNeedsMoreRuns { - fn optimize( - &self, - plan: Arc, - _config: &ConfigOptions, - ) -> Result> { - wrap_in_nonexecutable(plan) - } - fn name(&self) -> &str { - "ExtensionRuleNeedsMoreRuns" - } - fn schema_check(&self) -> bool { - true - } - fn executable_check(&self, _previous_plan_is_valid: bool) -> bool { - false - } - } - - #[test] - fn test_invariant_checker_with_optimization_extension() -> Result<()> { - let planner = DefaultPhysicalPlanner { - extension_planners: vec![], - }; - - // ok plan - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])); - let ok_plan = Arc::new(MemoryExec::try_new_as_values( - schema, - vec![vec![physical_expr_lit(ScalarValue::UInt32(None))]], - )?); - - // Test: check should pass with valid OpimizerRule mutation - let session = SessionStateBuilder::new() - .with_physical_optimizer_rules(vec![Arc::new(OptimizerRuleWithSchemaCheck)]) - .build(); - assert_eq!( - session.physical_optimizers().len(), - 1, - "should have the 1 valid optimizer rule" - ); - planner.optimize_physical_plan(ok_plan.clone(), &session, |_, _| {})?; - - // Test: should fail with invalid OpimizerRule mutation that leaves plan not executable - let session = SessionStateBuilder::new() - .with_physical_optimizer_rules(vec![ - Arc::new(SanityCheckPlan::new()), // should produce executable plan - Arc::new(ExtensionRuleDoesBadMutation), // will fail executable check - ]) - .build(); - assert_eq!( - session.physical_optimizers().len(), - 2, - "should have 2 optimizer rules" - ); - let expected_err = planner - .optimize_physical_plan(ok_plan.clone(), &session, |_, _| {}) - .unwrap_err(); - assert!(expected_err - .to_string() - .contains("SanityCheckPlan failed for PhysicalOptimizer rule 'ExtensionRuleDoesBadMutation'")); - - // Test: should pass once the proper additional optimizer rules are applied after the Extension rule - let session = SessionStateBuilder::new() - .with_physical_optimizer_rules(vec![ - Arc::new(SanityCheckPlan::new()), // should produce executable plan - Arc::new(ExtensionRuleNeedsMoreRuns), // Extension states that the returned plan is not executable - Arc::new(EnforceSorting::new()), // should mutate plan - Arc::new(SanityCheckPlan::new()), // should produce executable plan - ]) - .build(); - assert_eq!( - session.physical_optimizers().len(), - 4, - "should have 4 optimizer rules" - ); - planner.optimize_physical_plan(ok_plan, &session, |_, _| {})?; - Ok(()) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 4d32ab9dac4e..98e188a11d7e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -116,6 +116,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// A default set of invariants is provided in the default implementation. /// Extension nodes can provide their own invariants. fn check_node_invariants(&self) -> Result<()> { + // TODO Ok(()) } From e71ef9f20944ed573fb76ea902bcd08bb160964f Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 18 Jan 2025 19:54:54 -0800 Subject: [PATCH 07/10] refactor: remove vestiges of sanity_check from the InvariantChecker --- datafusion/core/src/physical_planner.rs | 59 +++++++------------------ 1 file changed, 17 insertions(+), 42 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a6e372a2de1c..c80ec4d6d7a9 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -36,7 +36,6 @@ use crate::logical_expr::{ UserDefinedLogicalNode, }; use crate::physical_expr::{create_physical_expr, create_physical_exprs}; -use crate::physical_optimizer::sanity_checker::check_plan_sanity; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::empty::EmptyExec; @@ -65,7 +64,6 @@ use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use arrow_array::builder::StringBuilder; use arrow_array::RecordBatch; -use datafusion_common::config::OptimizerOptions; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ @@ -1878,7 +1876,6 @@ impl DefaultPhysicalPlanner { ); let mut new_plan = Arc::clone(&plan); - let mut input_plan_is_valid = true; for optimizer in optimizers { let before_schema = new_plan.schema(); new_plan = optimizer @@ -1888,12 +1885,8 @@ impl DefaultPhysicalPlanner { })?; // confirm optimizer change did not violate invariants - let mut validator = InvariantChecker::new( - &session_state.config_options().optimizer, - optimizer, - ); - validator.check(&new_plan, before_schema, input_plan_is_valid)?; - input_plan_is_valid = optimizer.executable_check(input_plan_is_valid); + let mut validator = InvariantChecker::new(optimizer); + validator.check(&new_plan, before_schema)?; trace!( "Optimized physical plan by {}:\n{}\n", @@ -2009,32 +2002,26 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { } } -/// Confirms that a given [`PhysicalOptimizerRule`] run conforms -/// to the invariants per rule, and per [`ExecutionPlan`] invariants. +/// Confirms that a given [`PhysicalOptimizerRule`] run +/// did not violate the [`ExecutionPlan`] invariants. struct InvariantChecker<'a> { - options: &'a OptimizerOptions, rule: &'a Arc, } impl<'a> InvariantChecker<'a> { /// Create an [`InvariantChecker`]. - pub fn new( - options: &'a OptimizerOptions, - rule: &'a Arc, - ) -> Self { - Self { options, rule } + pub fn new(rule: &'a Arc) -> Self { + Self { rule } } /// Checks that the plan change is permitted, returning an Error if not. /// - /// In debug mode, this recursively walks the entire physical plan and - /// performs additional checks using Datafusions's [`check_plan_sanity`] - /// and any user defined [`ExecutionPlan::check_node_invariants`] extensions. + /// In debug mode, this recursively walks the entire physical plan + /// and performs [`ExecutionPlan::check_node_invariants`]. pub fn check( &mut self, plan: &Arc, previous_schema: Arc, - input_plan_is_valid: bool, ) -> Result<()> { // if the rule is not permitted to change the schema, confirm that it did not change. if self.rule.schema_check() && plan.schema() != previous_schema { @@ -2045,11 +2032,9 @@ impl<'a> InvariantChecker<'a> { )? } - // if the rule requires that the new plan is executable, confirm that it is. + // check invariants per ExecutionPlan extension #[cfg(debug_assertions)] - if self.rule.executable_check(input_plan_is_valid) { - plan.visit(self)?; - } + plan.visit(self)?; Ok(()) } @@ -2059,17 +2044,7 @@ impl<'n> TreeNodeVisitor<'n> for InvariantChecker<'_> { type Node = Arc; fn f_down(&mut self, node: &'n Self::Node) -> Result { - // Datafusion's defined physical plan invariants - check_plan_sanity(Arc::clone(node), self.options).map_err(|e| { - e.context(format!( - "SanityCheckPlan failed for PhysicalOptimizer rule '{}'", - self.rule.name() - )) - })?; - - // user defined invariants per ExecutionPlan extension node.check_node_invariants().map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?; - Ok(TreeNodeRecursion::Continue) } } @@ -2965,20 +2940,20 @@ digraph { // Test: check should pass with same schema let equal_schema = ok_plan.schema(); - InvariantChecker.check(&ok_plan, &rule, equal_schema)?; + InvariantChecker::new(&rule).check(&ok_plan, equal_schema)?; // Test: should fail with schema changed let different_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); - let expected_err = InvariantChecker - .check(&ok_plan, &rule, different_schema) + let expected_err = InvariantChecker::new(&rule) + .check(&ok_plan, different_schema) .unwrap_err(); assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema")); // Test: should fail when extension node fails it's own invariant check let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); - let expected_err = InvariantChecker - .check(&failing_node, &rule, ok_plan.schema()) + let expected_err = InvariantChecker::new(&rule) + .check(&failing_node, ok_plan.schema()) .unwrap_err(); assert!(expected_err .to_string() @@ -2990,8 +2965,8 @@ digraph { Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, Arc::clone(&child), ])?; - let expected_err = InvariantChecker - .check(&invalid_plan, &rule, ok_plan.schema()) + let expected_err = InvariantChecker::new(&rule) + .check(&invalid_plan, ok_plan.schema()) .unwrap_err(); assert!(expected_err .to_string() From 9d854a62149394379276bf661128a43c0243dc6d Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 18 Jan 2025 21:33:55 -0800 Subject: [PATCH 08/10] refactor: introduce Invariant levels, and make explicit how the post-optimization checker should be run --- datafusion/core/src/physical_planner.rs | 167 +++++++++++++++--- .../physical-plan/src/execution_plan.rs | 14 +- 2 files changed, 157 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index c80ec4d6d7a9..8b0685b1a509 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -83,6 +83,7 @@ use datafusion_expr::{ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::LexOrdering; +use datafusion_physical_plan::execution_plan::InvariantLevel; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::unnest::ListUnnest; use datafusion_sql::utils::window_expr_common_partition_keys; @@ -1875,6 +1876,10 @@ impl DefaultPhysicalPlanner { displayable(plan.as_ref()).indent(true) ); + // This runs once before any optimization, + // to verify that the plan fulfills the base requirements. + InvariantChecker(InvariantLevel::Always).check(&plan)?; + let mut new_plan = Arc::clone(&plan); for optimizer in optimizers { let before_schema = new_plan.schema(); @@ -1884,9 +1889,9 @@ impl DefaultPhysicalPlanner { DataFusionError::Context(optimizer.name().to_string(), Box::new(e)) })?; - // confirm optimizer change did not violate invariants - let mut validator = InvariantChecker::new(optimizer); - validator.check(&new_plan, before_schema)?; + // This only checks the schema in release build, and performs additional checks in debug mode. + OptimizationInvariantChecker::new(optimizer) + .check(&new_plan, before_schema)?; trace!( "Optimized physical plan by {}:\n{}\n", @@ -1895,6 +1900,11 @@ impl DefaultPhysicalPlanner { ); observer(new_plan.as_ref(), optimizer.as_ref()) } + + // This runs once after all optimizer runs are complete, + // to verify that the plan is executable. + InvariantChecker(InvariantLevel::Executable).check(&new_plan)?; + debug!( "Optimized physical plan:\n{}\n", displayable(new_plan.as_ref()).indent(false) @@ -2002,22 +2012,21 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { } } -/// Confirms that a given [`PhysicalOptimizerRule`] run -/// did not violate the [`ExecutionPlan`] invariants. -struct InvariantChecker<'a> { +struct OptimizationInvariantChecker<'a> { rule: &'a Arc, } -impl<'a> InvariantChecker<'a> { - /// Create an [`InvariantChecker`]. +impl<'a> OptimizationInvariantChecker<'a> { + /// Create an [`OptimizationInvariantChecker`] that performs checking per tule. pub fn new(rule: &'a Arc) -> Self { Self { rule } } /// Checks that the plan change is permitted, returning an Error if not. /// + /// Conditionally performs schema checks per [PhysicalOptimizerRule::schema_check]. /// In debug mode, this recursively walks the entire physical plan - /// and performs [`ExecutionPlan::check_node_invariants`]. + /// and performs [`ExecutionPlan::check_invariants`]. pub fn check( &mut self, plan: &Arc, @@ -2032,7 +2041,7 @@ impl<'a> InvariantChecker<'a> { )? } - // check invariants per ExecutionPlan extension + // check invariants per each ExecutionPlan node #[cfg(debug_assertions)] plan.visit(self)?; @@ -2040,11 +2049,40 @@ impl<'a> InvariantChecker<'a> { } } -impl<'n> TreeNodeVisitor<'n> for InvariantChecker<'_> { +impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> { type Node = Arc; fn f_down(&mut self, node: &'n Self::Node) -> Result { - node.check_node_invariants().map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?; + // Checks for the more permissive `InvariantLevel::Always`. + // Plans are not guarenteed to be executable after each physical optimizer run. + node.check_invariants(InvariantLevel::Always).map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?; + Ok(TreeNodeRecursion::Continue) + } +} + +/// Check [`ExecutionPlan`] invariants per [`InvariantLevel`]. +struct InvariantChecker(InvariantLevel); + +impl InvariantChecker { + /// Checks that the plan is executable, returning an Error if not. + pub fn check(&mut self, plan: &Arc) -> Result<()> { + // check invariants per each ExecutionPlan node + plan.visit(self)?; + + Ok(()) + } +} + +impl<'n> TreeNodeVisitor<'n> for InvariantChecker { + type Node = Arc; + + fn f_down(&mut self, node: &'n Self::Node) -> Result { + node.check_invariants(self.0).map_err(|e| { + e.context(format!( + "Invariant for ExecutionPlan node '{}' failed", + node.name() + )) + })?; Ok(TreeNodeRecursion::Continue) } } @@ -2864,15 +2902,18 @@ digraph { } } - /// Extension Node which fails invariant checks + /// Extension Node which fails the [`OptimizationInvariantChecker`]. #[derive(Debug)] struct InvariantFailsExtensionNode; impl ExecutionPlan for InvariantFailsExtensionNode { fn name(&self) -> &str { "InvariantFailsExtensionNode" } - fn check_node_invariants(&self) -> Result<()> { - plan_err!("extension node failed it's user-defined invariant check") + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + match check { + InvariantLevel::Always => plan_err!("extension node failed it's user-defined always-invariant check"), + InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"), + } } fn schema(&self) -> SchemaRef { Arc::new(Schema::empty()) @@ -2926,7 +2967,7 @@ digraph { } #[test] - fn test_invariant_checker() -> Result<()> { + fn test_optimization_invariant_checker() -> Result<()> { let rule: Arc = Arc::new(OptimizerRuleWithSchemaCheck); @@ -2940,24 +2981,24 @@ digraph { // Test: check should pass with same schema let equal_schema = ok_plan.schema(); - InvariantChecker::new(&rule).check(&ok_plan, equal_schema)?; + OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?; // Test: should fail with schema changed let different_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); - let expected_err = InvariantChecker::new(&rule) + let expected_err = OptimizationInvariantChecker::new(&rule) .check(&ok_plan, different_schema) .unwrap_err(); assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema")); // Test: should fail when extension node fails it's own invariant check let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); - let expected_err = InvariantChecker::new(&rule) + let expected_err = OptimizationInvariantChecker::new(&rule) .check(&failing_node, ok_plan.schema()) .unwrap_err(); assert!(expected_err .to_string() - .contains("extension node failed it's user-defined invariant check")); + .contains("extension node failed it's user-defined always-invariant check")); // Test: should fail when descendent extension node fails let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); @@ -2965,12 +3006,94 @@ digraph { Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, Arc::clone(&child), ])?; - let expected_err = InvariantChecker::new(&rule) + let expected_err = OptimizationInvariantChecker::new(&rule) .check(&invalid_plan, ok_plan.schema()) .unwrap_err(); assert!(expected_err .to_string() - .contains("extension node failed it's user-defined invariant check")); + .contains("extension node failed it's user-defined always-invariant check")); + + Ok(()) + } + + /// Extension Node which fails the [`InvariantChecker`] + /// if, and only if, [`InvariantLevel::Executable`] + #[derive(Debug)] + struct ExecutableInvariantFails; + impl ExecutionPlan for ExecutableInvariantFails { + fn name(&self) -> &str { + "ExecutableInvariantFails" + } + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + match check { + InvariantLevel::Always => Ok(()), + InvariantLevel::Executable => plan_err!( + "extension node failed it's user-defined executable-invariant check" + ), + } + } + fn schema(&self) -> SchemaRef { + Arc::new(Schema::empty()) + } + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + fn children(&self) -> Vec<&Arc> { + vec![] + } + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + } + impl DisplayAs for ExecutableInvariantFails { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.name()) + } + } + + #[test] + fn test_invariant_checker_levels() -> Result<()> { + // plan that passes the always-invariant, but fails the executable check + let plan: Arc = Arc::new(ExecutableInvariantFails); + + // Test: check should pass with less stringent Always check + InvariantChecker(InvariantLevel::Always).check(&plan)?; + + // Test: should fail the executable check + let expected_err = InvariantChecker(InvariantLevel::Executable) + .check(&plan) + .unwrap_err(); + assert!(expected_err.to_string().contains( + "extension node failed it's user-defined executable-invariant check" + )); + + // Test: should fail when descendent extension node fails + let failing_node: Arc = Arc::new(ExecutableInvariantFails); + let ok_node: Arc = Arc::new(OkExtensionNode(vec![])); + let child = Arc::clone(&ok_node); + let plan = ok_node.with_new_children(vec![ + Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, + Arc::clone(&child), + ])?; + let expected_err = InvariantChecker(InvariantLevel::Executable) + .check(&plan) + .unwrap_err(); + assert!(expected_err.to_string().contains( + "extension node failed it's user-defined executable-invariant check" + )); Ok(()) } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 98e188a11d7e..047e1761bce5 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -115,8 +115,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// A default set of invariants is provided in the default implementation. /// Extension nodes can provide their own invariants. - fn check_node_invariants(&self) -> Result<()> { - // TODO + fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { Ok(()) } @@ -434,6 +433,17 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } } +#[derive(Clone, Copy)] +pub enum InvariantLevel { + /// Invariants that are always true for the [`ExecutionPlan`] node + /// such as the number of expected children. + Always, + /// Invariants that must hold true for the [`ExecutionPlan`] node + /// to be "executable", such as ordering and/or distribution requirements + /// being fulfilled. + Executable, +} + /// Extension trait provides an easy API to fetch various properties of /// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`]. pub trait ExecutionPlanProperties { From 7b2f54bd4a47aae7e487c039da356d66eae9248e Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 18 Jan 2025 22:39:16 -0800 Subject: [PATCH 09/10] feat: provide invariant for UnionExec --- datafusion/physical-plan/src/union.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index cfa919425c54..bcd9572f45c7 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -32,14 +32,16 @@ use super::{ ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; +use crate::execution_plan::{ + boundedness_from_children, emission_type_from_children, InvariantLevel, +}; use crate::metrics::BaselineMetrics; use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, internal_err, Result}; +use datafusion_common::{exec_err, internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{calculate_union, EquivalenceProperties}; @@ -172,6 +174,14 @@ impl ExecutionPlan for UnionExec { &self.cache } + fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + (self.inputs().len() >= 2) + .then_some(()) + .ok_or(DataFusionError::Internal( + "UnionExec should have at least 2 children".into(), + )) + } + fn children(&self) -> Vec<&Arc> { self.inputs.iter().collect() } From 8da07d085b1b0a26acabba86b22e19b1d84493d2 Mon Sep 17 00:00:00 2001 From: wiedld Date: Sun, 19 Jan 2025 15:42:48 -0800 Subject: [PATCH 10/10] chore: update docs and error messages --- datafusion/core/src/physical_planner.rs | 8 +++++--- datafusion/physical-plan/src/execution_plan.rs | 5 +++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 8b0685b1a509..64f1318098be 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2034,7 +2034,7 @@ impl<'a> OptimizationInvariantChecker<'a> { ) -> Result<()> { // if the rule is not permitted to change the schema, confirm that it did not change. if self.rule.schema_check() && plan.schema() != previous_schema { - internal_err!("PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}", + internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}", self.rule.name(), previous_schema, plan.schema() @@ -2055,7 +2055,9 @@ impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> { fn f_down(&mut self, node: &'n Self::Node) -> Result { // Checks for the more permissive `InvariantLevel::Always`. // Plans are not guarenteed to be executable after each physical optimizer run. - node.check_invariants(InvariantLevel::Always).map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?; + node.check_invariants(InvariantLevel::Always).map_err(|e| + e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())) + )?; Ok(TreeNodeRecursion::Continue) } } @@ -2989,7 +2991,7 @@ digraph { let expected_err = OptimizationInvariantChecker::new(&rule) .check(&ok_plan, different_schema) .unwrap_err(); - assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema")); + assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed. Schema mismatch. Expected original schema")); // Test: should fail when extension node fails it's own invariant check let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 047e1761bce5..753234c09994 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -433,6 +433,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } } +/// [`ExecutionPlan`] Invariant Level +/// +/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan` +/// +/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science #[derive(Clone, Copy)] pub enum InvariantLevel { /// Invariants that are always true for the [`ExecutionPlan`] node