Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser;
use datafusion::sql::sqlparser::dialect::dialect_from_str;
use futures::future::join_all;
use futures::StreamExt;
use log::warn;
use object_store::Error::Generic;
Expand Down Expand Up @@ -415,14 +416,18 @@ async fn create_plan(
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
// To support custom formats, treat error as None
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
ctx,
&cmd.location,
&cmd.options,
format,
resolve_region,
)
.await?;
let register_futures = cmd.locations.iter().map(|location| {
register_object_store_and_config_extensions(
ctx,
location,
&cmd.options,
format.clone(),
resolve_region,
)
});
for result in join_all(register_futures).await {
result?;
}
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
Expand Down Expand Up @@ -524,14 +529,16 @@ mod tests {

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
&ctx,
&cmd.location,
&cmd.options,
format,
false,
)
.await?;
for location in &cmd.locations {
register_object_store_and_config_extensions(
&ctx,
&location,
&cmd.options,
format.clone(),
false,
)
.await?;
}
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ListingSchemaProvider {
&CreateExternalTable {
schema: Arc::new(DFSchema::empty()),
name,
location: table_url,
locations: vec![table_url],
file_type: self.format.clone(),
table_partition_cols: vec![],
if_not_exists: false,
Expand Down
10 changes: 9 additions & 1 deletion datafusion/catalog/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,16 @@ impl TableProviderFactory for StreamTableFactory {
state: &dyn Session,
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
let location = match cmd.locations.len() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there's a better way to constrain the API for this function to a single file location without throwing a runtime error if multiple are provided.

1 => &cmd.locations[0],
_ => {
return config_err!(
"Stream table factory supports only a single table location"
)
}
};

let schema: SchemaRef = Arc::clone(cmd.schema.inner());
let location = cmd.location.clone();
let encoding = cmd.file_type.parse()?;
let header = if let Ok(opt) = cmd
.options
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1735,12 +1735,12 @@ mod tests {
(
vec![vec![col("string_col").sort(true, false)]],
Ok(vec![[PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}].into(),
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}].into(),
])
),
// ok with two columns, different options
Expand All @@ -1751,11 +1751,11 @@ mod tests {
]],
Ok(vec![[
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),
.asc()
.nulls_last(),
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
.desc()
.nulls_first()
].into(),
])
),
Expand Down
Loading
Loading