From d13b5256fc4ba6d6c4f165991d60d1621b3c8d97 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Thu, 4 Sep 2025 18:11:05 +0200 Subject: [PATCH] #17411 Relax constraint that file sort order must only reference individual columns --- datafusion/catalog/src/stream.rs | 17 ++-- .../core/src/datasource/listing/table.rs | 83 +++++++++++----- datafusion/datasource/src/file_scan_config.rs | 40 +++----- datafusion/datasource/src/memory.rs | 24 +---- .../physical-expr/src/equivalence/class.rs | 5 + .../physical-expr/src/equivalence/mod.rs | 2 +- .../src/equivalence/projection.rs | 94 ++++++++++++++++++- datafusion/physical-expr/src/lib.rs | 6 +- datafusion/physical-expr/src/physical_expr.rs | 29 +++++- .../sqllogictest/data/composite_order.csv | 8 ++ datafusion/sqllogictest/test_files/order.slt | 37 ++++++++ 11 files changed, 257 insertions(+), 88 deletions(-) create mode 100644 datafusion/sqllogictest/data/composite_order.csv diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index 2d66ff4628b9..f4a2338b8eec 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -34,7 +34,7 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType}; -use datafusion_physical_expr::create_ordering; +use datafusion_physical_expr::create_lex_ordering; use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; @@ -321,17 +321,21 @@ impl TableProvider for StreamTable { async fn scan( &self, - _state: &dyn Session, + state: &dyn Session, projection: Option<&Vec>, _filters: &[Expr], limit: Option, ) -> Result> { let projected_schema = match projection { Some(p) => { - let projected = self.0.source.schema().project(p)?; - create_ordering(&projected, &self.0.order)? + let projected = Arc::new(self.0.source.schema().project(p)?); + create_lex_ordering(&projected, &self.0.order, state.execution_props())? } - None => create_ordering(self.0.source.schema(), &self.0.order)?, + None => create_lex_ordering( + self.0.source.schema(), + &self.0.order, + state.execution_props(), + )?, }; Ok(Arc::new(StreamingTableExec::try_new( @@ -351,7 +355,8 @@ impl TableProvider for StreamTable { _insert_op: InsertOp, ) -> Result> { let schema = self.0.source.schema(); - let orders = create_ordering(schema, &self.0.order)?; + let orders = + create_lex_ordering(schema, &self.0.order, _state.execution_props())?; // It is sufficient to pass only one of the equivalent orderings: let ordering = orders.into_iter().next().map(Into::into); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 18d84c4ba0c2..27101c7140f7 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -23,7 +23,7 @@ use super::{ }; use crate::{ datasource::file_format::{file_compression_type::FileCompressionType, FileFormat}, - datasource::{create_ordering, physical_plan::FileSinkConfig}, + datasource::physical_plan::FileSinkConfig, execution::context::SessionState, }; use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; @@ -45,9 +45,11 @@ use datafusion_execution::{ cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache}, config::SessionConfig, }; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{ dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType, }; +use datafusion_physical_expr::create_lex_ordering; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; @@ -55,6 +57,7 @@ use futures::{future, stream, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc}; + /// Indicates the source of the schema for a [`ListingTable`] // PartialEq required for assert_eq! in tests #[derive(Debug, Clone, Copy, PartialEq, Default)] @@ -1129,8 +1132,15 @@ impl ListingTable { } /// If file_sort_order is specified, creates the appropriate physical expressions - fn try_create_output_ordering(&self) -> Result> { - create_ordering(&self.table_schema, &self.options.file_sort_order) + fn try_create_output_ordering( + &self, + execution_props: &ExecutionProps, + ) -> Result> { + create_lex_ordering( + &self.table_schema, + &self.options.file_sort_order, + execution_props, + ) } } @@ -1219,7 +1229,7 @@ impl TableProvider for ListingTable { return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } - let output_ordering = self.try_create_output_ordering()?; + let output_ordering = self.try_create_output_ordering(state.execution_props())?; match state .config_options() .execution @@ -1359,7 +1369,7 @@ impl TableProvider for ListingTable { file_extension: self.options().format.get_ext(), }; - let orderings = self.try_create_output_ordering()?; + let orderings = self.try_create_output_ordering(state.execution_props())?; // It is sufficient to pass only one of the equivalent orderings: let order_requirements = orderings.into_iter().next().map(Into::into); @@ -1587,6 +1597,7 @@ mod tests { SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; + use datafusion_physical_expr::expressions::binary; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::{collect, ExecutionPlanProperties}; use rstest::rstest; @@ -1719,29 +1730,44 @@ mod tests { use crate::datasource::file_format::parquet::ParquetFormat; use datafusion_physical_plan::expressions::col as physical_col; + use datafusion_physical_plan::expressions::lit as physical_lit; use std::ops::Add; // (file_sort_order, expected_result) let cases = vec![ - (vec![], Ok(Vec::::new())), + ( + vec![], + Ok::, DataFusionError>(Vec::::new()), + ), // sort expr, but non column ( - vec![vec![ - col("int_col").add(lit(1)).sort(true, true), - ]], - Err("Expected single column reference in sort_order[0][0], got int_col + Int32(1)"), + vec![vec![col("int_col").add(lit(1)).sort(true, true)]], + Ok(vec![[PhysicalSortExpr { + expr: binary( + physical_col("int_col", &schema).unwrap(), + Operator::Plus, + physical_lit(1), + &schema, + ) + .unwrap(), + options: SortOptions { + descending: false, + nulls_first: true, + }, + }] + .into()]), ), // ok with one column ( vec![vec![col("string_col").sort(true, false)]], Ok(vec![[PhysicalSortExpr { - expr: physical_col("string_col", &schema).unwrap(), - options: SortOptions { - descending: false, - nulls_first: false, - }, - }].into(), - ]) + expr: physical_col("string_col", &schema).unwrap(), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }] + .into()]), ), // ok with two columns, different options ( @@ -1750,14 +1776,18 @@ mod tests { col("int_col").sort(false, true), ]], Ok(vec![[ - PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap()) - .asc() - .nulls_last(), - PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap()) - .desc() - .nulls_first() - ].into(), - ]) + PhysicalSortExpr::new_default( + physical_col("string_col", &schema).unwrap(), + ) + .asc() + .nulls_last(), + PhysicalSortExpr::new_default( + physical_col("int_col", &schema).unwrap(), + ) + .desc() + .nulls_first(), + ] + .into()]), ), ]; @@ -1770,7 +1800,8 @@ mod tests { let table = ListingTable::try_new(config.clone()).expect("Creating the table"); - let ordering_result = table.try_create_output_ordering(); + let ordering_result = + table.try_create_output_ordering(state.execution_props()); match (expected_result, ordering_result) { (Ok(expected), Ok(result)) => { diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 4e2235eae8fe..ea80e3a6a92b 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -18,11 +18,6 @@ //! [`FileScanConfig`] to configure scanning of possibly partitioned //! file sources. -use std::{ - any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter, - fmt::Result as FmtResult, marker::PhantomData, sync::Arc, -}; - use crate::file_groups::FileGroup; #[allow(unused_imports)] use crate::schema_adapter::SchemaAdapterFactory; @@ -52,7 +47,7 @@ use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_co use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::projection::ProjectionExpr; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, @@ -63,7 +58,12 @@ use datafusion_physical_plan::{ use datafusion_physical_plan::{ filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation, }; +use std::{ + any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter, + fmt::Result as FmtResult, marker::PhantomData, sync::Arc, +}; +use datafusion_physical_expr::equivalence::project_orderings; use datafusion_physical_plan::coop::cooperative; use datafusion_physical_plan::execution_plan::SchedulingType; use log::{debug, warn}; @@ -1384,30 +1384,11 @@ fn get_projected_output_ordering( base_config: &FileScanConfig, projected_schema: &SchemaRef, ) -> Vec { - let mut all_orderings = vec![]; - for output_ordering in &base_config.output_ordering { - let mut new_ordering = vec![]; - for PhysicalSortExpr { expr, options } in output_ordering.iter() { - if let Some(col) = expr.as_any().downcast_ref::() { - let name = col.name(); - if let Some((idx, _)) = projected_schema.column_with_name(name) { - // Compute the new sort expression (with correct index) after projection: - new_ordering.push(PhysicalSortExpr::new( - Arc::new(Column::new(name, idx)), - *options, - )); - continue; - } - } - // Cannot find expression in the projected_schema, stop iterating - // since rest of the orderings are violated - break; - } - - let Some(new_ordering) = LexOrdering::new(new_ordering) else { - continue; - }; + let projected_orderings = + project_orderings(&base_config.output_ordering, projected_schema); + let mut all_orderings = vec![]; + for new_ordering in projected_orderings { // Check if any file groups are not sorted if base_config.file_groups.iter().any(|group| { if group.len() <= 1 { @@ -1480,6 +1461,7 @@ mod tests { use datafusion_expr::{Operator, SortExpr}; use datafusion_physical_expr::create_physical_sort_expr; use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; + use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; /// Returns the column names on the schema pub fn columns(schema: &Schema) -> Vec { diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 564033438faf..eb55aa9b0b0d 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -30,10 +30,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::equivalence::{ - OrderingEquivalenceClass, ProjectionMapping, -}; -use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::equivalence::project_orderings; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_plan::memory::MemoryStream; @@ -433,22 +430,9 @@ impl MemorySourceConfig { } // If there is a projection on the source, we also need to project orderings - if let Some(projection) = &self.projection { - let base_schema = self.original_schema(); - let proj_exprs = projection.iter().map(|idx| { - let name = base_schema.field(*idx).name(); - (Arc::new(Column::new(name, *idx)) as _, name.to_string()) - }); - let projection_mapping = - ProjectionMapping::try_new(proj_exprs, &base_schema)?; - let base_eqp = EquivalenceProperties::new_with_orderings( - Arc::clone(&base_schema), - sort_information, - ); - let proj_eqp = - base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema)); - let oeq_class: OrderingEquivalenceClass = proj_eqp.into(); - sort_information = oeq_class.into(); + if self.projection.is_some() { + sort_information = + project_orderings(&sort_information, &self.projected_schema); } self.sort_information = sort_information; diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 8af6f3be0389..66ce77ef415e 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -590,6 +590,11 @@ impl EquivalenceGroup { aug_mapping: &AugmentedMapping, expr: &Arc, ) -> Option> { + // Literals don't need to be projected + if expr.as_any().downcast_ref::().is_some() { + return Some(Arc::clone(expr)); + } + // The given expression is not inside the mapping, so we try to project // indirectly using equivalence classes. for (targets, eq_class) in aug_mapping.values() { diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index ecb73be256d4..bcc6835e2f6c 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -30,7 +30,7 @@ mod properties; pub use class::{AcrossPartitions, ConstExpr, EquivalenceClass, EquivalenceGroup}; pub use ordering::OrderingEquivalenceClass; -pub use projection::ProjectionMapping; +pub use projection::{project_ordering, project_orderings, ProjectionMapping}; pub use properties::{ calculate_union, join_equivalence_properties, EquivalenceProperties, }; diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 6fe56052292f..a4ed8187cfad 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -23,8 +23,9 @@ use crate::PhysicalExpr; use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{internal_err, plan_err, Result}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use indexmap::IndexMap; /// Stores target expressions, along with their indices, that associate with a @@ -156,6 +157,97 @@ impl FromIterator<(Arc, ProjectionTargets)> for ProjectionMapp } } +/// Projects a slice of [LexOrdering]s onto the given schema. +/// +/// This is a convenience wrapper that applies [project_ordering] to each +/// input ordering and collects the successful projections: +/// - For each input ordering, the result of [project_ordering] is appended to +/// the output if it is `Some(...)`. +/// - Order is preserved and no deduplication is attempted. +/// - If none of the input orderings can be projected, an empty `Vec` is +/// returned. +/// +/// See [project_ordering] for the semantics of projecting a single +/// [LexOrdering]. +pub fn project_orderings( + orderings: &[LexOrdering], + schema: &SchemaRef, +) -> Vec { + let mut projected_orderings = vec![]; + + for ordering in orderings { + projected_orderings.extend(project_ordering(ordering, schema)); + } + + projected_orderings +} + +/// Projects a single [LexOrdering] onto the given schema. +/// +/// This function attempts to rewrite every [PhysicalSortExpr] in the provided +/// [LexOrdering] so that any [Column] expressions point at the correct field +/// indices in `schema`. +/// +/// Key details: +/// - Columns are matched by name, not by index. The index of each matched +/// column is looked up with [Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new +/// [Column] with the correct [index](Column::index) is substituted. +/// - If an expression references a column name that does not exist in +/// `schema`, projection of the current ordering stops and only the already +/// rewritten prefix is kept. This models the fact that a lexicographical +/// ordering remains valid for any leading prefix whose expressions are +/// present in the projected schema. +/// - If no expressions can be projected (i.e. the first one is missing), the +/// function returns `None`. +/// +/// Return value: +/// - `Some(LexOrdering)` if at least one sort expression could be projected. +/// The returned ordering may be a strict prefix of the input ordering. +/// - `None` if no part of the ordering can be projected onto `schema`. +/// +/// Example +/// +/// Suppose we have an input ordering `[col("a@0"), col("b@1")]` but the projected +/// schema only contains b and not a. The result will be `Some([col("a@0")])`. In other +/// words, the column reference is reindexed to match the projected schema. +/// If neither a nor b is present, the result will be None. +pub fn project_ordering( + ordering: &LexOrdering, + schema: &SchemaRef, +) -> Option { + let mut projected_exprs = vec![]; + for PhysicalSortExpr { expr, options } in ordering.iter() { + let transformed = Arc::clone(expr).transform_up(|expr| { + let Some(col) = expr.as_any().downcast_ref::() else { + return Ok(Transformed::no(expr)); + }; + + let name = col.name(); + if let Some((idx, _)) = schema.column_with_name(name) { + // Compute the new column expression (with correct index) after projection: + Ok(Transformed::yes(Arc::new(Column::new(name, idx)))) + } else { + // Cannot find expression in the projected_schema, + // signal this using an Err result + plan_err!("") + } + }); + + match transformed { + Ok(transformed) => { + projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options)); + } + Err(_) => { + // Err result indicates an expression could not be found in the + // projected_schema, stop iterating since rest of the orderings are violated + break; + } + } + } + + LexOrdering::new(projected_exprs) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 46f7b30d01aa..ecba3da3bce5 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -56,9 +56,9 @@ pub use equivalence::{ }; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ - add_offset_to_expr, add_offset_to_physical_sort_exprs, create_ordering, - create_physical_sort_expr, create_physical_sort_exprs, physical_exprs_bag_equal, - physical_exprs_contains, physical_exprs_equal, + add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering, + create_ordering, create_physical_sort_expr, create_physical_sort_exprs, + physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, }; pub use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, PhysicalExprRef}; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 3f063d7a030f..2cc484ec6a62 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -21,7 +21,7 @@ use crate::expressions::{self, Column}; use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr}; use arrow::compute::SortOptions; -use arrow::datatypes::Schema; +use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_err, Result}; use datafusion_common::{DFSchema, HashMap}; @@ -29,7 +29,6 @@ use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{Expr, SortExpr}; use itertools::izip; - // Exports: pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr; @@ -163,6 +162,32 @@ pub fn create_ordering( Ok(all_sort_orders) } +/// Creates a vector of [LexOrdering] from a vector of logical expression +pub fn create_lex_ordering( + schema: &SchemaRef, + sort_order: &[Vec], + execution_props: &ExecutionProps, +) -> Result> { + // Try the fast path that only supports column references first + // This avoids creating a DFSchema + if let Ok(ordering) = create_ordering(schema, sort_order) { + return Ok(ordering); + } + + let df_schema = DFSchema::try_from(Arc::clone(schema))?; + + let mut all_sort_orders = vec![]; + + for exprs in sort_order.iter() { + all_sort_orders.extend(LexOrdering::new(create_physical_sort_exprs( + exprs, + &df_schema, + execution_props, + )?)); + } + Ok(all_sort_orders) +} + /// Create a physical sort expression from a logical expression pub fn create_physical_sort_expr( e: &SortExpr, diff --git a/datafusion/sqllogictest/data/composite_order.csv b/datafusion/sqllogictest/data/composite_order.csv new file mode 100644 index 000000000000..b2c5e881bd60 --- /dev/null +++ b/datafusion/sqllogictest/data/composite_order.csv @@ -0,0 +1,8 @@ +a,b +1,0 +0,2 +1,2 +0,4 +5,0 +3,3 +4,3 diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 1050b5961361..04a7615c764b 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1517,3 +1517,40 @@ SELECT address, zip FROM addresses ORDER BY ALL; 111 Duck Duck Goose Ln 11111 111 Duck Duck Goose Ln 11111-0001 123 Quack Blvd 11111 + +# Create a table with an order clause that's not a simple column reference +statement ok +CREATE EXTERNAL TABLE ordered ( + a BIGINT NOT NULL, + b BIGINT NOT NULL +) +STORED AS CSV +LOCATION 'data/composite_order.csv' +OPTIONS ('format.has_header' 'true') +WITH ORDER (a + b); + +# Simple query should be just a table scan +query TT +EXPLAIN SELECT * from ordered; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, has_header=true + +# Query ordered by the declared order should be just a table scan +query TT +EXPLAIN SELECT * from ordered ORDER BY (a + b); +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, has_header=true + +# Order equivalence handling should make this query a simple table scan +query TT +EXPLAIN SELECT * from ordered ORDER BY -(a + b) desc nulls last; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, has_header=true + +# Ordering by another column requires a sort +query TT +EXPLAIN SELECT * from ordered ORDER BY a; +---- +physical_plan +01)SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, has_header=true