Skip to content

Commit 1a6390b

Browse files
Fix sequential metadata fetching in ListingTable causing high latency (#14918)
* Fix sequential metadata fetching in ListingTable causing high latency When scanning an exact list of remote Parquet files, the ListingTable was fetching file metadata (via head calls) sequentially. This was due to using `stream::iter(file_list).flatten()`, which processes each one-item stream in order. For remote blob stores, where each head call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time to create the physical plan. This commit replaces the sequential flattening with concurrent merging using `stream::iter(file_list).flatten_unordered(meta_fetch_concurrency)`. With this change, the `head` requests are executed in parallel (up to the configured `meta_fetch_concurrency` limit), reducing latency when creating the physical plan. Note that the ordering loss introduced by `flatten_unordered` is perfectly acceptable as the file list will anyways be fully sorted by path in `split_files` before being returned. Additionally, tests have been updated to ensure that metadata fetching occurs concurrently. * fix fmt --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 2705bc8 commit 1a6390b

File tree

2 files changed

+261
-4
lines changed

2 files changed

+261
-4
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,9 @@ impl ListingTable {
11051105
)
11061106
}))
11071107
.await?;
1108-
let file_list = stream::iter(file_list).flatten();
1108+
let meta_fetch_concurrency =
1109+
ctx.config_options().execution.meta_fetch_concurrency;
1110+
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
11091111
// collect the statistics if required by the config
11101112
let files = file_list
11111113
.map(|part_file| async {
@@ -1122,7 +1124,7 @@ impl ListingTable {
11221124
}
11231125
})
11241126
.boxed()
1125-
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
1127+
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
11261128

