diff --git a/Cargo.lock b/Cargo.lock
index cf4a23df9..69f07c52d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2676,6 +2676,12 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
+[[package]]
+name = "hermit-abi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
+
 [[package]]
 name = "hermit-abi"
 version = "0.4.0"
@@ -2980,6 +2986,7 @@ dependencies = [
  "serde_with",
  "tempfile",
  "tera",
+ "thrift",
  "tokio",
  "typed-builder 0.20.0",
  "url",
@@ -3947,6 +3954,16 @@ dependencies = [
  "libm",
 ]
 
+[[package]]
+name = "num_cpus"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
+dependencies = [
+ "hermit-abi 0.3.9",
+ "libc",
+]
+
 [[package]]
 name = "num_enum"
 version = "0.7.3"
@@ -4408,7 +4425,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f"
 dependencies = [
  "cfg-if",
  "concurrent-queue",
- "hermit-abi",
+ "hermit-abi 0.4.0",
  "pin-project-lite",
  "rustix 0.38.44",
  "tracing",
@@ -5972,6 +5989,15 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "threadpool"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+dependencies = [
+ "num_cpus",
+]
+
 [[package]]
 name = "thrift"
 version = "0.17.0"
@@ -5980,7 +6006,9 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
 dependencies = [
  "byteorder",
  "integer-encoding 3.0.4",
+ "log",
  "ordered-float 2.10.1",
+ "threadpool",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index adfbda164..850946c3a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,6 +94,7 @@ serde_json = "1.0.138"
 serde_repr = "0.1.16"
 serde_with = "3.4"
 tempfile = "3.18"
+thrift = "0.17.0"
 tokio = { version = "1.36", default-features = false }
 typed-builder = "0.20"
 url = "2.5.4"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 7320c455d..bb4c26ab6 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -79,6 +79,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+thrift = { workspace = true }
 tokio = { workspace = true, optional = true, features = ["sync"] }
 typed-builder = { workspace = true }
 url = { workspace = true }
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index bed9cc3dd..333314a02 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -28,10 +28,12 @@ use itertools::Itertools;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
 use parquet::arrow::AsyncArrowWriter;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::{from_thrift, Statistics};
+use parquet::file::statistics::Statistics;
 use parquet::format::FileMetaData;
+use parquet::thrift::{TCompactOutputProtocol, TSerializable};
+use thrift::protocol::TOutputProtocol;
 
 use super::location_generator::{FileNameGenerator, LocationGenerator};
 use super::track_writer::TrackWriter;
@@ -352,89 +354,27 @@ impl ParquetWriter {
         Ok(data_files)
     }
 
-    fn to_data_file_builder(
-        schema: SchemaRef,
-        metadata: FileMetaData,
-        written_size: usize,
-        file_path: String,
-    ) -> Result<DataFileBuilder> {
-        let index_by_parquet_path = {
-            let mut visitor = IndexByParquetPathName::new();
-            visit_schema(&schema, &mut visitor)?;
-            visitor
-        };
-
-        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
-            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
-            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
-            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
-            let mut min_max_agg = MinMaxColAggregator::new(schema);
-
-            for row_group in &metadata.row_groups {
-                for column_chunk in row_group.columns.iter() {
-                    let Some(column_chunk_metadata) = &column_chunk.meta_data else {
-                        continue;
-                    };
-                    let physical_type = column_chunk_metadata.type_;
-                    let Some(&field_id) =
-                        index_by_parquet_path.get(&column_chunk_metadata.path_in_schema.join("."))
-                    else {
-                        // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
-                        // Ignore the field if it is not in schema.
-                        continue;
-                    };
-                    *per_col_size.entry(field_id).or_insert(0) +=
-                        column_chunk_metadata.total_compressed_size as u64;
-                    *per_col_val_num.entry(field_id).or_insert(0) +=
-                        column_chunk_metadata.num_values as u64;
-                    if let Some(null_count) = column_chunk_metadata
-                        .statistics
-                        .as_ref()
-                        .and_then(|s| s.null_count)
-                    {
-                        *per_col_null_val_num.entry(field_id).or_insert(0_u64) += null_count as u64;
-                    }
-                    if let Some(statistics) = &column_chunk_metadata.statistics {
-                        min_max_agg.update(
-                            field_id,
-                            from_thrift(physical_type.try_into()?, Some(statistics.clone()))?
-                                .unwrap(),
-                        )?;
-                    }
-                }
-            }
+    fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
+        let mut buffer = Vec::new();
+        {
+            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
+            file_metadata
+                .write_to_out_protocol(&mut protocol)
+                .map_err(|err| {
+                    Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata")
+                        .with_source(err)
+                })?;
+
+            protocol.flush().map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err)
+            })?;
+        }
 
-            (
-                per_col_size,
-                per_col_val_num,
-                per_col_null_val_num,
-                min_max_agg.produce(),
-            )
-        };
+        let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err)
+        })?;
 
-        let mut builder = DataFileBuilder::default();
-        builder
-            .file_path(file_path)
-            .file_format(DataFileFormat::Parquet)
-            .record_count(metadata.num_rows as u64)
-            .file_size_in_bytes(written_size as u64)
-            .column_sizes(column_sizes)
-            .value_counts(value_counts)
-            .null_value_counts(null_value_counts)
-            .lower_bounds(lower_bounds)
-            .upper_bounds(upper_bounds)
-            // # TODO(#417)
-            // - nan_value_counts
-            // - distinct_counts
-            .key_metadata(metadata.footer_signing_key_metadata)
-            .split_offsets(
-                metadata
-                    .row_groups
-                    .iter()
-                    .filter_map(|group| group.file_offset)
-                    .collect(),
-            );
-        Ok(builder)
+        Ok(parquet_metadata)
     }
 
     /// `ParquetMetadata` to data file builder
@@ -551,19 +491,30 @@ impl FileWriter for ParquetWriter {
         Ok(())
     }
 
-    async fn close(self) -> crate::Result<Vec<DataFileBuilder>> {
-        let Some(writer) = self.inner_writer else {
-            return Ok(vec![]);
+    async fn close(mut self) -> crate::Result<Vec<DataFileBuilder>> {
+        let writer = match self.inner_writer.take() {
+            Some(writer) => writer,
+            None => return Ok(vec![]),
         };
+
         let metadata = writer.close().await.map_err(|err| {
             Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err)
         })?;
 
         let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
 
-        Ok(vec![Self::to_data_file_builder(
+        let parquet_metadata =
+            Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to convert metadata from thrift to parquet.",
+                )
+                .with_source(err)
+            })?);
+
+        Ok(vec![Self::parquet_to_data_file_builder(
             self.schema,
-            metadata,
+            parquet_metadata,
             written_size as usize,
             self.out_file.location().to_string(),
         )?])
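For reference, the conversion the new `thrift_to_parquet_metadata` helper performs can be sketched standalone as below. This is a minimal illustration, not iceberg-rust API: the function name `decode_thrift_metadata` and the boxed error type are placeholders, and only the `parquet`/`thrift` calls already visible in the diff above are assumed.

```rust
// Round-trip the thrift `FileMetaData` returned by `AsyncArrowWriter::close()`
// through the compact-thrift wire format so it can be decoded into the richer
// `ParquetMetaData` type that `parquet_to_data_file_builder` consumes.
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::format::FileMetaData;
use parquet::thrift::{TCompactOutputProtocol, TSerializable};
use thrift::protocol::TOutputProtocol;

// Hypothetical helper name; error handling is simplified to a boxed error.
fn decode_thrift_metadata(
    file_metadata: &FileMetaData,
) -> Result<ParquetMetaData, Box<dyn std::error::Error>> {
    // Serialize the in-memory thrift struct into compact-protocol bytes,
    // the same encoding a Parquet footer uses on disk.
    let mut buffer = Vec::new();
    {
        let mut protocol = TCompactOutputProtocol::new(&mut buffer);
        file_metadata.write_to_out_protocol(&mut protocol)?;
        protocol.flush()?;
    }
    // Decode those bytes back into `ParquetMetaData`, which exposes typed
    // column statistics instead of raw thrift values.
    Ok(ParquetMetaDataReader::decode_metadata(&buffer)?)
}
```

The round-trip replaces the removed `from_thrift` statistics path: `ParquetMetaDataReader::decode_metadata` expects footer-encoded bytes, so the in-memory thrift struct is re-encoded once and then parsed by the parquet crate itself.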