Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion-examples/examples/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ impl PruningStatistics for MyCatalog {
}

fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate {
let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
let props = ExecutionProps::new();
let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
PruningPredicate::try_new(physical_expr, Arc::clone(schema)).unwrap()
}

fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl TableProvider for MemTable {
// add sort information if present
let sort_order = self.sort_order.lock();
if !sort_order.is_empty() {
let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;

let eqp = state.execution_props();
let mut file_sort_order = vec![];
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl TableProvider for StreamingTable {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let physical_sort = if !self.sort_order.is_empty() {
let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
let eqp = state.execution_props();

create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ impl TryFrom<SchemaRef> for DFSchema {
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Context is that checking names here makes this consistent with the other (fallible) ways to create a DFSchema from a Schema

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW it seems this has caused a regression for some users in 50.0.0 (there are some code paths that violate an invariant). See #17706

Ok(dfschema)
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ async fn test_fn_decode() -> Result<()> {
// Note that the decode function returns binary, and the default display of
// binary is "hexadecimal" and therefore the output looks like decode did
// nothing. So compare to a constant.
let df_schema = DFSchema::try_from(test_schema().as_ref().clone())?;
let df_schema = DFSchema::try_from(test_schema())?;
let expr = decode(encode(col("a"), lit("hex")), lit("hex"))
// need to cast to utf8 otherwise the default display of binary array is hex
// so it looks like nothing is done
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1968,7 +1968,7 @@ mod tests {
.map(|expr| {
create_physical_sort_expr(
&expr,
&DFSchema::try_from(table_schema.as_ref().clone())?,
&DFSchema::try_from(Arc::clone(&table_schema))?,
&ExecutionProps::default(),
)
})
Expand Down
3 changes: 1 addition & 2 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ pub struct FieldMetadata {
/// The inner metadata of a literal expression, which is a map of string
/// keys to string values.
///
/// Note this is not a `HashMap because `HashMap` does not provide
/// Note this is not a `HashMap` because `HashMap` does not provide
/// implementations for traits like `Debug` and `Hash`.
inner: Arc<BTreeMap<String, String>>,
}
Expand Down Expand Up @@ -1039,7 +1039,6 @@ impl WindowFunctionDefinition {
pub fn return_field(
&self,
input_expr_fields: &[FieldRef],
_input_expr_nullable: &[bool],
display_name: &str,
) -> Result<FieldRef> {
match self {
Expand Down
4 changes: 1 addition & 3 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3051,9 +3051,7 @@ mod tests {
let table_scan = LogicalPlan::TableScan(TableScan {
table_name: "test".into(),
filters,
projected_schema: Arc::new(DFSchema::try_from(
(*test_provider.schema()).clone(),
)?),
projected_schema: Arc::new(DFSchema::try_from(test_provider.schema())?),
projection,
source: Arc::new(test_provider),
fetch: None,
Expand Down
18 changes: 11 additions & 7 deletions datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

//! Expression simplification API

use std::borrow::Cow;
use std::collections::HashSet;
use std::ops::Not;

use arrow::{
array::{new_null_array, AsArray},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use std::borrow::Cow;
use std::collections::HashSet;
use std::ops::Not;
use std::sync::Arc;

use datafusion_common::{
cast::{as_large_list_array, as_list_array},
Expand Down Expand Up @@ -589,11 +589,15 @@ impl<'a> ConstEvaluator<'a> {
// The dummy column name is unused and doesn't matter as only
// expressions without column references can be evaluated
static DUMMY_COL_NAME: &str = ".";
let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]);
let input_schema = DFSchema::try_from(schema.clone())?;
let schema = Arc::new(Schema::new(vec![Field::new(
DUMMY_COL_NAME,
DataType::Null,
true,
)]));
let input_schema = DFSchema::try_from(Arc::clone(&schema))?;
// Need a single "input" row to produce a single output row
let col = new_null_array(&DataType::Null, 1);
let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?;
let input_batch = RecordBatch::try_new(schema, vec![col])?;

Ok(Self {
can_evaluate: vec![],
Expand Down
10 changes: 7 additions & 3 deletions datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,14 @@ fn evaluate_expr_with_null_column<'a>(
null_columns: impl IntoIterator<Item = &'a Column>,
) -> Result<ColumnarValue> {
static DUMMY_COL_NAME: &str = "?";
let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]);
let input_schema = DFSchema::try_from(schema.clone())?;
let schema = Arc::new(Schema::new(vec![Field::new(
DUMMY_COL_NAME,
DataType::Null,
true,
)]));
let input_schema = DFSchema::try_from(Arc::clone(&schema))?;
let column = new_null_array(&DataType::Null, 1);
let input_batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![column])?;
let input_batch = RecordBatch::try_new(schema, vec![column])?;
let execution_props = ExecutionProps::default();
let null_column = Column::from_name(DUMMY_COL_NAME);

Expand Down
7 changes: 1 addition & 6 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,7 @@ pub fn schema_add_window_field(
.iter()
.map(|e| Arc::clone(e).as_ref().return_field(schema))
.collect::<Result<Vec<_>>>()?;
let nullability = args
.iter()
.map(|e| Arc::clone(e).as_ref().nullable(schema))
.collect::<Result<Vec<_>>>()?;
let window_expr_return_field =
window_fn.return_field(&fields, &nullability, fn_name)?;
let window_expr_return_field = window_fn.return_field(&fields, fn_name)?;
let mut window_fields = schema
.fields()
.iter()
Expand Down
1 change: 1 addition & 0 deletions datafusion/pruning/src/pruning_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ impl<'a> PruningExpressionBuilder<'a> {
}
};

// TODO pass in SchemaRef so we don't need to clone the schema
let df_schema = DFSchema::try_from(schema.clone())?;
let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
column_expr,
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1970,8 +1970,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// Do a table lookup to verify the table exists
let table_name = self.object_name_to_table_reference(table_name)?;
let table_source = self.context_provider.get_table_source(table_name.clone())?;
let arrow_schema = (*table_source.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;
let table_schema = DFSchema::try_from(table_source.schema())?;

// Get insert fields and target table's value indices
//
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn roundtrip_expr(table: TableReference, sql: &str) -> Result<String> {
let state = MockSessionState::default().with_aggregate_function(sum_udaf());
let context = MockContextProvider { state };
let schema = context.get_table_source(table)?.schema();
let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
let df_schema = DFSchema::try_from(schema)?;
let sql_to_rel = SqlToRel::new(&context);
let expr =
sql_to_rel.sql_to_expr(sql_expr, &df_schema, &mut PlannerContext::new())?;
Expand Down