16 changes: 15 additions & 1 deletion parquet/Cargo.toml
@@ -89,11 +89,25 @@
 lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] }
 zstd = { version = "0.13", default-features = false }
 serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
-tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }
 rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
 object_store = { version = "0.12.0", default-features = false, features = ["azure", "fs"] }
 sysinfo = { version = "0.36.0", default-features = false, features = ["system"] }
 
+[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
+tokio = { version = "1.0", default-features = false, features = [
+    "macros",
+    "rt-multi-thread",
+    "io-util",
+    "fs",
+] }
+
+[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
+tokio = { version = "1.0", default-features = false, features = [
+    "macros",
+    "rt",
+    "io-util",
+] }
+
 [package.metadata.docs.rs]
 all-features = true

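Note on the hunk above: tokio's `rt-multi-thread` and `fs` features do not compile for wasm32, so the single dev-dependency is split into two target-gated declarations, with wasm32 falling back to the single-threaded `rt` feature. A minimal sketch of what the reduced feature set still provides; the runtime construction below is illustrative only and not part of this PR:

    // Illustrative only: with just the "rt" feature (the wasm32 set above),
    // tokio offers the current-thread runtime; Builder::new_multi_thread and
    // tokio::fs are not compiled in.
    use tokio::runtime::Builder;

    fn main() {
        let rt = Builder::new_current_thread()
            .build()
            .expect("current-thread runtime requires no worker threads");
        rt.block_on(async {
            println!("async code runs single-threaded");
        });
    }
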
10 changes: 5 additions & 5 deletions parquet/benches/arrow_reader_row_filter.rs
@@ -62,7 +62,6 @@ use arrow_array::StringViewArray;
 use arrow_cast::pretty::pretty_format_batches;
 use bytes::Bytes;
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
-use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter,
@@ -72,6 +71,7 @@ use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::basic::Compression;
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
+use parquet::future::BoxedFuture;
 use rand::{rngs::StdRng, Rng, SeedableRng};
 use std::ops::Range;
 use std::sync::Arc;
@@ -572,17 +572,17 @@ impl InMemoryReader {
 }
 
 impl AsyncFileReader for InMemoryReader {
-    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxedFuture<'_, parquet::errors::Result<Bytes>> {
         let data = self.inner.slice(range.start as usize..range.end as usize);
-        async move { Ok(data) }.boxed()
+        Box::pin(async move { Ok(data) })
     }
 
     fn get_metadata<'a>(
         &'a mut self,
         _options: Option<&'a ArrowReaderOptions>,
-    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+    ) -> BoxedFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
         let metadata = Arc::clone(&self.metadata);
-        async move { Ok(metadata) }.boxed()
+        Box::pin(async move { Ok(metadata) })
     }
 }
 
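Throughout this diff, `futures::future::BoxFuture` is replaced by the new in-crate `parquet::future::BoxedFuture`, and `FutureExt::boxed()` calls become plain `Box::pin(...)`. The module that defines `BoxedFuture` is not among the hunks shown; judging from the one-for-one substitution, it is presumably an alias along these lines (a sketch of the assumed definition, not the crate's actual code):

    use std::future::Future;
    use std::pin::Pin;

    // Presumed shape of parquet::future::BoxedFuture: the same pinned, boxed,
    // Send-able future that futures::future::BoxFuture names, built from std
    // types only, so implementors can write Box::pin(async move { .. })
    // without depending on FutureExt. (On wasm32 the Send bound may be
    // relaxed; this diff does not show that detail.)
    pub type BoxedFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
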
17 changes: 8 additions & 9 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -17,8 +17,8 @@
 
 use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::Result;
+use crate::future::BoxedFuture;
 use bytes::Bytes;
-use futures::future::BoxFuture;
 use std::ops::Range;
 
 /// A data source that can be used with [`ParquetMetaDataReader`] to load [`ParquetMetaData`]
@@ -30,10 +30,10 @@ use std::ops::Range;
 /// ```rust
 /// # use parquet::errors::Result;
 /// # use parquet::arrow::async_reader::MetadataFetch;
+/// # use parquet::future::BoxedFuture;
 /// # use bytes::Bytes;
 /// # use std::ops::Range;
 /// # use std::io::SeekFrom;
-/// # use futures::future::BoxFuture;
 /// # use futures::FutureExt;
 /// # use tokio::io::{AsyncReadExt, AsyncSeekExt};
 /// // Adapter that implements the API for reading bytes from an async source (in
@@ -42,17 +42,16 @@ use std::ops::Range;
 ///     file: tokio::fs::File,
 /// }
 /// impl MetadataFetch for TokioFileMetadata {
-///     fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+///     fn fetch(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>> {
 ///         // return a future that fetches data in range
-///         async move {
+///         Box::pin(async move {
 ///             let len = (range.end - range.start).try_into().unwrap();
 ///             let mut buf = vec![0; len]; // target buffer
 ///             // seek to the start of the range and read the data
 ///             self.file.seek(SeekFrom::Start(range.start)).await?;
 ///             self.file.read_exact(&mut buf).await?;
 ///             Ok(Bytes::from(buf)) // convert to Bytes
-///         }
-///         .boxed() // turn into BoxedFuture, using FutureExt::boxed
+///         }) // turn into BoxedFuture
 ///     }
 /// }
 /// ```
@@ -66,11 +65,11 @@ pub trait MetadataFetch {
     /// [`FutureExt::boxed`]. See the trait documentation for an example
     ///
     /// [`FutureExt::boxed`]: futures::FutureExt::boxed
-    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
+    fn fetch(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>>;
 }
 
 impl<T: AsyncFileReader> MetadataFetch for &mut T {
-    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+    fn fetch(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>> {
         self.get_bytes(range)
     }
 }
@@ -87,5 +86,5 @@ pub trait MetadataSuffixFetch: MetadataFetch {
     /// [`FutureExt::boxed`]. See the trait documentation for an example
     ///
     /// [`FutureExt::boxed`]: futures::FutureExt::boxed
-    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
+    fn fetch_suffix(&mut self, suffix: usize) -> BoxedFuture<'_, Result<Bytes>>;
 }
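
For downstream implementors of `MetadataFetch`, the migration shown above is mechanical: keep the async block, wrap it in `Box::pin(...)` instead of calling `.boxed()`, and rename the return type. A sketch of an in-memory implementor under the new signature (hypothetical type, mirroring the `TokioFileMetadata` doc example without file I/O):

    use bytes::Bytes;
    use parquet::arrow::async_reader::MetadataFetch;
    use parquet::errors::Result;
    use parquet::future::BoxedFuture;
    use std::ops::Range;

    // Hypothetical in-memory metadata source, for illustration only.
    struct InMemoryMetadata {
        data: Bytes,
    }

    impl MetadataFetch for InMemoryMetadata {
        fn fetch(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>> {
            // Slice eagerly, then wrap the ready value; Box::pin replaces the
            // old `async move { ... }.boxed()` pattern with std-only code.
            let bytes = self.data.slice(range.start as usize..range.end as usize);
            Box::pin(async move { Ok(bytes) })
        }
    }
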
77 changes: 40 additions & 37 deletions parquet/src/arrow/async_reader/mod.rs
@@ -30,7 +30,7 @@ use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use bytes::{Buf, Bytes};
-use futures::future::{BoxFuture, FutureExt};
+use futures::future::FutureExt;
 use futures::ready;
 use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -56,6 +56,7 @@ use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
+use crate::future::BoxedFuture;
 
 mod metadata;
 pub use metadata::*;
@@ -84,11 +85,11 @@ pub use store::*;
 /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
 pub trait AsyncFileReader: Send {
     /// Retrieve the bytes in `range`
-    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>>;
 
     /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
-        async move {
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxedFuture<'_, Result<Vec<Bytes>>> {
+        Box::pin(async move {
             let mut result = Vec::with_capacity(ranges.len());
 
             for range in ranges.into_iter() {
@@ -97,8 +98,7 @@ pub trait AsyncFileReader: Send {
             }
 
             Ok(result)
-        }
-        .boxed()
+        })
     }
 
     /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
@@ -120,42 +120,41 @@ pub trait AsyncFileReader: Send {
     fn get_metadata<'a>(
         &'a mut self,
         options: Option<&'a ArrowReaderOptions>,
-    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
+    ) -> BoxedFuture<'a, Result<Arc<ParquetMetaData>>>;
 }
 
 /// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
 impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
-    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>> {
         self.as_mut().get_bytes(range)
     }
 
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxedFuture<'_, Result<Vec<Bytes>>> {
         self.as_mut().get_byte_ranges(ranges)
     }
 
     fn get_metadata<'a>(
         &'a mut self,
         options: Option<&'a ArrowReaderOptions>,
-    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+    ) -> BoxedFuture<'a, Result<Arc<ParquetMetaData>>> {
         self.as_mut().get_metadata(options)
     }
 }
 
 impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
-    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
-        async move {
+    fn fetch_suffix(&mut self, suffix: usize) -> BoxedFuture<'_, Result<Bytes>> {
+        Box::pin(async move {
             self.seek(SeekFrom::End(-(suffix as i64))).await?;
             let mut buf = Vec::with_capacity(suffix);
             self.take(suffix as _).read_to_end(&mut buf).await?;
             Ok(buf.into())
-        }
-        .boxed()
+        })
     }
 }
 
 impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
-    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
-        async move {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>> {
+        Box::pin(async move {
             self.seek(SeekFrom::Start(range.start)).await?;
 
             let to_read = range.end - range.start;
@@ -166,15 +165,14 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
             }
 
             Ok(buffer.into())
-        }
-        .boxed()
+        })
     }
 
     fn get_metadata<'a>(
         &'a mut self,
         options: Option<&'a ArrowReaderOptions>,
-    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
-        async move {
+    ) -> BoxedFuture<'a, Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
             let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
                 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
             );
@@ -186,8 +184,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
 
             let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
             Ok(Arc::new(parquet_metadata))
-        }
-        .boxed()
+        })
     }
 }
 
@@ -762,7 +759,7 @@ enum StreamState<T> {
     /// Decoding a batch
     Decoding(ParquetRecordBatchReader),
     /// Reading data from input
-    Reading(BoxFuture<'static, ReadResult<T>>),
+    Reading(BoxedFuture<'static, ReadResult<T>>),
     /// Error
     Error,
 }
@@ -930,14 +927,12 @@
 
                 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
 
-                let fut = reader
-                    .read_row_group(
-                        row_group_idx,
-                        selection,
-                        self.projection.clone(),
-                        self.batch_size,
-                    )
-                    .boxed();
+                let fut = Box::pin(reader.read_row_group(
+                    row_group_idx,
+                    selection,
+                    self.projection.clone(),
+                    self.batch_size,
+                ));
 
                 self.state = StreamState::Reading(fut)
             }
@@ -1246,29 +1241,32 @@ mod tests {
     }
 
     impl AsyncFileReader for TestReader {
-        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+        fn get_bytes(&mut self, range: Range<u64>) -> BoxedFuture<'_, Result<Bytes>> {
             let range = range.clone();
             self.requests
                 .lock()
                 .unwrap()
                 .push(range.start as usize..range.end as usize);
-            futures::future::ready(Ok(self
+            Box::pin(futures::future::ready(Ok(self
                 .data
-                .slice(range.start as usize..range.end as usize)))
-            .boxed()
+                .slice(range.start as usize..range.end as usize))))
         }
 
         fn get_metadata<'a>(
             &'a mut self,
             options: Option<&'a ArrowReaderOptions>,
-        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        ) -> BoxedFuture<'a, Result<Arc<ParquetMetaData>>> {
             let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
                 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
             );
             self.metadata = Some(Arc::new(
                 metadata_reader.parse_and_finish(&self.data).unwrap(),
             ));
-            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
+            Box::pin(futures::future::ready(Ok(self
+                .metadata
+                .clone()
+                .unwrap()
+                .clone())))
         }
     }
 
@@ -2056,6 +2054,7 @@
     }
 
     #[tokio::test]
+    #[cfg(not(target_arch = "wasm32"))]
     async fn test_parquet_record_batch_stream_schema() {
         fn get_all_field_names(schema: &Schema) -> Vec<&String> {
             schema.flattened_fields().iter().map(|f| f.name()).collect()
@@ -2214,6 +2213,7 @@
     }
 
     #[tokio::test]
+    #[cfg(not(target_arch = "wasm32"))]
     async fn test_nested_skip() {
         let schema = Arc::new(Schema::new(vec![
             Field::new("col_1", DataType::UInt64, false),
@@ -2461,6 +2461,7 @@
 
     #[tokio::test]
     #[allow(deprecated)]
+    #[cfg(not(target_arch = "wasm32"))]
     async fn empty_offset_index_doesnt_panic_in_read_row_group() {
         use tokio::fs::File;
         let testdata = arrow::util::test_util::parquet_test_data();
@@ -2487,6 +2488,7 @@
 
     #[tokio::test]
     #[allow(deprecated)]
+    #[cfg(not(target_arch = "wasm32"))]
     async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
         use tokio::fs::File;
         let testdata = arrow::util::test_util::parquet_test_data();
@@ -2512,6 +2514,7 @@
 
     #[tokio::test]
     #[allow(deprecated)]
+    #[cfg(not(target_arch = "wasm32"))]
     async fn empty_offset_index_doesnt_panic_in_column_chunks() {
         use tempfile::TempDir;
         use tokio::fs::File;
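
The final hunks gate five tests behind `#[cfg(not(target_arch = "wasm32"))]`: each depends on tokio features (such as `fs`) or crates (such as `tempfile`) that the wasm32 dev-dependency set in `parquet/Cargo.toml` above does not provide. The pattern, on a hypothetical test of the same shape:

    // Hypothetical example of the gating pattern: compile the test only on
    // native targets, where tokio's "fs" feature is enabled.
    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn native_only_filesystem_access() {
        let meta = tokio::fs::metadata(".").await.unwrap();
        assert!(meta.is_dir());
    }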