From 80fe933f082119e3fcc5f561b2aee6717079c370 Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Fri, 19 Sep 2025 17:05:43 +0200
Subject: [PATCH 1/2] feat(cli): support external tables on multiple locations

---
 datafusion-cli/src/exec.rs                    |  39 +-
 datafusion/catalog/src/listing_schema.rs      |   2 +-
 datafusion/catalog/src/stream.rs              |   6 +-
 .../core/src/datasource/listing/table.rs      |  20 +-
 .../src/datasource/listing_table_factory.rs   | 358 ++++++++++++------
 datafusion/core/src/test_util/mod.rs          |   6 +-
 datafusion/expr/src/logical_plan/ddl.rs       |  10 +-
 datafusion/proto/src/logical_plan/mod.rs      |   4 +-
 datafusion/sql/src/statement.rs               |   2 +-
 parquet-testing                               |   2 +-
 testing                                       |   2 +-
 11 files changed, 285 insertions(+), 166 deletions(-)
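For context, a minimal sketch of what this series enables, assuming the
comma-separated LOCATION syntax implemented in datafusion/sql/src/statement.rs
below (the table name and paths are hypothetical):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Both folders are scanned; their inferred schemas are merged by the
        // ListingTableFactory changes in this patch.
        ctx.sql(
            "CREATE EXTERNAL TABLE events STORED AS CSV \
             LOCATION '/data/events-2024,/data/events-2025'",
        )
        .await?;
        ctx.sql("SELECT count(*) FROM events").await?.show().await?;
        Ok(())
    }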
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index d079a88a6440..e76e593d5ff8 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -39,6 +39,7 @@ use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser;
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
+use futures::future::join_all;
 use futures::StreamExt;
 use log::warn;
 use object_store::Error::Generic;
@@ -415,14 +416,18 @@ async fn create_plan(
     if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
         // To support custom formats, treat error as None
         let format = config_file_type_from_str(&cmd.file_type);
-        register_object_store_and_config_extensions(
-            ctx,
-            &cmd.location,
-            &cmd.options,
-            format,
-            resolve_region,
-        )
-        .await?;
+        let register_futures = cmd.locations.iter().map(|location| {
+            register_object_store_and_config_extensions(
+                ctx,
+                location,
+                &cmd.options,
+                format.clone(),
+                resolve_region,
+            )
+        });
+        for result in join_all(register_futures).await {
+            result?;
+        }
     }
 
     if let LogicalPlan::Copy(copy_to) = &mut plan {
@@ -524,14 +529,16 @@ mod tests {
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
             let format = config_file_type_from_str(&cmd.file_type);
-            register_object_store_and_config_extensions(
-                &ctx,
-                &cmd.location,
-                &cmd.options,
-                format,
-                false,
-            )
-            .await?;
+            for location in &cmd.locations {
+                register_object_store_and_config_extensions(
+                    &ctx,
+                    &location,
+                    &cmd.options,
+                    format.clone(),
+                    false,
+                )
+                .await?;
+            }
         } else {
             return plan_err!("LogicalPlan is not a CreateExternalTable");
         }
diff --git a/datafusion/catalog/src/listing_schema.rs b/datafusion/catalog/src/listing_schema.rs
index 7e19c1ecaab0..443e08e149c3 100644
--- a/datafusion/catalog/src/listing_schema.rs
+++ b/datafusion/catalog/src/listing_schema.rs
@@ -132,7 +132,7 @@ impl ListingSchemaProvider {
                 &CreateExternalTable {
                     schema: Arc::new(DFSchema::empty()),
                     name,
-                    location: table_url,
+                    locations: vec![table_url],
                     file_type: self.format.clone(),
                     table_partition_cols: vec![],
                     if_not_exists: false,
diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs
index 2d66ff4628b9..52e73bfb7b73 100644
--- a/datafusion/catalog/src/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -53,8 +53,12 @@ impl TableProviderFactory for StreamTableFactory {
         state: &dyn Session,
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>> {
+        let location = match cmd.locations.len() {
+            1 => &cmd.locations[0],
+            _ => return config_err!("Stream table factory supports only a single table location"),
+        };
+
         let schema: SchemaRef = Arc::clone(cmd.schema.inner());
-        let location = cmd.location.clone();
         let encoding = cmd.file_type.parse()?;
         let header = if let Ok(opt) = cmd
             .options
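The create_plan change in datafusion-cli/src/exec.rs above fans out one
registration future per location and then surfaces the first failure. A
standalone sketch of that join_all pattern; register here is an illustrative
stand-in, not the real register_object_store_and_config_extensions:

    use futures::future::join_all;

    async fn register(location: &str) -> Result<(), String> {
        if !location.contains("://") {
            return Err(format!("invalid location: {location}"));
        }
        println!("registered object store for {location}");
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let locations = ["s3://bucket-a/table/", "s3://bucket-b/table/"];
        let register_futures = locations.iter().map(|loc| register(loc));
        // join_all drives all futures to completion, then the loop propagates
        // the first error, mirroring the behavior in create_plan.
        for result in join_all(register_futures).await {
            result?;
        }
        Ok(())
    }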
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 18d84c4ba0c2..52dc5f9f2db6 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1735,12 +1735,12 @@ mod tests {
             (
                 vec![vec![col("string_col").sort(true, false)]],
                 Ok(vec![[PhysicalSortExpr {
-                    expr: physical_col("string_col", &schema).unwrap(),
-                    options: SortOptions {
-                        descending: false,
-                        nulls_first: false,
-                    },
-                }].into(),
+                        expr: physical_col("string_col", &schema).unwrap(),
+                        options: SortOptions {
+                            descending: false,
+                            nulls_first: false,
+                        },
+                    }].into(),
                 ])
             ),
             // ok with two columns, different options
             (
                 vec![vec![
                     col("string_col").sort(true, true),
                     col("int_col").sort(false, true),
                 ]],
                 Ok(vec![[
                     PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
-                    .asc()
-                    .nulls_last(),
+                        .asc()
+                        .nulls_last(),
                     PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
-                    .desc()
-                    .nulls_first()
+                        .desc()
+                        .nulls_first()
                 ].into(),
                 ])
             ),
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index f98297d0e3f7..ca5aa28cc560 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -17,23 +17,27 @@
 
 //! Factory for creating ListingTables with default options
 
-use std::collections::HashSet;
-use std::path::Path;
-use std::sync::Arc;
-
 use crate::catalog::{TableProvider, TableProviderFactory};
 use crate::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use crate::execution::context::SessionState;
+use std::collections::HashSet;
+use std::path::Path;
+use std::sync::Arc;
 
 use arrow::datatypes::DataType;
 use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
+use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::{config_datafusion_err, Result};
+use datafusion_common::{plan_err, DFSchema, DataFusionError, ToDFSchema};
 use datafusion_expr::CreateExternalTable;
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
+use datafusion_expr::expr::Sort;
+
+use futures::future::join_all;
 
 /// A `TableProviderFactory` capable of creating new `ListingTable`s
 #[derive(Debug, Default)]
@@ -63,137 +67,190 @@ impl TableProviderFactory for ListingTableFactory {
             ))?
             .create(session_state, &cmd.options)?;
 
-        let mut table_path = ListingTableUrl::parse(&cmd.location)?;
-        let file_extension = match table_path.is_collection() {
-            // Setting the extension to be empty instead of allowing the default extension seems
-            // odd, but was done to ensure existing behavior isn't modified. It seems like this
-            // could be refactored to either use the default extension or set the fully expected
-            // extension when compression is included (e.g. ".csv.gz")
-            true => "",
-            false => &get_extension(cmd.location.as_str()),
+        let file_extension = match cmd.locations.len() {
+            1 => {
+                let table_path = ListingTableUrl::parse(&cmd.locations[0])?;
+                match table_path.is_collection() {
+                    // Setting the extension to be empty instead of allowing the default extension seems
+                    // odd, but was done to ensure existing behavior isn't modified. It seems like this
+                    // could be refactored to either use the default extension or set the fully expected
+                    // extension when compression is included (e.g. ".csv.gz").
+                    // We do the same if there are multiple locations provided for the table.
+                    true => "",
+                    false => &get_extension(cmd.locations[0].as_str()),
+                }
+            }
+            _ => "",
         };
+
         let mut options = ListingOptions::new(file_format)
             .with_session_config_options(session_state.config())
             .with_file_extension(file_extension);
 
-        let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
-            let infer_parts = session_state
-                .config_options()
-                .execution
-                .listing_table_factory_infer_partitions;
-            let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts {
-                options
-                    .infer_partitions(session_state, &table_path)
-                    .await?
-                    .into_iter()
-            } else {
-                cmd.table_partition_cols.clone().into_iter()
-            };
-
-            (
-                None,
-                part_cols
-                    .map(|p| {
-                        (
-                            p,
-                            DataType::Dictionary(
-                                Box::new(DataType::UInt16),
-                                Box::new(DataType::Utf8),
-                            ),
-                        )
-                    })
-                    .collect::<Vec<_>>(),
+        let table_paths: Vec<ListingTableUrl> = cmd
+            .locations
+            .iter()
+            .map(|loc| ListingTableUrl::parse(loc))
+            .collect::<Result<Vec<_>>>()?;
+
+        // We use the first location to infer the partition columns,
+        // primarily for performance and simplicity reasons.
+        let partition_columns = infer_partition_columns(
+            &options,
+            session_state,
+            &table_paths[0],
+            &cmd.table_partition_cols,
+        )
+        .await?;
+
+        let infer_schemas = table_paths.into_iter().map(|listing_url| {
+            infer_schema(
+                &options,
+                &session_state,
+                listing_url,
+                &partition_columns,
+                &cmd.order_exprs,
+                &cmd.schema,
+                &cmd.file_type,
             )
-        } else {
-            let schema = Arc::clone(cmd.schema.inner());
-            let table_partition_cols = cmd
-                .table_partition_cols
-                .iter()
-                .map(|col| {
-                    schema
-                        .field_with_name(col)
-                        .map_err(|e| arrow_datafusion_err!(e))
-                })
-                .collect::<Result<Vec<_>>>()?
-                .into_iter()
-                .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
-                .collect();
-            // exclude partition columns to support creating partitioned external table
-            // with a specified column definition like
-            // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
-            let mut project_idx = Vec::new();
-            for i in 0..schema.fields().len() {
-                if !cmd.table_partition_cols.contains(schema.field(i).name()) {
-                    project_idx.push(i);
-                }
-            }
-            let schema = Arc::new(schema.project(&project_idx)?);
-            (Some(schema), table_partition_cols)
-        };
-
-        options = options.with_table_partition_cols(table_partition_cols);
-
-        options
-            .validate_partitions(session_state, &table_path)
-            .await?;
-
-        let resolved_schema = match provided_schema {
-            // We will need to check the table columns against the schema
-            // this is done so that we can do an ORDER BY for external table creation
-            // specifically for parquet file format.
-            // See: https://github.com/apache/datafusion/issues/7317
-            None => {
-                // if the folder then rewrite a file path as 'path/*.parquet'
-                // to only read the files the reader can understand
-                if table_path.is_folder() && table_path.get_glob().is_none() {
-                    // Since there are no files yet to infer an actual extension,
-                    // derive the pattern based on compression type.
-                    // So for gzipped CSV the pattern is `*.csv.gz`
-                    let glob = match options.format.compression_type() {
-                        Some(compression) => {
-                            match options.format.get_ext_with_compression(&compression) {
-                                // Use glob based on `FileFormat` extension
-                                Ok(ext) => format!("*.{ext}"),
-                                // Fallback to `file_type`, if not supported by `FileFormat`
-                                Err(_) => format!("*.{}", cmd.file_type.to_lowercase()),
-                            }
-                        }
-                        None => format!("*.{}", cmd.file_type.to_lowercase()),
-                    };
-                    table_path = table_path.with_glob(glob.as_ref())?;
-                }
-                let schema = options.infer_schema(session_state, &table_path).await?;
-                let df_schema = Arc::clone(&schema).to_dfschema()?;
-                let column_refs: HashSet<_> = cmd
-                    .order_exprs
-                    .iter()
-                    .flat_map(|sort| sort.iter())
-                    .flat_map(|s| s.expr.column_refs())
-                    .collect();
-
-                for column in &column_refs {
-                    if !df_schema.has_column(column) {
-                        return plan_err!("Column {column} is not in schema");
-                    }
-                }
-
-                schema
-            }
-            Some(s) => s,
-        };
-        let config = ListingTableConfig::new(table_path)
+        });
+        let results = join_all(infer_schemas).await;
+
+        let mut merged_schema = DFSchema::empty();
+        let mut listing_urls = Vec::new();
+        for result in results {
+            let (resolved_table_path, resolved_schema) = result?;
+            listing_urls.push(resolved_table_path);
+            merged_schema.merge(&resolved_schema.to_dfschema()?);
+        }
+
+        options = options.with_table_partition_cols(partition_columns);
+        let config = ListingTableConfig::new_with_multi_paths(listing_urls)
             .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
-            .with_schema(resolved_schema);
+            .with_schema(merged_schema.inner().to_owned());
+
         let provider = ListingTable::try_new(config)?
             .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
+
         let table = provider
             .with_definition(cmd.definition.clone())
             .with_constraints(cmd.constraints.clone())
             .with_column_defaults(cmd.column_defaults.clone());
+
         Ok(Arc::new(table))
     }
 }
+
+async fn infer_schema(
+    options: &ListingOptions,
+    session_state: &SessionState,
+    mut table_path: ListingTableUrl,
+    partition_cols: &Vec<(String, DataType)>,
+    order_exprs: &Vec<Vec<Sort>>,
+    schema: &DFSchema,
+    file_type: &String,
+) -> Result<(ListingTableUrl, SchemaRef), DataFusionError> {
+    let provided_schema = if schema.fields().len() == 0 {
+        None
+    } else {
+        let schema = Arc::clone(schema.inner());
+        let partitions_cols_set: HashSet<&String> =
+            HashSet::from_iter(partition_cols.iter().map(|(k, _)| k));
+        // exclude partition columns to support creating a partitioned external table
+        // with a specified column definition like
+        // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
+        let mut project_idx = Vec::new();
+        for i in 0..schema.fields().len() {
+            if !partitions_cols_set.contains(schema.field(i).name()) {
+                project_idx.push(i);
+            }
+        }
+        let schema = Arc::new(schema.project(&project_idx)?);
+        Some(schema)
+    };
+
+    options
+        .validate_partitions(session_state, &table_path)
+        .await?;
+
+    let resolved_schema = match provided_schema {
+        // We will need to check the table columns against the schema
+        // this is done so that we can do an ORDER BY for external table creation
+        // specifically for parquet file format.
+        // See: https://github.com/apache/datafusion/issues/7317
+        None => {
+            // if the folder then rewrite a file path as 'path/*.parquet'
+            // to only read the files the reader can understand
+            if table_path.is_folder() && table_path.get_glob().is_none() {
+                // Since there are no files yet to infer an actual extension,
+                // derive the pattern based on compression type.
+                // So for gzipped CSV the pattern is `*.csv.gz`
+                let glob = match options.format.compression_type() {
+                    Some(compression) => {
+                        match options.format.get_ext_with_compression(&compression) {
+                            // Use glob based on `FileFormat` extension
+                            Ok(ext) => format!("*.{ext}"),
+                            // Fallback to `file_type`, if not supported by `FileFormat`
+                            Err(_) => format!("*.{}", file_type.to_lowercase()),
+                        }
+                    }
+                    None => format!("*.{}", file_type.to_lowercase()),
+                };
+                table_path = table_path.with_glob(&glob)?;
+            }
+            let schema = options.infer_schema(session_state, &table_path).await?;
+            let df_schema = Arc::clone(&schema).to_dfschema()?;
+            let column_refs: HashSet<_> = order_exprs
+                .iter()
+                .flat_map(|sort| sort.iter())
+                .flat_map(|s| s.expr.column_refs())
+                .collect();
+
+            for column in &column_refs {
+                if !df_schema.has_column(column) {
+                    return plan_err!("Column {column} is not in schema");
+                }
+            }
+
+            schema
+        }
+        Some(s) => s,
+    };
+    Ok((table_path, resolved_schema))
+}
+
+async fn infer_partition_columns(
+    options: &ListingOptions,
+    session_state: &SessionState,
+    table_path: &ListingTableUrl,
+    provided_cols: &Vec<String>,
+) -> Result<Vec<(String, DataType)>, DataFusionError> {
+    let infer_parts = session_state
+        .config_options()
+        .execution
+        .listing_table_factory_infer_partitions;
+    let part_cols = if provided_cols.is_empty() && infer_parts {
+        options
+            .infer_partitions(session_state, &table_path)
+            .await?
+            .into_iter()
+    } else {
+        provided_cols.clone().into_iter()
+    };
+
+    Ok(part_cols
+        .map(|p| {
+            (
+                p,
+                DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                ),
+            )
+        })
+        .collect::<Vec<_>>())
+}
 
 // Get file extension from path
 fn get_extension(path: &str) -> String {
     let res = Path::new(path).extension().and_then(|ext| ext.to_str());
@@ -233,7 +290,7 @@ mod tests {
         let name = TableReference::bare("foo");
         let cmd = CreateExternalTable {
             name,
-            location: csv_file.path().to_str().unwrap().to_string(),
+            locations: vec![csv_file.path().to_str().unwrap().to_string()],
             file_type: "csv".to_string(),
             schema: Arc::new(DFSchema::empty()),
             table_partition_cols: vec![],
@@ -274,7 +331,7 @@ mod tests {
         options.insert("format.has_header".into(), "true".into());
         let cmd = CreateExternalTable {
             name,
-            location: csv_file.path().to_str().unwrap().to_string(),
+            locations: vec![csv_file.path().to_str().unwrap().to_string()],
             file_type: "csv".to_string(),
             schema: Arc::new(DFSchema::empty()),
             table_partition_cols: vec![],
@@ -319,7 +376,7 @@ mod tests {
         options.insert("format.compression".into(), "gzip".into());
        let cmd = CreateExternalTable {
             name,
-            location: dir.path().to_str().unwrap().to_string(),
+            locations: vec![dir.path().to_str().unwrap().to_string()],
             file_type: "csv".to_string(),
             schema: Arc::new(DFSchema::empty()),
             table_partition_cols: vec![],
@@ -371,7 +428,7 @@ mod tests {
         options.insert("format.has_header".into(), "true".into());
         let cmd = CreateExternalTable {
             name,
-            location: dir.path().to_str().unwrap().to_string(),
+            locations: vec![dir.path().to_str().unwrap().to_string()],
             file_type: "csv".to_string(),
             schema: Arc::new(DFSchema::empty()),
             table_partition_cols: vec![],
@@ -415,7 +472,7 @@ mod tests {
         let cmd = CreateExternalTable {
             name,
-            location: String::from(path.to_str().unwrap()),
+            locations: vec![String::from(path.to_str().unwrap())],
             file_type: "parquet".to_string(),
             schema: Arc::new(DFSchema::empty()),
             table_partition_cols: vec![],
@@ -455,7 +512,7 @@ mod tests {
         let cmd = CreateExternalTable {
             name,
-            location: dir.path().to_str().unwrap().to_string(),
+            locations: vec![dir.path().to_str().unwrap().to_string()],
             file_type: "parquet".to_string(),
             schema: Arc::new(DFSchema::empty()),
             table_partition_cols: vec![],
@@ -496,7 +553,7 @@ mod tests {
         let cmd = CreateExternalTable {
             name,
-            location: dir.path().to_str().unwrap().to_string(),
+            locations: vec![dir.path().to_str().unwrap().to_string()],
             file_type: "parquet".to_string(),
             schema: Arc::new(DFSchema::empty()),
             table_partition_cols: vec![],
@@ -519,4 +576,55 @@ mod tests {
         let listing_options = listing_table.options();
         assert!(listing_options.table_partition_cols.is_empty());
     }
+
+    #[tokio::test]
+    async fn test_create_using_multiple_locations() {
+        let dir = tempfile::tempdir().unwrap();
+        fs::create_dir_all(dir.path().join("folder-1")).unwrap();
+        fs::create_dir_all(dir.path().join("folder-2")).unwrap();
+
+        let factory = ListingTableFactory::new();
+        let context = SessionContext::new();
+        let state = context.state();
+        let name = TableReference::bare("foo");
+
+        let options = HashMap::new();
+        let cmd = CreateExternalTable {
+            name,
+            locations: vec![
+                dir.path().to_str().unwrap().to_string() + "/folder-1",
+                dir.path().to_str().unwrap().to_string() + "/folder-2",
+            ],
+            file_type: "csv".to_string(),
+            schema: Arc::new(DFSchema::empty()),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            or_replace: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options,
+            constraints: Constraints::default(),
+            column_defaults: HashMap::new(),
+        };
+        let table_provider = factory.create(&state, &cmd).await.unwrap();
+        let listing_table = table_provider
+            .as_any()
+            .downcast_ref::<ListingTable>()
+            .unwrap();
+
+        let listing_options = listing_table.options();
+        assert_eq!("", listing_options.file_extension);
+        assert_eq!(2, listing_table.table_paths().len());
+
+        // Glob pattern is set to search for CSV files
+        let table_path = listing_table.table_paths().first().unwrap();
+        assert_eq!(
+            table_path.get_glob().clone().unwrap(),
+            Pattern::new("*.csv").unwrap()
+        );
+    }
 }
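In the factory, each location's schema is inferred independently and folded
into a single DFSchema. A self-contained sketch of that merge step, using the
same datafusion_common APIs the patch calls; the field layouts are made up:

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, Result, ToDFSchema};

    fn main() -> Result<()> {
        // Pretend these two schemas were inferred from two different locations.
        let loc_a = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let loc_b = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("created_at", DataType::Date32, true),
        ]));

        // merge() appends fields that are not already present, so the result
        // is the union of columns across locations.
        let mut merged = DFSchema::empty();
        merged.merge(&loc_a.to_dfschema()?);
        merged.merge(&loc_b.to_dfschema()?);

        assert_eq!(merged.inner().fields().len(), 3);
        Ok(())
    }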
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 7149c5b0bd8c..36a64c33dde1 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -184,8 +184,8 @@ impl TableProviderFactory for TestTableFactory {
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>> {
         Ok(Arc::new(TestTableProvider {
-            url: cmd.location.to_string(),
-            schema: Arc::clone(cmd.schema.inner()),
+            url: cmd.locations,
+            schema: Arc::new(cmd.schema.as_ref().into()),
         }))
     }
 }
@@ -194,7 +194,7 @@ impl TableProviderFactory for TestTableFactory {
 #[derive(Debug)]
 pub struct TestTableProvider {
     /// URL of table files or folder
-    pub url: String,
+    pub url: Vec<String>,
     /// test table schema
     pub schema: SchemaRef,
 }
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 57e7d41cbaf9..1ea6b3278a46 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -209,7 +209,7 @@ pub struct CreateExternalTable {
     /// The table name
     pub name: TableReference,
     /// The physical location
-    pub location: String,
+    pub locations: Vec<String>,
     /// The file type of physical file
     pub file_type: String,
     /// Partition Columns
@@ -239,7 +239,7 @@ impl Hash for CreateExternalTable {
     fn hash<H: Hasher>(&self, state: &mut H) {
         self.schema.hash(state);
         self.name.hash(state);
-        self.location.hash(state);
+        self.locations.hash(state);
         self.file_type.hash(state);
         self.table_partition_cols.hash(state);
         self.if_not_exists.hash(state);
@@ -259,7 +259,7 @@ impl PartialOrd for CreateExternalTable {
             /// The table name
             pub name: &'a TableReference,
             /// The physical location
-            pub location: &'a String,
+            pub locations: &'a Vec<String>,
             /// The file type of physical file
             pub file_type: &'a String,
             /// Partition Columns
@@ -277,7 +277,7 @@ impl PartialOrd for CreateExternalTable {
         }
         let comparable_self = ComparableCreateExternalTable {
             name: &self.name,
-            location: &self.location,
+            locations: &self.locations,
             file_type: &self.file_type,
             table_partition_cols: &self.table_partition_cols,
             if_not_exists: &self.if_not_exists,
@@ -288,7 +288,7 @@ impl PartialOrd for CreateExternalTable {
         };
         let comparable_other = ComparableCreateExternalTable {
             name: &other.name,
-            location: &other.location,
+            locations: &other.locations,
             file_type: &other.file_type,
             table_partition_cols: &other.table_partition_cols,
             if_not_exists: &other.if_not_exists,
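Note the round-trip convention these two changes establish: the SQL planner
splits the LOCATION string on commas, and the protobuf encoding (fixed in the
follow-up commit) joins the vector back with commas. A small sketch of that
convention, which assumes individual paths never contain a comma:

    fn main() {
        let locations = vec![
            "/data/events-2024".to_string(),
            "/data/events-2025".to_string(),
        ];
        // Serialize: Vec<String> -> String, as in the proto encoding.
        let encoded = locations.join(",");
        // Parse: String -> Vec<String>, as in statement.rs.
        let decoded: Vec<String> = encoded.split(',').map(str::to_string).collect();
        assert_eq!(locations, decoded);
    }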
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 6687cc31a3b0..d709a9389aee 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -625,7 +625,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         create_extern_table.name.as_ref(),
                         "CreateExternalTable",
                     )?,
-                    location: create_extern_table.location.clone(),
+                    locations: create_extern_table.location.clone(),
                     file_type: create_extern_table.file_type.clone(),
                     table_partition_cols: create_extern_table
                         .table_partition_cols
@@ -1465,7 +1465,7 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
                 CreateExternalTable {
                     name,
-                    location,
+                    locations: location,
                     file_type,
                     schema: df_schema,
                     table_partition_cols,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 44e924614208..6d4cdce37e58 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1548,7 +1548,7 @@ impl SqlToRel<'_, S> {
             PlanCreateExternalTable {
                 schema: df_schema,
                 name,
-                location,
+                locations: location.split(",").map(|x| x.to_string()).collect(),
                 file_type,
                 table_partition_cols,
                 if_not_exists,
diff --git a/parquet-testing b/parquet-testing
index 107b36603e05..a3d96a65e11e 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0
+Subproject commit a3d96a65e11e2bbca7d22a894e8313ede90a33a3
diff --git a/testing b/testing
index 0d60ccae40d0..0b1aad061a66 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 0d60ccae40d0e8f2d22c15fafb01c5d4be8c63a6
+Subproject commit 0b1aad061a6608cf98a54b3c2e4c199f1974886e

From 599dc316b966418c568e368f968b8ca27acdaac0 Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Wed, 24 Sep 2025 16:42:32 -0400
Subject: [PATCH 2/2] Fix fmt

---
 datafusion/catalog/src/stream.rs                        | 6 +++++-
 datafusion/core/src/datasource/listing_table_factory.rs | 2 --
 datafusion/core/src/test_util/mod.rs                    | 4 ++--
 datafusion/proto/src/logical_plan/mod.rs                | 4 ++--
 datafusion/proto/tests/cases/roundtrip_logical_plan.rs  | 4 ++--
 5 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs
index 52e73bfb7b73..3489cd2e153b 100644
--- a/datafusion/catalog/src/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -55,7 +55,11 @@ impl TableProviderFactory for StreamTableFactory {
     ) -> Result<Arc<dyn TableProvider>> {
         let location = match cmd.locations.len() {
             1 => &cmd.locations[0],
-            _ => return config_err!("Stream table factory supports only a single table location"),
+            _ => {
+                return config_err!(
+                    "Stream table factory supports only a single table location"
+                )
+            }
         };
 
         let schema: SchemaRef = Arc::clone(cmd.schema.inner());
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index ca5aa28cc560..4580a3cc764d 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -26,8 +26,6 @@ use std::collections::HashSet;
 use std::path::Path;
 use std::sync::Arc;
 
-use arrow::datatypes::DataType;
-use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::{config_datafusion_err, Result};
 use datafusion_common::{plan_err, DFSchema, DataFusionError, ToDFSchema};
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 36a64c33dde1..ae834cae100b 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -184,8 +184,8 @@ impl TableProviderFactory for TestTableFactory {
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>> {
         Ok(Arc::new(TestTableProvider {
-            url: cmd.locations,
-            schema: Arc::new(cmd.schema.as_ref().into()),
+            url: cmd.locations.clone(),
+            schema: Arc::clone(cmd.schema.inner()),
         }))
     }
 }
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index d709a9389aee..c110028f9418 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -625,7 +625,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         create_extern_table.name.as_ref(),
                         "CreateExternalTable",
                     )?,
-                    locations: create_extern_table.location.clone(),
+                    locations: vec![create_extern_table.location.clone()],
                     file_type: create_extern_table.file_type.clone(),
                     table_partition_cols: create_extern_table
                         .table_partition_cols
@@ -1499,7 +1499,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                     protobuf::CreateExternalTableNode {
                         name: Some(name.clone().into()),
-                        location: location.clone(),
+                        location: location.clone().join(","),
                         file_type: file_type.clone(),
                         schema: Some(df_schema.try_into()?),
                         table_partition_cols: table_partition_cols.clone(),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index d0f25a85f798..a48b8f863352 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -196,7 +196,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
         })?;
         assert_eq!(msg.table_name, table_ref.to_string());
         let provider = TestTableProvider {
-            url: msg.url,
+            url: vec![msg.url],
             schema,
         };
         Ok(Arc::new(provider))
@@ -214,7 +214,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
             .downcast_ref::<TestTableProvider>()
             .expect("Can't encode non-test tables");
         let msg = TestTableProto {
-            url: table.url.clone(),
+            url: table.url.join(","),
             table_name: table_ref.to_string(),
         };
         msg.encode(buf).map_err(|_| {