Fix sequential metadata fetching in ListingTable causing high latency #14918

Merged
merged 3 commits on Mar 3, 2025
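Before diving into the diff, a back-of-the-envelope sketch of why sequential metadata fetching hurts: when a table is created from many exact file paths, each path needs its own object-store `head` call, and running those one after another pays the full per-call latency every time. The 50 ms latency below is an assumed figure for illustration, not a measurement from this PR; the 64 files and the default `meta_fetch_concurrency` of 32 mirror the test added here.

```rust
// Illustrative arithmetic only. The 50 ms per-`head`-call latency is an
// assumption; 64 files and meta_fetch_concurrency = 32 match the test below.
fn main() {
    let (files, head_latency_ms, concurrency) = (64u64, 50u64, 32u64);

    // One `head` call after another: every file pays the full latency.
    let sequential_ms = files * head_latency_ms; // 64 * 50 = 3200 ms

    // Up to 32 `head` calls in flight at once: two "waves" of 50 ms each.
    let concurrent_ms = files.div_ceil(concurrency) * head_latency_ms; // 2 * 50 = 100 ms

    println!("sequential: {sequential_ms} ms, concurrent: ~{concurrent_ms} ms");
}
```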
133 changes: 131 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1098,7 +1098,9 @@ impl ListingTable
)
}))
.await?;
let file_list = stream::iter(file_list).flatten();
let meta_fetch_concurrency =
ctx.config_options().execution.meta_fetch_concurrency;
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
Contributor Author:
This is the actual fix; the rest of the changes are unit tests.
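A minimal standalone sketch of the difference, assuming only the `futures` and `tokio` crates (none of the names below come from DataFusion): `flatten` drains the inner streams strictly one after another, so a slow listing blocks everything queued behind it, while `flatten_unordered(n)` polls up to `n` inner streams at once and yields items as they become ready.

```rust
use futures::stream::{self, BoxStream, StreamExt};
use std::time::Duration;

// One "listing" stream per path; `delay_ms` simulates object-store latency.
fn listing(prefix: &'static str, delay_ms: u64) -> BoxStream<'static, String> {
    stream::iter(0..2)
        .then(move |i| async move {
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            format!("{prefix}/file{i}")
        })
        .boxed()
}

#[tokio::main]
async fn main() {
    // `flatten`: the slow listing is fully drained before the fast one is polled.
    let ordered: Vec<String> = stream::iter(vec![listing("slow", 200), listing("fast", 1)])
        .flatten()
        .collect()
        .await;
    println!("flatten:           {ordered:?}");

    // `flatten_unordered(2)`: both listings progress concurrently, so the fast
    // one's files are yielded without waiting for the slow one to finish.
    let unordered: Vec<String> = stream::iter(vec![listing("slow", 200), listing("fast", 1)])
        .flatten_unordered(2)
        .collect()
        .await;
    println!("flatten_unordered: {unordered:?}");
}
```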

// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
Expand All @@ -1115,7 +1117,7 @@ impl ListingTable {
}
})
.boxed()
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
Contributor Author:
Ordering does not matter here, so we might as well use `buffer_unordered` to avoid stalling on an outlier.
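For illustration, a small sketch under the same assumptions (plain `futures` + `tokio`, values invented): both combinators keep up to `n` futures in flight, but `buffered` emits results in input order, so one slow outlier holds back results that are already finished, whereas `buffer_unordered` emits them in completion order.

```rust
use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Simulated per-file metadata fetch times; the first file is a slow outlier.
    let delays_ms: Vec<u64> = vec![300, 10, 10, 10];
    let fetch = |ms: u64| async move {
        tokio::time::sleep(Duration::from_millis(ms)).await;
        ms
    };

    // `buffered(4)`: output follows input order, so nothing reaches the
    // consumer until the 300 ms outlier completes.
    let in_order: Vec<u64> = stream::iter(delays_ms.clone())
        .map(fetch)
        .buffered(4)
        .collect()
        .await;
    assert_eq!(in_order, vec![300, 10, 10, 10]);

    // `buffer_unordered(4)`: completed fetches are emitted immediately, so the
    // fast ones flow downstream while the outlier is still in flight.
    let by_completion: Vec<u64> = stream::iter(delays_ms)
        .map(fetch)
        .buffer_unordered(4)
        .collect()
        .await;
    assert_eq!(by_completion, vec![10, 10, 10, 300]);
}
```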


let (files, statistics) = get_statistics_with_limit(
files,
@@ -1195,7 +1197,9 @@ mod tests {
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::ExecutionPlanProperties;

use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
use tempfile::TempDir;
use url::Url;

#[tokio::test]
async fn read_single_file() -> Result<()> {
@@ -1584,6 +1588,81 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_assert_list_files_for_exact_paths() -> Result<()> {
// more expected partitions than files
assert_list_files_for_exact_paths(
&[
"bucket/key1/file0",
"bucket/key1/file1",
"bucket/key1/file2",
"bucket/key2/file3",
"bucket/key2/file4",
],
12,
5,
Some(""),
)
.await?;

// more files than meta_fetch_concurrency (32)
let files: Vec<String> =
(0..64).map(|i| format!("bucket/key1/file{}", i)).collect();
// Collect references to each string
let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
assert_list_files_for_exact_paths(file_refs.as_slice(), 5, 5, Some("")).await?;

// as many expected partitions as files
assert_list_files_for_exact_paths(
&[
"bucket/key1/file0",
"bucket/key1/file1",
"bucket/key1/file2",
"bucket/key2/file3",
"bucket/key2/file4",
],
5,
5,
Some(""),
)
.await?;

// more files than expected partitions
assert_list_files_for_exact_paths(
&[
"bucket/key1/file0",
"bucket/key1/file1",
"bucket/key1/file2",
"bucket/key2/file3",
"bucket/key2/file4",
],
2,
2,
Some(""),
)
.await?;

// no files => no groups
assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?;

// files that don't match the default file ext
assert_list_files_for_exact_paths(
&[
"bucket/key1/file0.avro",
"bucket/key1/file1.csv",
"bucket/key1/file2.avro",
"bucket/key2/file3.csv",
"bucket/key2/file4.avro",
"bucket/key3/file5.csv",
],
2,
2,
None,
)
.await?;
Ok(())
}

async fn load_table(
ctx: &SessionContext,
name: &str,
@@ -1670,6 +1749,56 @@ mod tests {
Ok(())
}

/// Check that the files listed by the table match the specified `output_partitioning`
/// when the object store contains `files`, and validate that file metadata is fetched
/// concurrently
async fn assert_list_files_for_exact_paths(
files: &[&str],
target_partitions: usize,
output_partitioning: usize,
file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
let (store, _) = make_test_store_and_state(
&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
);

let meta_fetch_concurrency = ctx
.state()
.config_options()
.execution
.meta_fetch_concurrency;
let expected_concurrency = files.len().min(meta_fetch_concurrency);
let head_blocking_store = ensure_head_concurrency(store, expected_concurrency);
Contributor:
this is very cool


let url = Url::parse("test://").unwrap();
ctx.register_object_store(&url, head_blocking_store.clone());

let format = AvroFormat {};

let opt = ListingOptions::new(Arc::new(format))
.with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);

let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

let table_paths = files
.iter()
.map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
.collect();
let config = ListingTableConfig::new_with_multi_paths(table_paths)
.with_listing_options(opt)
.with_schema(Arc::new(schema));

let table = ListingTable::try_new(config)?;

let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

assert_eq!(file_list.len(), output_partitioning);

Ok(())
}

#[tokio::test]
async fn test_insert_into_append_new_json_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
132 changes: 130 additions & 2 deletions datafusion/core/src/test/object_store.rs
@@ -20,12 +20,22 @@
use crate::execution::context::SessionState;
use crate::execution::session_state::SessionStateBuilder;
use crate::prelude::SessionContext;
use futures::stream::BoxStream;
use futures::FutureExt;
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
use object_store::{
memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
PutResult,
};
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;
use tokio::{
sync::Barrier,
time::{timeout, Duration},
};
use url::Url;

/// Returns a test object store with the provided `ctx`
/// Registers a test object store with the provided `ctx`
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
let url = Url::parse("test://").unwrap();
ctx.register_object_store(&url, make_test_store_and_state(files).0);
@@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
version: None,
}
}

/// Blocks the object_store `head` call until `concurrency` calls are pending.
pub fn ensure_head_concurrency(
object_store: Arc<dyn ObjectStore>,
concurrency: usize,
) -> Arc<dyn ObjectStore> {
Arc::new(BlockingObjectStore::new(object_store, concurrency))
}

/// An object store that “blocks” in its `head` call until an expected number of concurrent calls is reached.
#[derive(Debug)]
struct BlockingObjectStore {
inner: Arc<dyn ObjectStore>,
barrier: Arc<Barrier>,
}

impl BlockingObjectStore {
const NAME: &'static str = "BlockingObjectStore";
fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
Self {
inner,
barrier: Arc::new(Barrier::new(expected_concurrency)),
}
}
}

impl Display for BlockingObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.inner, f)
}
}

/// All trait methods are forwarded to the inner object store, except for
/// the `head` method which waits until the expected number of concurrent calls is reached.
#[async_trait::async_trait]
impl ObjectStore for BlockingObjectStore {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
self.inner.put_opts(location, payload, opts).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}

async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
self.inner.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
println!(
"{} received head call for {location}",
BlockingObjectStore::NAME
);
// Wait until the expected number of concurrent calls is reached, but time out after 1 second so a failing test does not hang.
Contributor:
👍

let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
match wait_result {
Ok(_) => println!(
"{} barrier reached for {location}",
BlockingObjectStore::NAME
),
Err(_) => {
let error_message = format!(
"{} barrier wait timed out for {location}",
BlockingObjectStore::NAME
);
log::error!("{}", error_message);
return Err(Error::Generic {
store: BlockingObjectStore::NAME,
source: error_message.into(),
});
}
}
// Forward the call to the inner object store.
self.inner.head(location).await
}

async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.inner.delete(location).await
}

fn list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
self.inner.list(prefix)
}

async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy(from, to).await
}

async fn copy_if_not_exists(
&self,
from: &Path,
to: &Path,
) -> object_store::Result<()> {
self.inner.copy_if_not_exists(from, to).await
}
}
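Stepping back from the diff, the mechanism `BlockingObjectStore` relies on can be shown in isolation. This is a standalone sketch, not part of the PR: `fetch_metadata` and its numbers are hypothetical, while `tokio::sync::Barrier` and `tokio::time::timeout` are the real APIs used above. A barrier sized to the expected concurrency releases only when that many callers are waiting at once, so sequential callers time out and the missing concurrency surfaces as a test failure instead of a hang.

```rust
use std::sync::Arc;
use tokio::{
    sync::Barrier,
    time::{timeout, Duration},
};

// Succeeds only if at least `Barrier::new(n)` callers are waiting simultaneously.
async fn fetch_metadata(id: usize, barrier: Arc<Barrier>) -> Result<usize, String> {
    timeout(Duration::from_secs(1), barrier.wait())
        .await
        .map_err(|_| format!("fetch {id} was not issued concurrently"))?;
    Ok(id)
}

#[tokio::main]
async fn main() {
    let expected_concurrency = 4;

    // Concurrent callers: all four reach the barrier together and succeed.
    let barrier = Arc::new(Barrier::new(expected_concurrency));
    let handles: Vec<_> = (0..expected_concurrency)
        .map(|id| tokio::spawn(fetch_metadata(id, Arc::clone(&barrier))))
        .collect();
    for handle in handles {
        assert!(handle.await.unwrap().is_ok());
    }

    // A lone (sequential) caller can never fill the barrier, so it times out.
    let barrier = Arc::new(Barrier::new(expected_concurrency));
    assert!(fetch_metadata(0, barrier).await.is_err());
}
```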