11271129
let (files, statistics) = get_statistics_with_limit(
11281130
files,
@@ -1202,7 +1204,9 @@ mod tests {
12021204
use datafusion_physical_expr::PhysicalSortExpr;
12031205
use datafusion_physical_plan::ExecutionPlanProperties;
12041206

1207+
use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
12051208
use tempfile::TempDir;
1209+
use url::Url;
12061210

12071211
#[tokio::test]
12081212
async fn read_single_file() -> Result<()> {
@@ -1592,6 +1596,81 @@ mod tests {
15921596
Ok(())
15931597
}
15941598

1599+
#[tokio::test]
1600+
async fn test_assert_list_files_for_exact_paths() -> Result<()> {
1601+
// more expected partitions than files
1602+
assert_list_files_for_exact_paths(
1603+
&[
1604+
"bucket/key1/file0",
1605+
"bucket/key1/file1",
1606+
"bucket/key1/file2",
1607+
"bucket/key2/file3",
1608+
"bucket/key2/file4",
1609+
],
1610+
12,
1611+
5,
1612+
Some(""),
1613+
)
1614+
.await?;
1615+
1616+
// more files than meta_fetch_concurrency (32)
1617+
let files: Vec<String> =
1618+
(0..64).map(|i| format!("bucket/key1/file{}", i)).collect();
1619+
// Collect references to each string
1620+
let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
1621+
assert_list_files_for_exact_paths(file_refs.as_slice(), 5, 5, Some("")).await?;
1622+
1623+
// as many expected partitions as files
1624+
assert_list_files_for_exact_paths(
1625+
&[
1626+
"bucket/key1/file0",
1627+
"bucket/key1/file1",
1628+
"bucket/key1/file2",
1629+
"bucket/key2/file3",
1630+
"bucket/key2/file4",
1631+
],
1632+
5,
1633+
5,
1634+
Some(""),
1635+
)
1636+
.await?;
1637+
1638+
// more files as expected partitions
1639+
assert_list_files_for_exact_paths(
1640+
&[
1641+
"bucket/key1/file0",
1642+
"bucket/key1/file1",
1643+
"bucket/key1/file2",
1644+
"bucket/key2/file3",
1645+
"bucket/key2/file4",
1646+
],
1647+
2,
1648+
2,
1649+
Some(""),
1650+
)
1651+
.await?;
1652+
1653+
// no files => no groups
1654+
assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?;
1655+
1656+
// files that don't match the default file ext
1657+
assert_list_files_for_exact_paths(
1658+
&[
1659+
"bucket/key1/file0.avro",
1660+
"bucket/key1/file1.csv",
1661+
"bucket/key1/file2.avro",
1662+
"bucket/key2/file3.csv",
1663+
"bucket/key2/file4.avro",
1664+
"bucket/key3/file5.csv",
1665+
],
1666+
2,
1667+
2,
1668+
None,
1669+
)
1670+
.await?;
1671+
Ok(())
1672+
}
1673+
15951674
async fn load_table(
15961675
ctx: &SessionContext,
15971676
name: &str,
@@ -1678,6 +1757,56 @@ mod tests {
16781757
Ok(())
16791758
}
16801759

1760+
/// Check that the files listed by the table match the specified `output_partitioning`
1761+
/// when the object store contains `files`, and validate that file metadata is fetched
1762+
/// concurrently
1763+
async fn assert_list_files_for_exact_paths(
1764+
files: &[&str],
1765+
target_partitions: usize,
1766+
output_partitioning: usize,
1767+
file_ext: Option<&str>,
1768+
) -> Result<()> {
1769+
let ctx = SessionContext::new();
1770+
let (store, _) = make_test_store_and_state(
1771+
&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
1772+
);
1773+
1774+
let meta_fetch_concurrency = ctx
1775+
.state()
1776+
.config_options()
1777+
.execution
1778+
.meta_fetch_concurrency;
1779+
let expected_concurrency = files.len().min(meta_fetch_concurrency);
1780+
let head_blocking_store = ensure_head_concurrency(store, expected_concurrency);
1781+
1782+
let url = Url::parse("test://").unwrap();
1783+
ctx.register_object_store(&url, head_blocking_store.clone());
1784+
1785+
let format = AvroFormat {};
1786+
1787+
let opt = ListingOptions::new(Arc::new(format))
1788+
.with_file_extension_opt(file_ext)
1789+
.with_target_partitions(target_partitions);
1790+
1791+
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1792+
1793+
let table_paths = files
1794+
.iter()
1795+
.map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
1796+
.collect();
1797+
let config = ListingTableConfig::new_with_multi_paths(table_paths)
1798+
.with_listing_options(opt)
1799+
.with_schema(Arc::new(schema));
1800+
1801+
let table = ListingTable::try_new(config)?;
1802+
1803+
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1804+
1805+
assert_eq!(file_list.len(), output_partitioning);
1806+
1807+
Ok(())
1808+
}
1809+
16811810
#[tokio::test]
16821811
async fn test_insert_into_append_new_json_files() -> Result<()> {
16831812
let mut config_map: HashMap<String, String> = HashMap::new();

datafusion/core/src/test/object_store.rs

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,22 @@
2020
use crate::execution::context::SessionState;
2121
use crate::execution::session_state::SessionStateBuilder;
2222
use crate::prelude::SessionContext;
23+
use futures::stream::BoxStream;
2324
use futures::FutureExt;
24-
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
25+
use object_store::{
26+
memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
27+
MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
28+
PutResult,
29+
};
30+
use std::fmt::{Debug, Display, Formatter};
2531
use std::sync::Arc;
32+
use tokio::{
33+
sync::Barrier,
34+
time::{timeout, Duration},
35+
};
2636
use url::Url;
2737

28-
/// Returns a test object store with the provided `ctx`
38+
/// Registers a test object store with the provided `ctx`
2939
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
3040
let url = Url::parse("test://").unwrap();
3141
ctx.register_object_store(&url, make_test_store_and_state(files).0);
@@ -61,3 +71,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
6171
version: None,
6272
}
6373
}
74+
75+
/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
76+
pub fn ensure_head_concurrency(
77+
object_store: Arc<dyn ObjectStore>,
78+
concurrency: usize,
79+
) -> Arc<dyn ObjectStore> {
80+
Arc::new(BlockingObjectStore::new(object_store, concurrency))
81+
}
82+
83+
/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached.
84+
#[derive(Debug)]
85+
struct BlockingObjectStore {
86+
inner: Arc<dyn ObjectStore>,
87+
barrier: Arc<Barrier>,
88+
}
89+
90+
impl BlockingObjectStore {
91+
const NAME: &'static str = "BlockingObjectStore";
92+
fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
93+
Self {
94+
inner,
95+
barrier: Arc::new(Barrier::new(expected_concurrency)),
96+
}
97+
}
98+
}
99+
100+
impl Display for BlockingObjectStore {
101+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102+
Display::fmt(&self.inner, f)
103+
}
104+
}
105+
106+
/// All trait methods are forwarded to the inner object store, except for
107+
/// the `head` method which waits until the expected number of concurrent calls is reached.
108+
#[async_trait::async_trait]
109+
impl ObjectStore for BlockingObjectStore {
110+
async fn put_opts(
111+
&self,
112+
location: &Path,
113+
payload: PutPayload,
114+
opts: PutOptions,
115+
) -> object_store::Result<PutResult> {
116+
self.inner.put_opts(location, payload, opts).await
117+
}
118+
async fn put_multipart_opts(
119+
&self,
120+
location: &Path,
121+
opts: PutMultipartOpts,
122+
) -> object_store::Result<Box<dyn MultipartUpload>> {
123+
self.inner.put_multipart_opts(location, opts).await
124+
}
125+
126+
async fn get_opts(
127+
&self,
128+
location: &Path,
129+
options: GetOptions,
130+
) -> object_store::Result<GetResult> {
131+
self.inner.get_opts(location, options).await
132+
}
133+
134+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
135+
println!(
136+
"{} received head call for {location}",
137+
BlockingObjectStore::NAME
138+
);
139+
// Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.
140+
let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
141+
match wait_result {
142+
Ok(_) => println!(
143+
"{} barrier reached for {location}",
144+
BlockingObjectStore::NAME
145+
),
146+
Err(_) => {
147+
let error_message = format!(
148+
"{} barrier wait timed out for {location}",
149+
BlockingObjectStore::NAME
150+
);
151+
log::error!("{}", error_message);
152+
return Err(Error::Generic {
153+
store: BlockingObjectStore::NAME,
154+
source: error_message.into(),
155+
});
156+
}
157+
}
158+
// Forward the call to the inner object store.
159+
self.inner.head(location).await
160+
}
161+
162+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
163+
self.inner.delete(location).await
164+
}
165+
166+
fn list(
167+
&self,
168+
prefix: Option<&Path>,
169+
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
170+
self.inner.list(prefix)
171+
}
172+
173+
async fn list_with_delimiter(
174+
&self,
175+
prefix: Option<&Path>,
176+
) -> object_store::Result<ListResult> {
177+
self.inner.list_with_delimiter(prefix).await
178+
}
179+
180+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
181+
self.inner.copy(from, to).await
182+
}
183+
184+
async fn copy_if_not_exists(
185+
&self,
186+
from: &Path,
187+
to: &Path,
188+
) -> object_store::Result<()> {
189+
self.inner.copy_if_not_exists(from, to).await
190+
}
191+
}

0 commit comments

Comments
 (0)