From dbf832a76084f5fb7b4776bab3a242387eaa772b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 14 Aug 2025 11:40:22 +0200 Subject: [PATCH 1/6] Drop redundant param from WindowFunctionDefinition::return_field It became redundant when `input_expr_types: &[DataType]` was replaced with `input_expr_fields: &[Field]`, i.e. in 577c42458e8ab7a0869345678e7f0cd5dbf82771 commit. The benefit of passing in a "field" is that it combines data type and nullable attribute. --- datafusion/expr/src/expr.rs | 1 - datafusion/physical-plan/src/windows/mod.rs | 7 +------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index dbae1058890a..046f2445cc90 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1039,7 +1039,6 @@ impl WindowFunctionDefinition { pub fn return_field( &self, input_expr_fields: &[FieldRef], - _input_expr_nullable: &[bool], display_name: &str, ) -> Result { match self { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 085b17cab9bc..519bed3a8460 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -69,12 +69,7 @@ pub fn schema_add_window_field( .iter() .map(|e| Arc::clone(e).as_ref().return_field(schema)) .collect::>>()?; - let nullability = args - .iter() - .map(|e| Arc::clone(e).as_ref().nullable(schema)) - .collect::>>()?; - let window_expr_return_field = - window_fn.return_field(&fields, &nullability, fn_name)?; + let window_expr_return_field = window_fn.return_field(&fields, fn_name)?; let mut window_fields = schema .fields() .iter() From c30a1ed9fc0fa6acf6f1403b788beda869ca2190 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 14 Aug 2025 12:06:18 +0200 Subject: [PATCH 2/6] Fix FieldMetadata doc string --- datafusion/expr/src/expr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 046f2445cc90..00b63e84344a 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -449,7 +449,7 @@ pub struct FieldMetadata { /// The inner metadata of a literal expression, which is a map of string /// keys to string values. /// - /// Note this is not a `HashMap because `HashMap` does not provide + /// Note this is not a `HashMap` because `HashMap` does not provide /// implementations for traits like `Debug` and `Hash`. inner: Arc>, } From 5af907520706e9f3efc693db43ca45bbdab08cb1 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 14 Aug 2025 13:50:33 +0200 Subject: [PATCH 3/6] Avoid Arrow schema clones when constructing DFSchema --- datafusion-examples/examples/pruning.rs | 4 ++-- datafusion/catalog/src/memory/table.rs | 2 +- datafusion/catalog/src/streaming.rs | 2 +- .../tests/dataframe/dataframe_functions.rs | 2 +- datafusion/datasource/src/file_scan_config.rs | 2 +- datafusion/optimizer/src/push_down_filter.rs | 4 +--- .../simplify_expressions/expr_simplifier.rs | 18 +++++++++++------- datafusion/optimizer/src/utils.rs | 10 +++++++--- datafusion/pruning/src/pruning_predicate.rs | 1 + datafusion/sql/src/statement.rs | 3 +-- datafusion/sql/tests/cases/plan_to_sql.rs | 2 +- 11 files changed, 28 insertions(+), 22 deletions(-) diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/pruning.rs index b2d2fa13b7ed..9a61789662cd 100644 --- a/datafusion-examples/examples/pruning.rs +++ b/datafusion-examples/examples/pruning.rs @@ -187,10 +187,10 @@ impl PruningStatistics for MyCatalog { } fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate { - let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap(); + let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap(); let props = ExecutionProps::new(); let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); - PruningPredicate::try_new(physical_expr, schema.clone()).unwrap() + PruningPredicate::try_new(physical_expr, Arc::clone(schema)).unwrap() } fn i32_array<'a>(values: impl Iterator>) -> ArrayRef { diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 63b626fb6cce..90224f6a37bc 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -242,7 +242,7 @@ impl TableProvider for MemTable { // add sort information if present let sort_order = self.sort_order.lock(); if !sort_order.is_empty() { - let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?; + let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?; let eqp = state.execution_props(); let mut file_sort_order = vec![]; diff --git a/datafusion/catalog/src/streaming.rs b/datafusion/catalog/src/streaming.rs index 6ab95266e49d..082e74dab9a1 100644 --- a/datafusion/catalog/src/streaming.rs +++ b/datafusion/catalog/src/streaming.rs @@ -102,7 +102,7 @@ impl TableProvider for StreamingTable { limit: Option, ) -> Result> { let physical_sort = if !self.sort_order.is_empty() { - let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?; + let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?; let eqp = state.execution_props(); create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)? diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index 40590d74ad91..be49b88a99cc 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -1215,7 +1215,7 @@ async fn test_fn_decode() -> Result<()> { // Note that the decode function returns binary, and the default display of // binary is "hexadecimal" and therefore the output looks like decode did // nothing. So compare to a constant. - let df_schema = DFSchema::try_from(test_schema().as_ref().clone())?; + let df_schema = DFSchema::try_from(test_schema())?; let expr = decode(encode(col("a"), lit("hex")), lit("hex")) // need to cast to utf8 otherwise the default display of binary array is hex // so it looks like nothing is done diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 4d03c46cf5aa..91a8d08702c2 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1968,7 +1968,7 @@ mod tests { .map(|expr| { create_physical_sort_expr( &expr, - &DFSchema::try_from(table_schema.as_ref().clone())?, + &DFSchema::try_from(Arc::clone(&table_schema))?, &ExecutionProps::default(), ) }) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 62ee8c65bc1c..27c2499c8a26 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3051,9 +3051,7 @@ mod tests { let table_scan = LogicalPlan::TableScan(TableScan { table_name: "test".into(), filters, - projected_schema: Arc::new(DFSchema::try_from( - (*test_provider.schema()).clone(), - )?), + projected_schema: Arc::new(DFSchema::try_from(test_provider.schema())?), projection, source: Arc::new(test_provider), fetch: None, diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 83b50509cd58..3de5a80a9782 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -17,15 +17,15 @@ //! Expression simplification API -use std::borrow::Cow; -use std::collections::HashSet; -use std::ops::Not; - use arrow::{ array::{new_null_array, AsArray}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; +use std::borrow::Cow; +use std::collections::HashSet; +use std::ops::Not; +use std::sync::Arc; use datafusion_common::{ cast::{as_large_list_array, as_list_array}, @@ -589,11 +589,15 @@ impl<'a> ConstEvaluator<'a> { // The dummy column name is unused and doesn't matter as only // expressions without column references can be evaluated static DUMMY_COL_NAME: &str = "."; - let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]); - let input_schema = DFSchema::try_from(schema.clone())?; + let schema = Arc::new(Schema::new(vec![Field::new( + DUMMY_COL_NAME, + DataType::Null, + true, + )])); + let input_schema = DFSchema::try_from(Arc::clone(&schema))?; // Need a single "input" row to produce a single output row let col = new_null_array(&DataType::Null, 1); - let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?; + let input_batch = RecordBatch::try_new(schema, vec![col])?; Ok(Self { can_evaluate: vec![], diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 0aa0bf3ea430..81763fa0552f 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -124,10 +124,14 @@ fn evaluate_expr_with_null_column<'a>( null_columns: impl IntoIterator, ) -> Result { static DUMMY_COL_NAME: &str = "?"; - let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]); - let input_schema = DFSchema::try_from(schema.clone())?; + let schema = Arc::new(Schema::new(vec![Field::new( + DUMMY_COL_NAME, + DataType::Null, + true, + )])); + let input_schema = DFSchema::try_from(Arc::clone(&schema))?; let column = new_null_array(&DataType::Null, 1); - let input_batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![column])?; + let input_batch = RecordBatch::try_new(schema, vec![column])?; let execution_props = ExecutionProps::default(); let null_column = Column::from_name(DUMMY_COL_NAME); diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index e863f57f8d16..fa1a8515b264 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -978,6 +978,7 @@ impl<'a> PruningExpressionBuilder<'a> { } }; + // TODO pass in SchemaRef so we don't need to clone the schema let df_schema = DFSchema::try_from(schema.clone())?; let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable( column_expr, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 0fef18ac55f8..68be37f49838 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1970,8 +1970,7 @@ impl SqlToRel<'_, S> { // Do a table lookup to verify the table exists let table_name = self.object_name_to_table_reference(table_name)?; let table_source = self.context_provider.get_table_source(table_name.clone())?; - let arrow_schema = (*table_source.schema()).clone(); - let table_schema = DFSchema::try_from(arrow_schema)?; + let table_schema = DFSchema::try_from(table_source.schema())?; // Get insert fields and target table's value indices // diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 22924540ba6b..fe6a98d5c2d0 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -94,7 +94,7 @@ fn roundtrip_expr(table: TableReference, sql: &str) -> Result { let state = MockSessionState::default().with_aggregate_function(sum_udaf()); let context = MockContextProvider { state }; let schema = context.get_table_source(table)?.schema(); - let df_schema = DFSchema::try_from(schema.as_ref().clone())?; + let df_schema = DFSchema::try_from(schema)?; let sql_to_rel = SqlToRel::new(&context); let expr = sql_to_rel.sql_to_expr(sql_expr, &df_schema, &mut PlannerContext::new())?; From c31ab2e7fda4a0230021b582844d506cfeaa8bf3 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 14 Aug 2025 14:37:31 +0200 Subject: [PATCH 4/6] Remove redundant `plan` from extension's check_invariants In `UserDefinedLogicalNode::check_invariants`, the actual plan to check for invariants is `self`. The `plan` is always `LogicalPlan::Extension` and provides no further information. It's confusing. --- .../core/tests/user_defined/user_defined_plan.rs | 2 +- datafusion/expr/src/logical_plan/extension.rs | 12 ++++-------- datafusion/expr/src/logical_plan/invariants.rs | 2 +- .../substrait/tests/cases/roundtrip_logical_plan.rs | 6 +----- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 4d3916c1760e..f0bf15d3483b 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -580,7 +580,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { self.input.schema() } - fn check_invariants(&self, check: InvariantLevel, _plan: &LogicalPlan) -> Result<()> { + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { if let Some(InvariantMock { should_fail_invariant, kind, diff --git a/datafusion/expr/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs index 5bf64a36a654..e10b74098b68 100644 --- a/datafusion/expr/src/logical_plan/extension.rs +++ b/datafusion/expr/src/logical_plan/extension.rs @@ -57,7 +57,7 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { fn schema(&self) -> &DFSchemaRef; /// Perform check of invariants for the extension node. - fn check_invariants(&self, check: InvariantLevel, plan: &LogicalPlan) -> Result<()>; + fn check_invariants(&self, check: InvariantLevel) -> Result<()>; /// Returns all expressions in the current logical plan node. This should /// not include expressions of any inputs (aka non-recursively). @@ -241,11 +241,7 @@ pub trait UserDefinedLogicalNodeCore: /// Perform check of invariants for the extension node. /// /// This is the default implementation for extension nodes. - fn check_invariants( - &self, - _check: InvariantLevel, - _plan: &LogicalPlan, - ) -> Result<()> { + fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { Ok(()) } @@ -334,8 +330,8 @@ impl UserDefinedLogicalNode for T { self.schema() } - fn check_invariants(&self, check: InvariantLevel, plan: &LogicalPlan) -> Result<()> { - self.check_invariants(check, plan) + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + self.check_invariants(check) } fn expressions(&self) -> Vec { diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index d8d6739b0e8f..2d8ed0717102 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -74,7 +74,7 @@ pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> { fn assert_valid_extension_nodes(plan: &LogicalPlan, check: InvariantLevel) -> Result<()> { plan.apply_with_subqueries(|plan: &LogicalPlan| { if let LogicalPlan::Extension(Extension { node }) = plan { - node.check_invariants(check, plan)?; + node.check_invariants(check)?; } plan.apply_expressions(|expr| { // recursively look for subqueries diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 5ec6663347e2..6e4e6ea59d07 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -112,11 +112,7 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan { &self.empty_schema } - fn check_invariants( - &self, - _check: InvariantLevel, - _plan: &LogicalPlan, - ) -> Result<()> { + fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { Ok(()) } From 3f59b5e414954a1c5296337cd2702b4e3b314a0d Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 14 Aug 2025 16:31:21 +0200 Subject: [PATCH 5/6] Validate schema in TryFrom --- datafusion/common/src/dfschema.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 88303bbcd7d2..daf4e19ce0f6 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -899,6 +899,7 @@ impl TryFrom for DFSchema { field_qualifiers: vec![None; field_count], functional_dependencies: FunctionalDependencies::empty(), }; + dfschema.check_names()?; Ok(dfschema) } } From ddaa0d30e0c15aacf601830f69be44a536a83598 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 14 Aug 2025 21:00:05 +0200 Subject: [PATCH 6/6] Revert "Remove redundant `plan` from extension's check_invariants" This reverts commit c31ab2e7fda4a0230021b582844d506cfeaa8bf3. --- .../core/tests/user_defined/user_defined_plan.rs | 2 +- datafusion/expr/src/logical_plan/extension.rs | 12 ++++++++---- datafusion/expr/src/logical_plan/invariants.rs | 2 +- .../substrait/tests/cases/roundtrip_logical_plan.rs | 6 +++++- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index f0bf15d3483b..4d3916c1760e 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -580,7 +580,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { self.input.schema() } - fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + fn check_invariants(&self, check: InvariantLevel, _plan: &LogicalPlan) -> Result<()> { if let Some(InvariantMock { should_fail_invariant, kind, diff --git a/datafusion/expr/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs index e10b74098b68..5bf64a36a654 100644 --- a/datafusion/expr/src/logical_plan/extension.rs +++ b/datafusion/expr/src/logical_plan/extension.rs @@ -57,7 +57,7 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { fn schema(&self) -> &DFSchemaRef; /// Perform check of invariants for the extension node. - fn check_invariants(&self, check: InvariantLevel) -> Result<()>; + fn check_invariants(&self, check: InvariantLevel, plan: &LogicalPlan) -> Result<()>; /// Returns all expressions in the current logical plan node. This should /// not include expressions of any inputs (aka non-recursively). @@ -241,7 +241,11 @@ pub trait UserDefinedLogicalNodeCore: /// Perform check of invariants for the extension node. /// /// This is the default implementation for extension nodes. - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + fn check_invariants( + &self, + _check: InvariantLevel, + _plan: &LogicalPlan, + ) -> Result<()> { Ok(()) } @@ -330,8 +334,8 @@ impl UserDefinedLogicalNode for T { self.schema() } - fn check_invariants(&self, check: InvariantLevel) -> Result<()> { - self.check_invariants(check) + fn check_invariants(&self, check: InvariantLevel, plan: &LogicalPlan) -> Result<()> { + self.check_invariants(check, plan) } fn expressions(&self) -> Vec { diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 2d8ed0717102..d8d6739b0e8f 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -74,7 +74,7 @@ pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> { fn assert_valid_extension_nodes(plan: &LogicalPlan, check: InvariantLevel) -> Result<()> { plan.apply_with_subqueries(|plan: &LogicalPlan| { if let LogicalPlan::Extension(Extension { node }) = plan { - node.check_invariants(check)?; + node.check_invariants(check, plan)?; } plan.apply_expressions(|expr| { // recursively look for subqueries diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 6e4e6ea59d07..5ec6663347e2 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -112,7 +112,11 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan { &self.empty_schema } - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + fn check_invariants( + &self, + _check: InvariantLevel, + _plan: &LogicalPlan, + ) -> Result<()> { Ok(()) }