Skip to content

Commit 5d3bb48

Browse files
Fix sequential metadata fetching in ListingTable causing high latency
When scanning an exact list of remote Parquet files, the ListingTable was fetching file metadata (via head calls) sequentially. This was due to using `stream::iter(file_list).flatten()`, which processes each one-item stream in order. For remote blob stores, where each head call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time to create the physical plan. This commit replaces the sequential flattening with concurrent merging using `stream::iter(file_list).flatten_unordered(meta_fetch_concurrency)`. With this change, the `head` requests are executed in parallel (up to the configured `meta_fetch_concurrency` limit), reducing latency when creating the physical plan. Note that the ordering loss introduced by `flatten_unordered` is perfectly acceptable as the file list will anyways be fully sorted by path in `split_files` before being returned. Additionally, tests have been updated to ensure that metadata fetching occurs concurrently.
1 parent fc2fbb3 commit 5d3bb48

File tree

2 files changed

+260
-4
lines changed

2 files changed

+260
-4
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,9 @@ impl ListingTable {
10981098
)
10991099
}))
11001100
.await?;
1101-
let file_list = stream::iter(file_list).flatten();
1101+
let meta_fetch_concurrency =
1102+
ctx.config_options().execution.meta_fetch_concurrency;
1103+
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
11021104
// collect the statistics if required by the config
11031105
let files = file_list
11041106
.map(|part_file| async {
@@ -1115,7 +1117,7 @@ impl ListingTable {
11151117
}
11161118
})
11171119
.boxed()
1118-
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
1120+
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
11191121

