17 changes: 11 additions & 6 deletions datafusion/catalog/src/stream.rs
@@ -34,7 +34,7 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_expr::create_ordering;
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
@@ -321,17 +321,21 @@ impl TableProvider for StreamTable {

async fn scan(
&self,
_state: &dyn Session,
state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let projected_schema = match projection {
Some(p) => {
let projected = self.0.source.schema().project(p)?;
create_ordering(&projected, &self.0.order)?
let projected = Arc::new(self.0.source.schema().project(p)?);
create_lex_ordering(&projected, &self.0.order, state.execution_props())?
}
None => create_ordering(self.0.source.schema(), &self.0.order)?,
None => create_lex_ordering(
self.0.source.schema(),
&self.0.order,
state.execution_props(),
)?,
};

Ok(Arc::new(StreamingTableExec::try_new(
@@ -351,7 +355,8 @@
_insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = self.0.source.schema();
let orders = create_ordering(schema, &self.0.order)?;
let orders =
create_lex_ordering(schema, &self.0.order, _state.execution_props())?;
// It is sufficient to pass only one of the equivalent orderings:
let ordering = orders.into_iter().next().map(Into::into);

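Note: the `scan` and `insert_into` paths now thread the session's `ExecutionProps` into ordering creation, which is why `create_ordering` is replaced by `create_lex_ordering` and `scan` takes `state` instead of `_state`. Below is a minimal, hedged sketch of the new call shape; the exact signature of `create_lex_ordering` is an assumption inferred from the call sites in this diff (a schema reference, the table's declared sort order, and the execution props, returning `Result<Vec<LexOrdering>>`).

```rust
// Sketch only: the call shape of `create_lex_ordering` as used above.
// The exact signature is an assumption inferred from the call sites in this PR.
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::col;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_lex_ordering, LexOrdering};

fn declared_orderings() -> Result<Vec<LexOrdering>> {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Int64,
        false,
    )]));
    // The table's declared sort order: ORDER BY ts ASC NULLS FIRST.
    let sort_order = vec![vec![col("ts").sort(true, true)]];
    // ExecutionProps is the new third argument; the old `create_ordering`
    // helper did not take it.
    let props = ExecutionProps::new();
    create_lex_ordering(&schema, &sort_order, &props)
}
```

In the table provider itself the props come from `state.execution_props()` on the scan path and `_state.execution_props()` on the insert path, as shown in the hunks above.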
83 changes: 57 additions & 26 deletions datafusion/core/src/datasource/listing/table.rs
@@ -23,7 +23,7 @@ use super::{
};
use crate::{
datasource::file_format::{file_compression_type::FileCompressionType, FileFormat},
datasource::{create_ordering, physical_plan::FileSinkConfig},
datasource::physical_plan::FileSinkConfig,
execution::context::SessionState,
};
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
@@ -45,16 +45,19 @@ use datafusion_execution::{
cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
config::SessionConfig,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{
dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};

/// Indicates the source of the schema for a [`ListingTable`]
// PartialEq required for assert_eq! in tests
#[derive(Debug, Clone, Copy, PartialEq, Default)]
@@ -1129,8 +1132,15 @@ impl ListingTable {
}

/// If file_sort_order is specified, creates the appropriate physical expressions
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
create_ordering(&self.table_schema, &self.options.file_sort_order)
fn try_create_output_ordering(
&self,
execution_props: &ExecutionProps,
) -> Result<Vec<LexOrdering>> {
create_lex_ordering(
&self.table_schema,
&self.options.file_sort_order,
execution_props,
)
}
}

Expand Down Expand Up @@ -1219,7 +1229,7 @@ impl TableProvider for ListingTable {
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}

let output_ordering = self.try_create_output_ordering()?;
let output_ordering = self.try_create_output_ordering(state.execution_props())?;
match state
.config_options()
.execution
@@ -1359,7 +1369,7 @@ impl TableProvider for ListingTable {
file_extension: self.options().format.get_ext(),
};

let orderings = self.try_create_output_ordering()?;
let orderings = self.try_create_output_ordering(state.execution_props())?;
// It is sufficient to pass only one of the equivalent orderings:
let order_requirements = orderings.into_iter().next().map(Into::into);

@@ -1587,6 +1597,7 @@ mod tests {
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
use datafusion_physical_expr::expressions::binary;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::{collect, ExecutionPlanProperties};
use rstest::rstest;
@@ -1719,29 +1730,44 @@

use crate::datasource::file_format::parquet::ParquetFormat;
use datafusion_physical_plan::expressions::col as physical_col;
use datafusion_physical_plan::expressions::lit as physical_lit;
use std::ops::Add;

// (file_sort_order, expected_result)
let cases = vec![
(vec![], Ok(Vec::<LexOrdering>::new())),
(
vec![],
Ok::<Vec<LexOrdering>, DataFusionError>(Vec::<LexOrdering>::new()),
),
// sort expr that is not a bare column reference
(
vec![vec![
col("int_col").add(lit(1)).sort(true, true),
]],
Err("Expected single column reference in sort_order[0][0], got int_col + Int32(1)"),
vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
Ok(vec![[PhysicalSortExpr {
expr: binary(
physical_col("int_col", &schema).unwrap(),
Operator::Plus,
physical_lit(1),
&schema,
)
.unwrap(),
options: SortOptions {
descending: false,
nulls_first: true,
},
}]
.into()]),
),
// ok with one column
(
vec![vec![col("string_col").sort(true, false)]],
Ok(vec![[PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}].into(),
])
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}]
.into()]),
),
// ok with two columns, different options
(
@@ -1750,14 +1776,18 @@
col("int_col").sort(false, true),
]],
Ok(vec![[
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
].into(),
])
PhysicalSortExpr::new_default(
physical_col("string_col", &schema).unwrap(),
)
.asc()
.nulls_last(),
PhysicalSortExpr::new_default(
physical_col("int_col", &schema).unwrap(),
)
.desc()
.nulls_first(),
]
.into()]),
),
];

@@ -1770,7 +1800,8 @@

let table =
ListingTable::try_new(config.clone()).expect("Creating the table");
let ordering_result = table.try_create_output_ordering();
let ordering_result =
table.try_create_output_ordering(state.execution_props());

match (expected_result, ordering_result) {
(Ok(expected), Ok(result)) => {
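Note: this is the behavioral change driving the test update above: `try_create_output_ordering` previously rejected a sort key that is not a bare column with "Expected single column reference in sort_order[0][0] ...", and it now plans the expression physically (it also now takes `ExecutionProps` from the session state). A condensed sketch of how the expected `PhysicalSortExpr` for `int_col + 1 ASC NULLS FIRST` is assembled, assuming an Int32 `int_col` as in the test schema:

```rust
// Sketch of the expected physical sort expression used in the updated test case.
// The schema here is a stand-in; the real test uses its own fixture schema.
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, col, lit};
use datafusion_physical_expr::PhysicalSortExpr;

fn expected_sort_expr() -> PhysicalSortExpr {
    let schema = Schema::new(vec![Field::new("int_col", DataType::Int32, false)]);
    // Physical form of the logical sort key `int_col + 1`.
    let expr = binary(
        col("int_col", &schema).unwrap(),
        Operator::Plus,
        lit(1),
        &schema,
    )
    .unwrap();
    PhysicalSortExpr {
        expr,
        options: SortOptions {
            descending: false, // ascending
            nulls_first: true,
        },
    }
}
```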
42 changes: 12 additions & 30 deletions datafusion/datasource/src/file_scan_config.rs
@@ -18,11 +18,6 @@
//! [`FileScanConfig`] to configure scanning of possibly partitioned
//! file sources.

use std::{
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};

use crate::file_groups::FileGroup;
#[allow(unused_imports)]
use crate::schema_adapter::SchemaAdapterFactory;
@@ -54,16 +49,21 @@ use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::projection::ProjectionExpr;
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
filter_pushdown::FilterPushdownPropagation,
metrics::ExecutionPlanMetricsSet,
projection::{all_alias_free_columns, new_projections_for_columns},
DisplayAs, DisplayFormatType,
};
use std::{
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};

use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use log::{debug, warn};
@@ -1371,30 +1371,11 @@ fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
let mut all_orderings = vec![];
for output_ordering in &base_config.output_ordering {
let mut new_ordering = vec![];
for PhysicalSortExpr { expr, options } in output_ordering.iter() {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
let name = col.name();
if let Some((idx, _)) = projected_schema.column_with_name(name) {
// Compute the new sort expression (with correct index) after projection:
new_ordering.push(PhysicalSortExpr::new(
Arc::new(Column::new(name, idx)),
*options,
));
continue;
}
}
// Cannot find expression in the projected_schema, stop iterating
// since rest of the orderings are violated
break;
}

let Some(new_ordering) = LexOrdering::new(new_ordering) else {
continue;
};
let projected_orderings =
project_orderings(&base_config.output_ordering, projected_schema);

let mut all_orderings = vec![];
for new_ordering in projected_orderings {
// Check if any file groups are not sorted
if base_config.file_groups.iter().any(|group| {
if group.len() <= 1 {
Expand Down Expand Up @@ -1467,6 +1448,7 @@ mod tests {
use datafusion_expr::{Operator, SortExpr};
use datafusion_physical_expr::create_physical_sort_expr;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

/// Returns the column names on the schema
pub fn columns(schema: &Schema) -> Vec<String> {
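Note: the removed loop rewrote each column sort expression against the projected schema by hand and stopped at the first expression it could not resolve; that logic now lives in the shared `project_orderings` helper (made public in this PR and also used by `memory.rs` below). A small sketch of the helper's call shape, with the signature assumed from the call sites in this diff:

```rust
// Sketch only: project file orderings onto a narrower schema.
// Assumed signature: project_orderings(&[LexOrdering], &SchemaRef) -> Vec<LexOrdering>.
use std::sync::Arc;

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};

fn projected_orderings() -> Vec<LexOrdering> {
    // Source schema (a, b); files are sorted by (a ASC, b ASC).
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let ordering: LexOrdering = [
        PhysicalSortExpr::new(col("a", &schema).unwrap(), SortOptions::default()),
        PhysicalSortExpr::new(col("b", &schema).unwrap(), SortOptions::default()),
    ]
    .into();

    // Projection keeps only "a"; like the removed inline loop, the helper is
    // expected to keep the valid prefix of each ordering and drop the rest.
    let projected: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    project_orderings(&[ordering], &projected)
}
```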
24 changes: 4 additions & 20 deletions datafusion/datasource/src/memory.rs
@@ -30,10 +30,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::{
OrderingEquivalenceClass, ProjectionMapping,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::memory::MemoryStream;
@@ -433,22 +430,9 @@ impl MemorySourceConfig {
}

// If there is a projection on the source, we also need to project orderings
if let Some(projection) = &self.projection {
let base_schema = self.original_schema();
let proj_exprs = projection.iter().map(|idx| {
let name = base_schema.field(*idx).name();
(Arc::new(Column::new(name, *idx)) as _, name.to_string())
});
let projection_mapping =
ProjectionMapping::try_new(proj_exprs, &base_schema)?;
let base_eqp = EquivalenceProperties::new_with_orderings(
Arc::clone(&base_schema),
sort_information,
);
let proj_eqp =
base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
sort_information = oeq_class.into();
if self.projection.is_some() {
sort_information =
project_orderings(&sort_information, &self.projected_schema);
}

self.sort_information = sort_information;
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
@@ -590,6 +590,11 @@ impl EquivalenceGroup {
aug_mapping: &AugmentedMapping,
expr: &Arc<dyn PhysicalExpr>,
) -> Option<Arc<dyn PhysicalExpr>> {
// Literals don't need to be projected
if expr.as_any().downcast_ref::<Literal>().is_some() {
return Some(Arc::clone(expr));
}

// The given expression is not inside the mapping, so we try to project
// indirectly using equivalence classes.
for (targets, eq_class) in aug_mapping.values() {
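Note: the early return added above makes literal expressions project as themselves; a literal has no column references, so it stays valid under any projected schema. This presumably becomes necessary once sort keys may contain non-column expressions such as `int_col + 1`, whose literal operand has no entry in the projection mapping. A self-contained sketch of the check in isolation:

```rust
// Sketch of the literal short-circuit, isolated from EquivalenceGroup internals.
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Returns the expression unchanged if it is a literal; literals need no projection.
fn project_if_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
    expr.as_any()
        .downcast_ref::<Literal>()
        .map(|_| Arc::clone(expr))
}

fn main() {
    let one: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
    assert!(project_if_literal(&one).is_some());
}
```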
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence/mod.rs
@@ -30,7 +30,7 @@ mod properties;

pub use class::{AcrossPartitions, ConstExpr, EquivalenceClass, EquivalenceGroup};
pub use ordering::OrderingEquivalenceClass;
pub use projection::ProjectionMapping;
pub use projection::{project_ordering, project_orderings, ProjectionMapping};
pub use properties::{
calculate_union, join_equivalence_properties, EquivalenceProperties,
};