diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index adef02c38d73..df6e975b1422 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1098,7 +1098,9 @@ impl ListingTable {
             )
         }))
         .await?;
-        let file_list = stream::iter(file_list).flatten();
+        let meta_fetch_concurrency =
+            ctx.config_options().execution.meta_fetch_concurrency;
+        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
         // collect the statistics if required by the config
         let files = file_list
             .map(|part_file| async {
@@ -1115,7 +1117,7 @@ impl ListingTable {
                 }
             })
             .boxed()
-            .buffered(ctx.config_options().execution.meta_fetch_concurrency);
+            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
 
         let (files, statistics) = get_statistics_with_limit(
             files,
@@ -1195,7 +1197,9 @@ mod tests {
     use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_plan::ExecutionPlanProperties;
 
+    use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
     use tempfile::TempDir;
+    use url::Url;
 
     #[tokio::test]
     async fn read_single_file() -> Result<()> {
@@ -1584,6 +1588,81 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_assert_list_files_for_exact_paths() -> Result<()> {
+        // more expected partitions than files
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+            ],
+            12,
+            5,
+            Some(""),
+        )
+        .await?;
+
+        // more files than meta_fetch_concurrency (32)
+        let files: Vec<String> =
+            (0..64).map(|i| format!("bucket/key1/file{}", i)).collect();
+        // Collect references to each string
+        let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
+        assert_list_files_for_exact_paths(file_refs.as_slice(), 5, 5, Some("")).await?;
+
+        // as many expected partitions as files
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+            ],
+            5,
+            5,
+            Some(""),
+        )
+        .await?;
+
+        // more files than expected partitions
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+            ],
+            2,
+            2,
+            Some(""),
+        )
+        .await?;
+
+        // no files => no groups
+        assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?;
+
+        // files that don't match the default file ext
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0.avro",
+                "bucket/key1/file1.csv",
+                "bucket/key1/file2.avro",
+                "bucket/key2/file3.csv",
+                "bucket/key2/file4.avro",
+                "bucket/key3/file5.csv",
+            ],
+            2,
+            2,
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
     async fn load_table(
         ctx: &SessionContext,
         name: &str,
@@ -1670,6 +1749,56 @@ mod tests {
         Ok(())
     }
 
+    /// Check that the files listed by the table match the specified `output_partitioning`
+    /// when the object store contains `files`, and validate that file metadata is fetched
+    /// concurrently
+    async fn assert_list_files_for_exact_paths(
+        files: &[&str],
+        target_partitions: usize,
+        output_partitioning: usize,
+        file_ext: Option<&str>,
+    ) -> Result<()> {
+        let ctx = SessionContext::new();
+        let (store, _) = make_test_store_and_state(
+            &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
+        );
+
+        let meta_fetch_concurrency = ctx
+            .state()
+            .config_options()
+            .execution
+            .meta_fetch_concurrency;
+        let expected_concurrency = files.len().min(meta_fetch_concurrency);
+        let head_blocking_store = ensure_head_concurrency(store, expected_concurrency);
+
+        let url = Url::parse("test://").unwrap();
+        ctx.register_object_store(&url, head_blocking_store.clone());
+
+        let format = AvroFormat {};
+
+        let opt = ListingOptions::new(Arc::new(format))
+            .with_file_extension_opt(file_ext)
+            .with_target_partitions(target_partitions);
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+
+        let table_paths = files
+            .iter()
+            .map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
+            .collect();
+        let config = ListingTableConfig::new_with_multi_paths(table_paths)
+            .with_listing_options(opt)
+            .with_schema(Arc::new(schema));
+
+        let table = ListingTable::try_new(config)?;
+
+        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+
+        assert_eq!(file_list.len(), output_partitioning);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_insert_into_append_new_json_files() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index cac430c5b49d..e1328770cabd 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -20,12 +20,22 @@
 use crate::execution::context::SessionState;
 use crate::execution::session_state::SessionStateBuilder;
 use crate::prelude::SessionContext;
+use futures::stream::BoxStream;
 use futures::FutureExt;
-use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
+use object_store::{
+    memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
+    MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
+    PutResult,
+};
+use std::fmt::{Debug, Display, Formatter};
 use std::sync::Arc;
+use tokio::{
+    sync::Barrier,
+    time::{timeout, Duration},
+};
 use url::Url;
-/// Returns a test object store with the provided `ctx`
+/// Registers a test object store with the provided `ctx`
 pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
     let url = Url::parse("test://").unwrap();
     ctx.register_object_store(&url, make_test_store_and_state(files).0);
 }
@@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
         version: None,
     }
 }
+
+/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
+pub fn ensure_head_concurrency(
+    object_store: Arc<dyn ObjectStore>,
+    concurrency: usize,
+) -> Arc<dyn ObjectStore> {
+    Arc::new(BlockingObjectStore::new(object_store, concurrency))
+}
+
+/// An object store that "blocks" in its `head` call until an expected number of concurrent calls are reached.
+#[derive(Debug)]
+struct BlockingObjectStore {
+    inner: Arc<dyn ObjectStore>,
+    barrier: Arc<Barrier>,
+}
+
+impl BlockingObjectStore {
+    const NAME: &'static str = "BlockingObjectStore";
+    fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
+        Self {
+            inner,
+            barrier: Arc::new(Barrier::new(expected_concurrency)),
+        }
+    }
+}
+
+impl Display for BlockingObjectStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Display::fmt(&self.inner, f)
+    }
+}
+
+/// All trait methods are forwarded to the inner object store, except for
+/// the `head` method, which waits until the expected number of concurrent calls is reached.
+#[async_trait::async_trait]
+impl ObjectStore for BlockingObjectStore {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
+        self.inner.put_opts(location, payload, opts).await
+    }
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, opts).await
+    }
+
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> object_store::Result<GetResult> {
+        self.inner.get_opts(location, options).await
+    }
+
+    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
+        println!(
+            "{} received head call for {location}",
+            BlockingObjectStore::NAME
+        );
+        // Wait until the expected number of concurrent calls is reached, but time out after 1 second to avoid hanging failing tests.
+        let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
+        match wait_result {
+            Ok(_) => println!(
+                "{} barrier reached for {location}",
+                BlockingObjectStore::NAME
+            ),
+            Err(_) => {
+                let error_message = format!(
+                    "{} barrier wait timed out for {location}",
+                    BlockingObjectStore::NAME
+                );
+                log::error!("{}", error_message);
+                return Err(Error::Generic {
+                    store: BlockingObjectStore::NAME,
+                    source: error_message.into(),
+                });
+            }
+        }
+        // Forward the call to the inner object store.
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> object_store::Result<()> {
+        self.inner.delete(location).await
+    }
+
+    fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        prefix: Option<&Path>,
+    ) -> object_store::Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(
+        &self,
+        from: &Path,
+        to: &Path,
+    ) -> object_store::Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
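
Note on the core change: `flatten` polls the per-path listing streams sequentially, so one slow metadata request stalls the whole listing, whereas `flatten_unordered(n)` polls up to `n` inner streams at once and `buffer_unordered(n)` keeps up to `n` futures in flight, yielding results in completion order rather than submission order. A minimal standalone sketch of the pattern follows; the stand-in listings and the `(path, size)` tuples are invented for illustration and are not taken from DataFusion:

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-ins for the per-path listing streams produced for each table path.
    let listings = vec![
        stream::iter(vec!["bucket/key1/file0", "bucket/key1/file1"]),
        stream::iter(vec!["bucket/key2/file2"]),
    ];

    // `flatten` would drain the first listing before polling the second;
    // `flatten_unordered(limit)` polls up to `limit` listings concurrently.
    // Item order is no longer deterministic.
    let files: Vec<_> = stream::iter(listings)
        .flatten_unordered(32)
        .collect()
        .await;
    assert_eq!(files.len(), 3);

    // Stand-in for the statistics step: one future per file (a `head`
    // request in the real code), at most 32 in flight, results yielded as
    // they complete (`buffer_unordered`) instead of in order (`buffered`).
    let metas: Vec<(&str, u64)> = stream::iter(files)
        .map(|path| async move { (path, 10u64) })
        .buffer_unordered(32)
        .collect()
        .await;
    assert_eq!(metas.len(), 3);
}

The switch to the unordered variants gives up arrival order, which appears safe here because the listed files are subsequently regrouped into partitions; the `BlockingObjectStore` test harness above exists precisely to assert that the expected number of `head` calls really are in flight at once.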