11201122
let (files, statistics) = get_statistics_with_limit(
11211123
files,
@@ -1195,7 +1197,9 @@ mod tests {
11951197
use datafusion_physical_expr::PhysicalSortExpr;
11961198
use datafusion_physical_plan::ExecutionPlanProperties;
11971199

1200+
use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
11981201
use tempfile::TempDir;
1202+
use url::Url;
11991203

12001204
#[tokio::test]
12011205
async fn read_single_file() -> Result<()> {
@@ -1584,6 +1588,80 @@ mod tests {
15841588
Ok(())
15851589
}
15861590

1591+
#[tokio::test]
1592+
async fn test_assert_list_files_for_exact_paths() -> Result<()> {
1593+
// more expected partitions than files
1594+
assert_list_files_for_exact_paths(
1595+
&[
1596+
"bucket/key1/file0",
1597+
"bucket/key1/file1",
1598+
"bucket/key1/file2",
1599+
"bucket/key2/file3",
1600+
"bucket/key2/file4",
1601+
],
1602+
12,
1603+
5,
1604+
Some(""),
1605+
)
1606+
.await?;
1607+
1608+
// more files than meta_fetch_concurrency (32)
1609+
let files = (0..64)
1610+
.map(|i| format!("bucket/key1/file{}", i).as_str())
1611+
.collect::<Vec<_>>();
1612+
assert_list_files_for_exact_paths(files.as_slice(), 12, 12, Some("")).await?;
1613+
1614+
// as many expected partitions as files
1615+
assert_list_files_for_exact_paths(
1616+
&[
1617+
"bucket/key1/file0",
1618+
"bucket/key1/file1",
1619+
"bucket/key1/file2",
1620+
"bucket/key2/file3",
1621+
"bucket/key2/file4",
1622+
],
1623+
5,
1624+
5,
1625+
Some(""),
1626+
)
1627+
.await?;
1628+
1629+
// more files as expected partitions
1630+
assert_list_files_for_exact_paths(
1631+
&[
1632+
"bucket/key1/file0",
1633+
"bucket/key1/file1",
1634+
"bucket/key1/file2",
1635+
"bucket/key2/file3",
1636+
"bucket/key2/file4",
1637+
],
1638+
2,
1639+
2,
1640+
Some(""),
1641+
)
1642+
.await?;
1643+
1644+
// no files => no groups
1645+
assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?;
1646+
1647+
// files that don't match the default file ext
1648+
assert_list_files_for_exact_paths(
1649+
&[
1650+
"bucket/key1/file0.avro",
1651+
"bucket/key1/file1.csv",
1652+
"bucket/key1/file2.avro",
1653+
"bucket/key2/file3.csv",
1654+
"bucket/key2/file4.avro",
1655+
"bucket/key3/file5.csv",
1656+
],
1657+
2,
1658+
2,
1659+
None,
1660+
)
1661+
.await?;
1662+
Ok(())
1663+
}
1664+
15871665
async fn load_table(
15881666
ctx: &SessionContext,
15891667
name: &str,
@@ -1670,6 +1748,56 @@ mod tests {
16701748
Ok(())
16711749
}
16721750

1751+
/// Check that the files listed by the table match the specified `output_partitioning`
1752+
/// when the object store contains `files`, and validate that file metadata is fetched
1753+
/// concurrently
1754+
async fn assert_list_files_for_exact_paths(
1755+
files: &[&str],
1756+
target_partitions: usize,
1757+
output_partitioning: usize,
1758+
file_ext: Option<&str>,
1759+
) -> Result<()> {
1760+
let ctx = SessionContext::new();
1761+
let (store, _) = make_test_store_and_state(
1762+
&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
1763+
);
1764+
1765+
let meta_fetch_concurrency = ctx
1766+
.state()
1767+
.config_options()
1768+
.execution
1769+
.meta_fetch_concurrency;
1770+
let expected_concurrency = files.len().min(meta_fetch_concurrency);
1771+
let head_blocking_store = ensure_head_concurrency(store, expected_concurrency);
1772+
1773+
let url = Url::parse("test://").unwrap();
1774+
ctx.register_object_store(&url, head_blocking_store.clone());
1775+
1776+
let format = AvroFormat {};
1777+
1778+
let opt = ListingOptions::new(Arc::new(format))
1779+
.with_file_extension_opt(file_ext)
1780+
.with_target_partitions(target_partitions);
1781+
1782+
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1783+
1784+
let table_paths = files
1785+
.iter()
1786+
.map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
1787+
.collect();
1788+
let config = ListingTableConfig::new_with_multi_paths(table_paths)
1789+
.with_listing_options(opt)
1790+
.with_schema(Arc::new(schema));
1791+
1792+
let table = ListingTable::try_new(config)?;
1793+
1794+
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1795+
1796+
assert_eq!(file_list.len(), output_partitioning);
1797+
1798+
Ok(())
1799+
}
1800+
16731801
#[tokio::test]
16741802
async fn test_insert_into_append_new_json_files() -> Result<()> {
16751803
let mut config_map: HashMap<String, String> = HashMap::new();

datafusion/core/src/test/object_store.rs

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,22 @@
2020
use crate::execution::context::SessionState;
2121
use crate::execution::session_state::SessionStateBuilder;
2222
use crate::prelude::SessionContext;
23+
use futures::stream::BoxStream;
2324
use futures::FutureExt;
24-
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
25+
use object_store::{
26+
memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
27+
MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
28+
PutResult,
29+
};
30+
use std::fmt::{Debug, Display, Formatter};
2531
use std::sync::Arc;
32+
use tokio::{
33+
sync::Barrier,
34+
time::{timeout, Duration},
35+
};
2636
use url::Url;
2737

28-
/// Returns a test object store with the provided `ctx`
38+
/// Registers a test object store with the provided `ctx`
2939
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
3040
let url = Url::parse("test://").unwrap();
3141
ctx.register_object_store(&url, make_test_store_and_state(files).0);
@@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
6171
version: None,
6272
}
6373
}
74+
75+
/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
76+
pub fn ensure_head_concurrency(
77+
object_store: Arc<dyn ObjectStore>,
78+
concurrency: usize,
79+
) -> Arc<dyn ObjectStore> {
80+
Arc::new(BlockingObjectStore::new(object_store, concurrency))
81+
}
82+
83+
/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached.
84+
#[derive(Debug)]
85+
struct BlockingObjectStore {
86+
inner: Arc<dyn ObjectStore>,
87+
barrier: Arc<Barrier>,
88+
}
89+
90+
impl BlockingObjectStore {
91+
const NAME: &'static str = "BlockingObjectStore";
92+
fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
93+
Self {
94+
inner,
95+
barrier: Arc::new(Barrier::new(expected_concurrency)),
96+
}
97+
}
98+
}
99+
100+
impl Display for BlockingObjectStore {
101+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102+
Display::fmt(&self.inner, f)
103+
}
104+
}
105+
106+
/// All trait methods are forwarded to the inner object store, except for
107+
/// the `head` method which waits until the expected number of concurrent calls is reached.
108+
#[async_trait::async_trait]
109+
impl ObjectStore for BlockingObjectStore {
110+
async fn put_opts(
111+
&self,
112+
location: &Path,
113+
payload: PutPayload,
114+
opts: PutOptions,
115+
) -> object_store::Result<PutResult> {
116+
self.inner.put_opts(location, payload, opts).await
117+
}
118+
async fn put_multipart_opts(
119+
&self,
120+
location: &Path,
121+
opts: PutMultipartOpts,
122+
) -> object_store::Result<Box<dyn MultipartUpload>> {
123+
self.inner.put_multipart_opts(location, opts).await
124+
}
125+
126+
async fn get_opts(
127+
&self,
128+
location: &Path,
129+
options: GetOptions,
130+
) -> object_store::Result<GetResult> {
131+
self.inner.get_opts(location, options).await
132+
}
133+
134+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
135+
println!(
136+
"{} received head call for {location}",
137+
BlockingObjectStore::NAME
138+
);
139+
// Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.
140+
let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
141+
match wait_result {
142+
Ok(_) => println!(
143+
"{} barrier reached for {location}",
144+
BlockingObjectStore::NAME
145+
),
146+
Err(_) => {
147+
let error_message = format!(
148+
"{} barrier wait timed out for {location}",
149+
BlockingObjectStore::NAME
150+
);
151+
log::error!("{}", error_message);
152+
return Err(Error::Generic {
153+
store: BlockingObjectStore::NAME,
154+
source: error_message.into(),
155+
});
156+
}
157+
}
158+
// Forward the call to the inner object store.
159+
self.inner.head(location).await
160+
}
161+
162+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
163+
self.inner.delete(location).await
164+
}
165+
166+
fn list(
167+
&self,
168+
prefix: Option<&Path>,
169+
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
170+
self.inner.list(prefix)
171+
}
172+
173+
async fn list_with_delimiter(
174+
&self,
175+
prefix: Option<&Path>,
176+
) -> object_store::Result<ListResult> {
177+
self.inner.list_with_delimiter(prefix).await
178+
}
179+
180+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
181+
self.inner.copy(from, to).await
182+
}
183+
184+
async fn copy_if_not_exists(
185+
&self,
186+
from: &Path,
187+
to: &Path,
188+
) -> object_store::Result<()> {
189+
self.inner.copy_if_not_exists(from, to).await
190+
}
191+
}

0 commit comments

Comments
 (0)