
Commit 94177d3

Fix sequential metadata fetching in ListingTable causing high latency
When scanning an exact list of remote Parquet files, the ListingTable fetched file metadata (via `head` calls) sequentially, because it used `stream::iter(file_list).flatten()`, which drains each one-item stream in order. For remote blob stores, where each `head` call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time needed to create the physical plan.

This commit replaces the sequential flattening with concurrent merging using `stream::iter(file_list).flatten_unordered(meta_fetch_concurrency)`. With this change, the `head` requests are executed in parallel, up to the configured `meta_fetch_concurrency` limit, reducing the latency of physical plan creation. The loss of ordering introduced by `flatten_unordered` is acceptable because the file list is fully sorted by path in `split_files` before being returned anyway.

Tests have been updated to ensure that metadata fetching occurs concurrently.
1 parent b4c3784 commit 94177d3
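For context, a minimal standalone sketch (not part of the commit) that contrasts the two `futures` combinators involved. The `fake_head` helper, the 100 ms delay, and the file names are made up for illustration; in DataFusion the per-path streams come from `pruned_partition_list` and the limit comes from `execution.meta_fetch_concurrency` (32 by default, per the test comments below). Requires the `futures` and `tokio` crates.

// Sketch: sequential `flatten` vs concurrent `flatten_unordered` over a
// collection of single-item streams, mirroring the per-path metadata streams.
use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

async fn fake_head(path: &'static str) -> &'static str {
    // Stand-in for a remote `head` request with ~100 ms latency.
    tokio::time::sleep(Duration::from_millis(100)).await;
    path
}

#[tokio::main]
async fn main() {
    // One single-item stream per file.
    let make_streams = || {
        ["f0", "f1", "f2", "f3"]
            .into_iter()
            .map(|p| stream::once(fake_head(p)).boxed())
            .collect::<Vec<_>>()
    };

    // Sequential: each inner stream is drained before the next starts (~400 ms total).
    let start = Instant::now();
    let _ = stream::iter(make_streams()).flatten().collect::<Vec<_>>().await;
    println!("flatten:           {:?}", start.elapsed());

    // Concurrent: up to `meta_fetch_concurrency` inner streams are polled at once
    // (~100 ms total here); output order is no longer guaranteed.
    let meta_fetch_concurrency: usize = 32;
    let start = Instant::now();
    let _ = stream::iter(make_streams())
        .flatten_unordered(meta_fetch_concurrency)
        .collect::<Vec<_>>()
        .await;
    println!("flatten_unordered: {:?}", start.elapsed());
}

On this toy input the sequential version takes roughly four times as long as the concurrent one, which mirrors the latency reduction the commit message describes.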

File tree: 2 files changed, +262, -4 lines

datafusion/core/src/datasource/listing/table.rs

Lines changed: 132 additions & 2 deletions
@@ -1098,7 +1098,9 @@ impl ListingTable {
             )
         }))
         .await?;
-        let file_list = stream::iter(file_list).flatten();
+        let meta_fetch_concurrency =
+            ctx.config_options().execution.meta_fetch_concurrency;
+        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
         // collect the statistics if required by the config
         let files = file_list
             .map(|part_file| async {
@@ -1115,7 +1117,7 @@
                 }
             })
             .boxed()
-            .buffered(ctx.config_options().execution.meta_fetch_concurrency);
+            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
 
         let (files, statistics) = get_statistics_with_limit(
             files,
@@ -1195,7 +1197,9 @@ mod tests {
     use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_plan::ExecutionPlanProperties;
 
+    use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
     use tempfile::TempDir;
+    use url::Url;
 
     #[tokio::test]
     async fn read_single_file() -> Result<()> {
@@ -1584,6 +1588,82 @@
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_assert_list_files_for_exact_paths() -> Result<()> {
+        // more expected partitions than files
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+            ],
+            12,
+            5,
+            Some(""),
+        )
+        .await?;
+
+        // more files than meta_fetch_concurrency (32)
+        let files: Vec<String> = (0..64)
+            .map(|i| format!("bucket/key1/file{}", i))
+            .collect();
+        // Collect references to each string
+        let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
+        assert_list_files_for_exact_paths(file_refs.as_slice(), 5, 5, Some("")).await?;
+
+        // as many expected partitions as files
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+            ],
+            5,
+            5,
+            Some(""),
+        )
+        .await?;
+
+        // more files as expected partitions
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+            ],
+            2,
+            2,
+            Some(""),
+        )
+        .await?;
+
+        // no files => no groups
+        assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?;
+
+        // files that don't match the default file ext
+        assert_list_files_for_exact_paths(
+            &[
+                "bucket/key1/file0.avro",
+                "bucket/key1/file1.csv",
+                "bucket/key1/file2.avro",
+                "bucket/key2/file3.csv",
+                "bucket/key2/file4.avro",
+                "bucket/key3/file5.csv",
+            ],
+            2,
+            2,
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
     async fn load_table(
         ctx: &SessionContext,
         name: &str,
@@ -1670,6 +1750,56 @@
         Ok(())
     }
 
+    /// Check that the files listed by the table match the specified `output_partitioning`
+    /// when the object store contains `files`, and validate that file metadata is fetched
+    /// concurrently
+    async fn assert_list_files_for_exact_paths(
+        files: &[&str],
+        target_partitions: usize,
+        output_partitioning: usize,
+        file_ext: Option<&str>,
+    ) -> Result<()> {
+        let ctx = SessionContext::new();
+        let (store, _) = make_test_store_and_state(
+            &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
+        );
+
+        let meta_fetch_concurrency = ctx
+            .state()
+            .config_options()
+            .execution
+            .meta_fetch_concurrency;
+        let expected_concurrency = files.len().min(meta_fetch_concurrency);
+        let head_blocking_store = ensure_head_concurrency(store, expected_concurrency);
+
+        let url = Url::parse("test://").unwrap();
+        ctx.register_object_store(&url, head_blocking_store.clone());
+
+        let format = AvroFormat {};
+
+        let opt = ListingOptions::new(Arc::new(format))
+            .with_file_extension_opt(file_ext)
+            .with_target_partitions(target_partitions);
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+
+        let table_paths = files
+            .iter()
+            .map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
+            .collect();
+        let config = ListingTableConfig::new_with_multi_paths(table_paths)
+            .with_listing_options(opt)
+            .with_schema(Arc::new(schema));
+
+        let table = ListingTable::try_new(config)?;
+
+        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+
+        assert_eq!(file_list.len(), output_partitioning);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_insert_into_append_new_json_files() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();

datafusion/core/src/test/object_store.rs

Lines changed: 130 additions & 2 deletions
@@ -20,12 +20,22 @@
 use crate::execution::context::SessionState;
 use crate::execution::session_state::SessionStateBuilder;
 use crate::prelude::SessionContext;
+use futures::stream::BoxStream;
 use futures::FutureExt;
-use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
+use object_store::{
+    memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
+    MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
+    PutResult,
+};
+use std::fmt::{Debug, Display, Formatter};
 use std::sync::Arc;
+use tokio::{
+    sync::Barrier,
+    time::{timeout, Duration},
+};
 use url::Url;
 
-/// Returns a test object store with the provided `ctx`
+/// Registers a test object store with the provided `ctx`
 pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
     let url = Url::parse("test://").unwrap();
     ctx.register_object_store(&url, make_test_store_and_state(files).0);
@@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
         version: None,
     }
 }
+
+/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
+pub fn ensure_head_concurrency(
+    object_store: Arc<dyn ObjectStore>,
+    concurrency: usize,
+) -> Arc<dyn ObjectStore> {
+    Arc::new(BlockingObjectStore::new(object_store, concurrency))
+}
+
+/// An object store that "blocks" in its `head` call until an expected number of concurrent calls are reached.
+#[derive(Debug)]
+struct BlockingObjectStore {
+    inner: Arc<dyn ObjectStore>,
+    barrier: Arc<Barrier>,
+}
+
+impl BlockingObjectStore {
+    const NAME: &'static str = "BlockingObjectStore";
+    fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
+        Self {
+            inner,
+            barrier: Arc::new(Barrier::new(expected_concurrency)),
+        }
+    }
+}
+
+impl Display for BlockingObjectStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Display::fmt(&self.inner, f)
+    }
+}
+
+/// All trait methods are forwarded to the inner object store, except for
+/// the `head` method which waits until the expected number of concurrent calls is reached.
+#[async_trait::async_trait]
+impl ObjectStore for BlockingObjectStore {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
+        self.inner.put_opts(location, payload, opts).await
+    }
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, opts).await
+    }
+
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> object_store::Result<GetResult> {
+        self.inner.get_opts(location, options).await
+    }
+
+    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
+        println!(
+            "{} received head call for {location}",
+            BlockingObjectStore::NAME
+        );
+        // Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.
+        let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
+        match wait_result {
+            Ok(_) => println!(
+                "{} barrier reached for {location}",
+                BlockingObjectStore::NAME
+            ),
+            Err(_) => {
+                let error_message = format!(
+                    "{} barrier wait timed out for {location}",
+                    BlockingObjectStore::NAME
+                );
+                log::error!("{}", error_message);
+                return Err(Error::Generic {
+                    store: BlockingObjectStore::NAME,
+                    source: error_message.into(),
+                });
+            }
+        }
+        // Forward the call to the inner object store.
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> object_store::Result<()> {
+        self.inner.delete(location).await
+    }
+
+    fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        prefix: Option<&Path>,
+    ) -> object_store::Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(
+        &self,
+        from: &Path,
+        to: &Path,
+    ) -> object_store::Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
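The test helper above relies on a rendezvous-with-timeout pattern: `BlockingObjectStore::head` only proceeds once `expected_concurrency` calls are waiting on the same `tokio::sync::Barrier`, and the 1-second `timeout` turns an unmet expectation into an error instead of a hung test. Below is a minimal standalone sketch of that pattern, not DataFusion code; the names `simulated_head` and `expected` are illustrative. Requires tokio with the rt, sync, time, and macros features.

// Sketch: N concurrent callers rendezvous at a Barrier; a lone caller times out.
use std::sync::Arc;
use tokio::{
    sync::Barrier,
    time::{timeout, Duration},
};

async fn simulated_head(barrier: Arc<Barrier>) -> Result<(), &'static str> {
    // Succeeds only if the expected number of callers is waiting concurrently.
    timeout(Duration::from_secs(1), barrier.wait())
        .await
        .map(|_| ())
        .map_err(|_| "barrier wait timed out: calls were not concurrent")
}

#[tokio::main]
async fn main() {
    let expected = 4;
    let barrier = Arc::new(Barrier::new(expected));

    // `expected` concurrent callers all reach the barrier, so every call returns Ok.
    let handles: Vec<_> = (0..expected)
        .map(|_| tokio::spawn(simulated_head(Arc::clone(&barrier))))
        .collect();
    for h in handles {
        assert!(h.await.unwrap().is_ok());
    }

    // A single sequential caller never reaches the required concurrency, so the
    // timeout fires and the call reports an error instead of hanging forever.
    let lone = Arc::new(Barrier::new(expected));
    assert!(simulated_head(lone).await.is_err());
}

Because sequential callers never meet at the barrier, a regression back to sequential `head` calls would make `assert_list_files_for_exact_paths` fail quickly rather than deadlock.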
