Skip to content

Commit 978c24f

Browse files
author
Devdutt Shenoi
committed
fix: inconsistent ordering when querying parquets in staging
1 parent 1b066f0 commit 978c24f

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

src/query/stream_schema_provider.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,15 +239,19 @@ impl StandardTableProvider {
239239
.await?;
240240
execution_plans.push(arrow_exec);
241241

242-
// Partition parquet files on disk among the available CPUs
243-
let target_partition = num_cpus::get();
244-
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
245-
for (index, file_path) in staging.parquet_files().into_iter().enumerate() {
242+
// Get the list of parquet files still in staging, ordered by filename (descending)
243+
let mut parquet_files = staging.parquet_files();
244+
parquet_files.sort_by(|a, b| a.cmp(b).reverse());
245+
246+
// NOTE: We don't partition among CPUs to ensure consistent results.
247+
// i.e. we were seeing inconsistent ordering when querying over parquets in staging.
248+
let mut partitioned_files = Vec::with_capacity(parquet_files.len());
249+
for file_path in parquet_files {
246250
let Ok(file_meta) = file_path.metadata() else {
247251
continue;
248252
};
249253
let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len());
250-
partitioned_files[index % target_partition].push(file)
254+
partitioned_files.push(file)
251255
}
252256

253257
// NOTE: There is the possibility of a parquet file being pushed to object store
@@ -256,7 +260,7 @@ impl StandardTableProvider {
256260
self.create_parquet_physical_plan(
257261
execution_plans,
258262
ObjectStoreUrl::parse("file:///").unwrap(),
259-
partitioned_files,
263+
vec![partitioned_files],
260264
Statistics::new_unknown(&self.schema),
261265
projection,
262266
filters,

0 commit comments

Comments
 (0)