diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 5dbd4b5b39dd..1fdcc70dd290 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -89,11 +89,25 @@ lz4_flex = { version = "0.11", default-features = false, features = ["std", "fra zstd = { version = "0.13", default-features = false } serde_json = { version = "1.0", features = ["std"], default-features = false } arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } -tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] } rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] } object_store = { version = "0.12.0", default-features = false, features = ["azure", "fs"] } sysinfo = { version = "0.36.0", default-features = false, features = ["system"] } +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +tokio = { version = "1.0", default-features = false, features = [ + "macros", + "rt-multi-thread", + "io-util", + "fs", +] } + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +tokio = { version = "1.0", default-features = false, features = [ + "macros", + "rt", + "io-util", +] } + [package.metadata.docs.rs] all-features = true diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs index ec403f8fd39c..5ec4e7c56261 100644 --- a/parquet/benches/arrow_reader_row_filter.rs +++ b/parquet/benches/arrow_reader_row_filter.rs @@ -62,7 +62,6 @@ use arrow_array::StringViewArray; use arrow_cast::pretty::pretty_format_batches; use bytes::Bytes; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use futures::future::BoxFuture; use futures::{FutureExt, StreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, @@ -72,6 +71,7 @@ use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMas use parquet::basic::Compression; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; use parquet::file::properties::WriterProperties; +use parquet::future::BoxedFuture; use rand::{rngs::StdRng, Rng, SeedableRng}; use std::ops::Range; use std::sync::Arc; @@ -572,17 +572,17 @@ impl InMemoryReader { } impl AsyncFileReader for InMemoryReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, parquet::errors::Result> { let data = self.inner.slice(range.start as usize..range.end as usize); - async move { Ok(data) }.boxed() + Box::pin(async move { Ok(data) }) } fn get_metadata<'a>( &'a mut self, _options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, parquet::errors::Result>> { + ) -> BoxedFuture<'a, parquet::errors::Result>> { let metadata = Arc::clone(&self.metadata); - async move { Ok(metadata) }.boxed() + Box::pin(async move { Ok(metadata) }) } } diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 0ab6a621fca0..b6641c0c99f9 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -17,8 +17,8 @@ use crate::arrow::async_reader::AsyncFileReader; use crate::errors::Result; +use crate::future::BoxedFuture; use bytes::Bytes; -use futures::future::BoxFuture; use std::ops::Range; /// A data source that can be used with [`ParquetMetaDataReader`] to load [`ParquetMetaData`] @@ -30,10 +30,10 @@ use std::ops::Range; /// ```rust /// # use parquet::errors::Result; /// # use parquet::arrow::async_reader::MetadataFetch; +/// # use parquet::future::BoxedFuture; /// # use bytes::Bytes; /// # use std::ops::Range; /// # use std::io::SeekFrom; -/// # use futures::future::BoxFuture; /// # use futures::FutureExt; /// # use tokio::io::{AsyncReadExt, AsyncSeekExt}; /// // Adapter that implements the API for reading bytes from an async source (in @@ -42,17 +42,16 @@ use std::ops::Range; /// file: tokio::fs::File, /// } /// impl MetadataFetch for TokioFileMetadata { -/// fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { +/// fn fetch(&mut self, range: Range) -> BoxedFuture<'_, Result> { /// // return a future that fetches data in range -/// async move { +/// Box::pin(async move { /// let len = (range.end - range.start).try_into().unwrap(); /// let mut buf = vec![0; len]; // target buffer /// // seek to the start of the range and read the data /// self.file.seek(SeekFrom::Start(range.start)).await?; /// self.file.read_exact(&mut buf).await?; /// Ok(Bytes::from(buf)) // convert to Bytes -/// } -/// .boxed() // turn into BoxedFuture, using FutureExt::boxed +/// }) // turn into BoxedFuture /// } /// } ///``` @@ -66,11 +65,11 @@ pub trait MetadataFetch { /// [`FutureExt::boxed`]. See the trait documentation for an example /// /// [`FutureExt::boxed`]: futures::FutureExt::boxed - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn fetch(&mut self, range: Range) -> BoxedFuture<'_, Result>; } impl MetadataFetch for &mut T { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: Range) -> BoxedFuture<'_, Result> { self.get_bytes(range) } } @@ -87,5 +86,5 @@ pub trait MetadataSuffixFetch: MetadataFetch { /// [`FutureExt::boxed`]. See the trait documentation for an example /// /// [`FutureExt::boxed`]: futures::FutureExt::boxed - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result>; + fn fetch_suffix(&mut self, suffix: usize) -> BoxedFuture<'_, Result>; } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 33b03fbbca95..95c065d7d00f 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -30,7 +30,7 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; -use futures::future::{BoxFuture, FutureExt}; +use futures::future::FutureExt; use futures::ready; use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; @@ -56,6 +56,7 @@ use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataRea use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; +use crate::future::BoxedFuture; mod metadata; pub use metadata::*; @@ -84,11 +85,11 @@ pub use store::*; /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html pub trait AsyncFileReader: Send { /// Retrieve the bytes in `range` - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, Result>; /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { - async move { + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxedFuture<'_, Result>> { + Box::pin(async move { let mut result = Vec::with_capacity(ranges.len()); for range in ranges.into_iter() { @@ -97,8 +98,7 @@ pub trait AsyncFileReader: Send { } Ok(result) - } - .boxed() + }) } /// Return a future which results in the [`ParquetMetaData`] for this Parquet file. @@ -120,42 +120,41 @@ pub trait AsyncFileReader: Send { fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>>; + ) -> BoxedFuture<'a, Result>>; } /// This allows Box to be used as an AsyncFileReader, impl AsyncFileReader for Box { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, Result> { self.as_mut().get_bytes(range) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxedFuture<'_, Result>> { self.as_mut().get_byte_ranges(ranges) } fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> BoxedFuture<'a, Result>> { self.as_mut().get_metadata(options) } } impl MetadataSuffixFetch for T { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { - async move { + fn fetch_suffix(&mut self, suffix: usize) -> BoxedFuture<'_, Result> { + Box::pin(async move { self.seek(SeekFrom::End(-(suffix as i64))).await?; let mut buf = Vec::with_capacity(suffix); self.take(suffix as _).read_to_end(&mut buf).await?; Ok(buf.into()) - } - .boxed() + }) } } impl AsyncFileReader for T { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - async move { + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, Result> { + Box::pin(async move { self.seek(SeekFrom::Start(range.start)).await?; let to_read = range.end - range.start; @@ -166,15 +165,14 @@ impl AsyncFileReader for T { } Ok(buffer.into()) - } - .boxed() + }) } fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { - async move { + ) -> BoxedFuture<'a, Result>> { + Box::pin(async move { let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy( PageIndexPolicy::from(options.is_some_and(|o| o.page_index())), ); @@ -186,8 +184,7 @@ impl AsyncFileReader for T { let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?; Ok(Arc::new(parquet_metadata)) - } - .boxed() + }) } } @@ -762,7 +759,7 @@ enum StreamState { /// Decoding a batch Decoding(ParquetRecordBatchReader), /// Reading data from input - Reading(BoxFuture<'static, ReadResult>), + Reading(BoxedFuture<'static, ReadResult>), /// Error Error, } @@ -930,14 +927,12 @@ where let selection = self.selection.as_mut().map(|s| s.split_off(row_count)); - let fut = reader - .read_row_group( - row_group_idx, - selection, - self.projection.clone(), - self.batch_size, - ) - .boxed(); + let fut = Box::pin(reader.read_row_group( + row_group_idx, + selection, + self.projection.clone(), + self.batch_size, + )); self.state = StreamState::Reading(fut) } @@ -1246,29 +1241,32 @@ mod tests { } impl AsyncFileReader for TestReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, Result> { let range = range.clone(); self.requests .lock() .unwrap() .push(range.start as usize..range.end as usize); - futures::future::ready(Ok(self + Box::pin(futures::future::ready(Ok(self .data - .slice(range.start as usize..range.end as usize))) - .boxed() + .slice(range.start as usize..range.end as usize)))) } fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> BoxedFuture<'a, Result>> { let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy( PageIndexPolicy::from(options.is_some_and(|o| o.page_index())), ); self.metadata = Some(Arc::new( metadata_reader.parse_and_finish(&self.data).unwrap(), )); - futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed() + Box::pin(futures::future::ready(Ok(self + .metadata + .clone() + .unwrap() + .clone()))) } } @@ -2056,6 +2054,7 @@ mod tests { } #[tokio::test] + #[cfg(not(target_arch = "wasm32"))] async fn test_parquet_record_batch_stream_schema() { fn get_all_field_names(schema: &Schema) -> Vec<&String> { schema.flattened_fields().iter().map(|f| f.name()).collect() @@ -2214,6 +2213,7 @@ mod tests { } #[tokio::test] + #[cfg(not(target_arch = "wasm32"))] async fn test_nested_skip() { let schema = Arc::new(Schema::new(vec![ Field::new("col_1", DataType::UInt64, false), @@ -2461,6 +2461,7 @@ mod tests { #[tokio::test] #[allow(deprecated)] + #[cfg(not(target_arch = "wasm32"))] async fn empty_offset_index_doesnt_panic_in_read_row_group() { use tokio::fs::File; let testdata = arrow::util::test_util::parquet_test_data(); @@ -2487,6 +2488,7 @@ mod tests { #[tokio::test] #[allow(deprecated)] + #[cfg(not(target_arch = "wasm32"))] async fn non_empty_offset_index_doesnt_panic_in_read_row_group() { use tokio::fs::File; let testdata = arrow::util::test_util::parquet_test_data(); @@ -2512,6 +2514,7 @@ mod tests { #[tokio::test] #[allow(deprecated)] + #[cfg(not(target_arch = "wasm32"))] async fn empty_offset_index_doesnt_panic_in_column_chunks() { use tempfile::TempDir; use tokio::fs::File; diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index ce1398b56d37..8f1f3f31b137 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -22,7 +22,7 @@ use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; use bytes::Bytes; -use futures::{future::BoxFuture, FutureExt, TryFutureExt}; +use futures::{FutureExt, TryFutureExt}; use object_store::{path::Path, ObjectStore}; use object_store::{GetOptions, GetRange}; use tokio::runtime::Handle; @@ -132,9 +132,9 @@ impl ParquetObjectReader { } } - fn spawn(&self, f: F) -> BoxFuture<'_, Result> + fn spawn(&self, f: F) -> BoxedFuture<'_, Result> where - F: for<'a> FnOnce(&'a Arc, &'a Path) -> BoxFuture<'a, Result> + F: for<'a> FnOnce(&'a Arc, &'a Path) -> BoxedFuture<'a, Result> + Send + 'static, O: Send + 'static, @@ -144,48 +144,48 @@ impl ParquetObjectReader { Some(handle) => { let path = self.path.clone(); let store = Arc::clone(&self.store); - handle - .spawn(async move { f(&store, &path).await }) - .map_ok_or_else( - |e| match e.try_into_panic() { - Err(e) => Err(ParquetError::External(Box::new(e))), - Ok(p) => std::panic::resume_unwind(p), - }, - |res| res.map_err(|e| e.into()), - ) - .boxed() + Box::pin( + handle + .spawn(async move { f(&store, &path).await }) + .map_ok_or_else( + |e| match e.try_into_panic() { + Err(e) => Err(ParquetError::External(Box::new(e))), + Ok(p) => std::panic::resume_unwind(p), + }, + |res| res.map_err(|e| e.into()), + ), + ) } - None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(), + None => Box::pin(f(&self.store, &self.path).map_err(|e| e.into())), } } } impl MetadataSuffixFetch for &mut ParquetObjectReader { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { + fn fetch_suffix(&mut self, suffix: usize) -> BoxedFuture<'_, Result> { let options = GetOptions { range: Some(GetRange::Suffix(suffix as u64)), ..Default::default() }; self.spawn(|store, path| { - async move { + Box::pin(async move { let resp = store.get_opts(path, options).await?; Ok::<_, ParquetError>(resp.bytes().await?) - } - .boxed() + }) }) } } impl AsyncFileReader for ParquetObjectReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, Result> { self.spawn(|store, path| store.get_range(path, range)) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxedFuture<'_, Result>> where Self: Send, { - self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) + self.spawn(|store, path| Box::pin(async move { store.get_ranges(path, &ranges).await })) } // This method doesn't directly call `self.spawn` because all of the IO that is done down the @@ -197,7 +197,7 @@ impl AsyncFileReader for ParquetObjectReader { fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> BoxedFuture<'a, Result>> { Box::pin(async move { let mut metadata = ParquetMetaDataReader::new() .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index)) @@ -354,7 +354,9 @@ mod tests { let current_id = std::thread::current().id(); let other_id = reader - .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed()) + .spawn(|_, _| { + Box::pin(async move { Ok::<_, ParquetError>(std::thread::current().id()) }) + }) .await .unwrap(); diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 66ba6b87fee7..ade52fcb8647 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -61,17 +61,15 @@ mod store; pub use store::*; use crate::{ - arrow::arrow_writer::ArrowWriterOptions, - arrow::ArrowWriter, + arrow::{arrow_writer::ArrowWriterOptions, ArrowWriter}, errors::{ParquetError, Result}, file::{metadata::RowGroupMetaData, properties::WriterProperties}, format::{FileMetaData, KeyValue}, + future::BoxedFuture, }; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use bytes::Bytes; -use futures::future::BoxFuture; -use futures::FutureExt; use std::mem; use tokio::io::{AsyncWrite, AsyncWriteExt}; @@ -83,40 +81,38 @@ pub trait AsyncFileWriter: Send { /// This design allows the writer implementer to control the buffering and I/O scheduling. /// /// The underlying writer MAY implement retry logic to prevent breaking users write process. - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>; + fn write(&mut self, bs: Bytes) -> BoxedFuture<'_, Result<()>>; /// Flush any buffered data to the underlying writer and finish writing process. /// /// After `complete` returns `Ok(())`, caller SHOULD not call write again. - fn complete(&mut self) -> BoxFuture<'_, Result<()>>; + fn complete(&mut self) -> BoxedFuture<'_, Result<()>>; } impl AsyncFileWriter for Box { - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { + fn write(&mut self, bs: Bytes) -> BoxedFuture<'_, Result<()>> { self.as_mut().write(bs) } - fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + fn complete(&mut self) -> BoxedFuture<'_, Result<()>> { self.as_mut().complete() } } impl AsyncFileWriter for T { - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { - async move { + fn write(&mut self, bs: Bytes) -> BoxedFuture<'_, Result<()>> { + Box::pin(async move { self.write_all(&bs).await?; Ok(()) - } - .boxed() + }) } - fn complete(&mut self) -> BoxFuture<'_, Result<()>> { - async move { + fn complete(&mut self) -> BoxedFuture<'_, Result<()>> { + Box::pin(async move { self.flush().await?; self.shutdown().await?; Ok(()) - } - .boxed() + }) } } @@ -364,6 +360,7 @@ mod tests { } #[tokio::test] + #[cfg(not(target_arch = "wasm32"))] async fn test_async_writer_bytes_written() { let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); @@ -386,6 +383,7 @@ mod tests { } #[tokio::test] + #[cfg(not(target_arch = "wasm32"))] async fn test_async_writer_file() { let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; let col2 = Arc::new(BinaryArray::from_iter_values(vec![ @@ -412,6 +410,7 @@ mod tests { } #[tokio::test] + #[cfg(not(target_arch = "wasm32"))] async fn in_progress_accounting() { // define schema let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); diff --git a/parquet/src/arrow/async_writer/store.rs b/parquet/src/arrow/async_writer/store.rs index ad09eae4996f..50c2938ea9f3 100644 --- a/parquet/src/arrow/async_writer/store.rs +++ b/parquet/src/arrow/async_writer/store.rs @@ -16,7 +16,6 @@ // under the License. use bytes::Bytes; -use futures::future::BoxFuture; use std::sync::Arc; use crate::arrow::async_writer::AsyncFileWriter; @@ -93,7 +92,7 @@ impl ParquetObjectWriter { } impl AsyncFileWriter for ParquetObjectWriter { - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { + fn write(&mut self, bs: Bytes) -> BoxedFuture<'_, Result<()>> { Box::pin(async { self.w .put(bs) @@ -102,7 +101,7 @@ impl AsyncFileWriter for ParquetObjectWriter { }) } - fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + fn complete(&mut self) -> BoxedFuture<'_, Result<()>> { Box::pin(async { self.w .shutdown() diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 4b8c57175d4e..6ca990db7cb3 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -921,8 +921,6 @@ mod async_tests { use arrow_array::RecordBatch; use arrow_schema::{Field, Schema}; use bytes::Bytes; - use futures::future::BoxFuture; - use futures::FutureExt; use std::fs::File; use std::future::Future; use std::io::{Read, Seek, SeekFrom}; @@ -934,6 +932,7 @@ mod async_tests { use crate::arrow::ArrowWriter; use crate::file::properties::WriterProperties; use crate::file::reader::Length; + use crate::future::BoxedFuture; use crate::util::test_common::file_util::get_test_file; struct MetadataFetchFn(F); @@ -943,8 +942,8 @@ mod async_tests { F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - async move { self.0(range).await }.boxed() + fn fetch(&mut self, range: Range) -> BoxedFuture<'_, Result> { + Box::pin(async move { self.0(range).await }) } } @@ -956,8 +955,8 @@ mod async_tests { Fut: Future> + Send, F2: Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - async move { self.0(range).await }.boxed() + fn fetch(&mut self, range: Range) -> BoxedFuture<'_, Result> { + Box::pin(async move { self.0(range).await }) } } @@ -967,8 +966,8 @@ mod async_tests { F2: FnMut(usize) -> Fut + Send, Fut: Future> + Send, { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { - async move { self.1(suffix).await }.boxed() + fn fetch_suffix(&mut self, suffix: usize) -> BoxedFuture<'_, Result> { + Box::pin(async move { self.1(suffix).await }) } } diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index b1100c4bc440..8ac48d362388 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -172,7 +172,7 @@ pub use self::encodings::{decoding, encoding}; experimental!(#[macro_use] mod util); -pub use util::utf8; +pub use util::{future, utf8}; #[cfg(feature = "arrow")] pub mod arrow; diff --git a/parquet/src/util/future.rs b/parquet/src/util/future.rs new file mode 100644 index 000000000000..8812e43e6664 --- /dev/null +++ b/parquet/src/util/future.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`BoxedFuture`] type module + +/// BoxedFuture is the type alias of [`futures::future::BoxFuture`]. +#[cfg(not(target_arch = "wasm32"))] +pub type BoxedFuture<'a, T> = futures::future::BoxFuture<'a, T>; + +/// BoxedFuture is the type alias of [`futures::future::LocalBoxFuture`]. +#[cfg(target_arch = "wasm32")] +pub type BoxedFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>; diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs index 145cdd693e59..5e70632c92d5 100644 --- a/parquet/src/util/mod.rs +++ b/parquet/src/util/mod.rs @@ -25,6 +25,9 @@ pub mod push_buffers; pub(crate) mod test_common; pub mod utf8; +#[cfg(feature = "async")] +pub mod future; + #[cfg(any(test, feature = "test_common"))] pub use self::test_common::page_util::{ DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, diff --git a/parquet/tests/arrow_reader/io/async_reader.rs b/parquet/tests/arrow_reader/io/async_reader.rs index f2d3ce07234b..384addb5dea9 100644 --- a/parquet/tests/arrow_reader/io/async_reader.rs +++ b/parquet/tests/arrow_reader/io/async_reader.rs @@ -22,13 +22,13 @@ use crate::io::{ OperationLog, TestParquetFile, }; use bytes::Bytes; -use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::errors::Result; use parquet::file::metadata::ParquetMetaData; +use parquet::future::BoxedFuture; use std::ops::Range; use std::sync::Arc; @@ -370,7 +370,7 @@ struct RecordingAsyncFileReader { } impl AsyncFileReader for RecordingAsyncFileReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, parquet::errors::Result> { let ops = Arc::clone(&self.ops); let data = self .bytes @@ -382,14 +382,13 @@ impl AsyncFileReader for RecordingAsyncFileReader { start: range.start as usize, end: range.end as usize, }; - async move { + Box::pin(async move { ops.add_entry_for_range(&logged_range); Ok(data) - } - .boxed() + }) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxedFuture<'_, Result>> { let ops = Arc::clone(&self.ops); let datas = ranges .iter() @@ -408,23 +407,21 @@ impl AsyncFileReader for RecordingAsyncFileReader { }) .collect::>(); - async move { + Box::pin(async move { ops.add_entry_for_ranges(&logged_ranges); Ok(datas) - } - .boxed() + }) } fn get_metadata<'a>( &'a mut self, _options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> BoxedFuture<'a, Result>> { let ops = Arc::clone(&self.ops); let parquet_meta_data = Arc::clone(&self.parquet_meta_data); - async move { + Box::pin(async move { ops.add_entry(LogEntry::GetProvidedMetadata); Ok(parquet_meta_data) - } - .boxed() + }) } } diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs index 15fd7c9e4f2d..295d095cd814 100644 --- a/parquet/tests/arrow_reader/predicate_cache.rs +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -25,8 +25,7 @@ use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; use arrow_array::{RecordBatch, StringViewArray}; use bytes::Bytes; -use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder}; @@ -34,6 +33,7 @@ use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; use parquet::file::properties::WriterProperties; +use parquet::future::BoxedFuture; use std::ops::Range; use std::sync::Arc; use std::sync::LazyLock; @@ -257,24 +257,27 @@ impl TestReader { } impl AsyncFileReader for TestReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + fn get_bytes(&mut self, range: Range) -> BoxedFuture<'_, parquet::errors::Result> { let range = range.clone(); - futures::future::ready(Ok(self + Box::pin(futures::future::ready(Ok(self .data - .slice(range.start as usize..range.end as usize))) - .boxed() + .slice(range.start as usize..range.end as usize)))) } fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, parquet::errors::Result>> { + ) -> BoxedFuture<'a, parquet::errors::Result>> { let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy( PageIndexPolicy::from(options.is_some_and(|o| o.page_index())), ); self.metadata = Some(Arc::new( metadata_reader.parse_and_finish(&self.data).unwrap(), )); - futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed() + Box::pin(futures::future::ready(Ok(self + .metadata + .clone() + .unwrap() + .clone()))) } }