From 80bf446fed99e94c493e3e364d266acca4980a43 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 3 Aug 2023 15:26:00 +0800 Subject: [PATCH 1/7] feat: Add some missing field in row group metadata: ordinal, total compressed size, file_offset --- parquet/src/file/metadata.rs | 15 ++++++++--- parquet/src/file/writer.rs | 50 ++++++++++++++++++++---------------- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index bb8346306cf9..b2a7ce0e33ed 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -277,6 +277,7 @@ pub struct RowGroupMetaData { sorting_columns: Option>, total_byte_size: i64, schema_descr: SchemaDescPtr, + ordinal: Option, } impl RowGroupMetaData { @@ -350,6 +351,7 @@ impl RowGroupMetaData { sorting_columns, total_byte_size, schema_descr, + ordinal: rg.ordinal, }) } @@ -360,9 +362,9 @@ impl RowGroupMetaData { total_byte_size: self.total_byte_size, num_rows: self.num_rows, sorting_columns: self.sorting_columns().cloned(), - file_offset: None, - total_compressed_size: None, - ordinal: None, + file_offset: self.columns().iter().next().map(|m| m.file_offset), + total_compressed_size: Some(self.compressed_size()), + ordinal: self.ordinal, } } @@ -384,6 +386,7 @@ impl RowGroupMetaDataBuilder { num_rows: 0, sorting_columns: None, total_byte_size: 0, + ordinal: None, }) } @@ -411,6 +414,12 @@ impl RowGroupMetaDataBuilder { self } + /// Sets ordinal for this row group. + pub fn set_ordinal(mut self, value: usize) -> Self { + self.0.ordinal = Some(value as i16); + self + } + /// Builds row group metadata. pub fn build(self) -> Result { if self.0.schema_descr.num_columns() != self.0.columns.len() { diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index bde350a1ea42..d5814f5d054d 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -110,12 +110,12 @@ pub type OnCloseColumnChunk<'a> = Box Result<() /// - the offset index for each column chunk pub type OnCloseRowGroup<'a> = Box< dyn FnOnce( - RowGroupMetaDataPtr, - Vec>, - Vec>, - Vec>, - ) -> Result<()> - + 'a, + RowGroupMetaDataPtr, + Vec>, + Vec>, + Vec>, + ) -> Result<()> + + 'a, >; // ---------------------------------------------------------------------- @@ -204,6 +204,7 @@ impl SerializedFileWriter { self.descr.clone(), self.props.clone(), &mut self.buf, + self.row_group_index, Some(Box::new(on_close)), ); Ok(row_group_writer) @@ -409,6 +410,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { bloom_filters: Vec>, column_indexes: Vec>, offset_indexes: Vec>, + row_group_index: usize, on_close: Option>, } @@ -418,16 +420,19 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { /// - `schema_descr` - the schema to write /// - `properties` - writer properties /// - `buf` - the buffer to write data to + /// - `row_group_index` - row group index in this parquet file. /// - `on_close` - an optional callback that will invoked on [`Self::close`] pub fn new( schema_descr: SchemaDescPtr, properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, + row_group_index: usize, on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); Self { buf, + row_group_index, on_close, total_rows_written: None, descr: schema_descr, @@ -492,13 +497,13 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { &'b mut self, factory: F, ) -> Result> - where - F: FnOnce( - ColumnDescPtr, - WriterPropertiesPtr, - Box, - OnCloseColumnChunk<'b>, - ) -> Result, + where + F: FnOnce( + ColumnDescPtr, + WriterPropertiesPtr, + Box, + OnCloseColumnChunk<'b>, + ) -> Result, { self.assert_previous_writer_closed()?; Ok(match self.next_column_desc() { @@ -603,6 +608,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_total_byte_size(self.total_uncompressed_bytes) .set_num_rows(self.total_rows_written.unwrap_or(0) as i64) .set_sorting_columns(self.props.sorting_columns().cloned()) + .set_ordinal(self.row_group_index) .build()?; let metadata = Arc::new(row_group_metadata); @@ -1226,7 +1232,7 @@ mod tests { None, Arc::new(props), ) - .unwrap(); + .unwrap(); while let Some(page) = page_reader.get_next_page().unwrap() { result_pages.push(page); @@ -1265,9 +1271,9 @@ mod tests { data: Vec>, compression: Compression, ) -> crate::format::FileMetaData - where - W: Write + Send, - R: ChunkReader + From + 'static, + where + W: Write + Send, + R: ChunkReader + From + 'static, { test_roundtrip::( file, @@ -1285,11 +1291,11 @@ mod tests { value: F, compression: Compression, ) -> crate::format::FileMetaData - where - W: Write + Send, - R: ChunkReader + From + 'static, - D: DataType, - F: Fn(Row) -> D::T, + where + W: Write + Send, + R: ChunkReader + From + 'static, + D: DataType, + F: Fn(Row) -> D::T, { let schema = Arc::new( types::Type::group_type_builder("schema") From 6298b39e17ee6c0f0b7b7678c20e73e8a533d5ee Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 3 Aug 2023 15:37:42 +0800 Subject: [PATCH 2/7] make formatter happy --- parquet/src/file/writer.rs | 44 +++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index d5814f5d054d..e1245acc66e3 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -110,12 +110,12 @@ pub type OnCloseColumnChunk<'a> = Box Result<() /// - the offset index for each column chunk pub type OnCloseRowGroup<'a> = Box< dyn FnOnce( - RowGroupMetaDataPtr, - Vec>, - Vec>, - Vec>, - ) -> Result<()> - + 'a, + RowGroupMetaDataPtr, + Vec>, + Vec>, + Vec>, + ) -> Result<()> + + 'a, >; // ---------------------------------------------------------------------- @@ -497,13 +497,13 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { &'b mut self, factory: F, ) -> Result> - where - F: FnOnce( - ColumnDescPtr, - WriterPropertiesPtr, - Box, - OnCloseColumnChunk<'b>, - ) -> Result, + where + F: FnOnce( + ColumnDescPtr, + WriterPropertiesPtr, + Box, + OnCloseColumnChunk<'b>, + ) -> Result, { self.assert_previous_writer_closed()?; Ok(match self.next_column_desc() { @@ -1232,7 +1232,7 @@ mod tests { None, Arc::new(props), ) - .unwrap(); + .unwrap(); while let Some(page) = page_reader.get_next_page().unwrap() { result_pages.push(page); @@ -1271,9 +1271,9 @@ mod tests { data: Vec>, compression: Compression, ) -> crate::format::FileMetaData - where - W: Write + Send, - R: ChunkReader + From + 'static, + where + W: Write + Send, + R: ChunkReader + From + 'static, { test_roundtrip::( file, @@ -1291,11 +1291,11 @@ mod tests { value: F, compression: Compression, ) -> crate::format::FileMetaData - where - W: Write + Send, - R: ChunkReader + From + 'static, - D: DataType, - F: Fn(Row) -> D::T, + where + W: Write + Send, + R: ChunkReader + From + 'static, + D: DataType, + F: Fn(Row) -> D::T, { let schema = Arc::new( types::Type::group_type_builder("schema") From bb84bff40eef17d44eea351d802d2d624a2b7f9e Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 3 Aug 2023 15:57:33 +0800 Subject: [PATCH 3/7] Add some test --- parquet/src/file/metadata.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index b2a7ce0e33ed..c018ebe7e995 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -975,6 +975,7 @@ mod tests { .set_num_rows(1000) .set_total_byte_size(2000) .set_column_metadata(columns) + .set_ordinal(1) .build() .unwrap(); From 443e95c8fba31e30b9e7217ce1a66f2c8fbcd9cb Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 3 Aug 2023 16:36:28 +0800 Subject: [PATCH 4/7] fix bug --- parquet/src/file/metadata.rs | 31 ++++++++++++++++++++++++++----- parquet/src/file/writer.rs | 12 +++++++++++- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index c018ebe7e995..a8ffc05a2b97 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -277,7 +277,9 @@ pub struct RowGroupMetaData { sorting_columns: Option>, total_byte_size: i64, schema_descr: SchemaDescPtr, - ordinal: Option, + // We can't infer from file offset of first column since there may empty columns in row group. + file_offset: Option, + ordinal: Option, } impl RowGroupMetaData { @@ -331,6 +333,18 @@ impl RowGroupMetaData { self.schema_descr.clone() } + /// Returns ordinal of this row group in file + #[inline(always)] + pub fn ordinal(&self) -> Option { + self.ordinal + } + + /// Returns file offset of this row group in file. + #[inline(always)] + pub fn file_offset(&self) -> Option { + self.file_offset + } + /// Method to convert from Thrift. pub fn from_thrift( schema_descr: SchemaDescPtr, @@ -351,7 +365,8 @@ impl RowGroupMetaData { sorting_columns, total_byte_size, schema_descr, - ordinal: rg.ordinal, + file_offset: rg.file_offset.map(|v| v as usize), + ordinal: rg.ordinal.map(|o| o as usize), }) } @@ -362,9 +377,9 @@ impl RowGroupMetaData { total_byte_size: self.total_byte_size, num_rows: self.num_rows, sorting_columns: self.sorting_columns().cloned(), - file_offset: self.columns().iter().next().map(|m| m.file_offset), + file_offset: self.file_offset().map(|v| v as i64), total_compressed_size: Some(self.compressed_size()), - ordinal: self.ordinal, + ordinal: self.ordinal.map(|v| v as i16), } } @@ -383,6 +398,7 @@ impl RowGroupMetaDataBuilder { Self(RowGroupMetaData { columns: Vec::with_capacity(schema_descr.num_columns()), schema_descr, + file_offset: None, num_rows: 0, sorting_columns: None, total_byte_size: 0, @@ -416,7 +432,12 @@ impl RowGroupMetaDataBuilder { /// Sets ordinal for this row group. pub fn set_ordinal(mut self, value: usize) -> Self { - self.0.ordinal = Some(value as i16); + self.0.ordinal = Some(value); + self + } + + pub fn set_file_offset(mut self, value: usize) -> Self { + self.0.file_offset = Some(value); self } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index e1245acc66e3..14de80bc8029 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -184,6 +184,7 @@ impl SerializedFileWriter { pub fn next_row_group(&mut self) -> Result> { self.assert_previous_writer_closed()?; self.row_group_index += 1; + let file_offset = self.buf.bytes_written(); let row_groups = &mut self.row_groups; let row_bloom_filters = &mut self.bloom_filters; @@ -204,7 +205,8 @@ impl SerializedFileWriter { self.descr.clone(), self.props.clone(), &mut self.buf, - self.row_group_index, + self.row_group_index - 1, + file_offset, Some(Box::new(on_close)), ); Ok(row_group_writer) @@ -411,6 +413,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { column_indexes: Vec>, offset_indexes: Vec>, row_group_index: usize, + file_offset: usize, on_close: Option>, } @@ -421,18 +424,21 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { /// - `properties` - writer properties /// - `buf` - the buffer to write data to /// - `row_group_index` - row group index in this parquet file. + /// - `file_offset` - file offset of this row group in this parquet file. /// - `on_close` - an optional callback that will invoked on [`Self::close`] pub fn new( schema_descr: SchemaDescPtr, properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, row_group_index: usize, + file_offset: usize, on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); Self { buf, row_group_index, + file_offset, on_close, total_rows_written: None, descr: schema_descr, @@ -609,6 +615,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_num_rows(self.total_rows_written.unwrap_or(0) as i64) .set_sorting_columns(self.props.sorting_columns().cloned()) .set_ordinal(self.row_group_index) + .set_file_offset(self.file_offset) .build()?; let metadata = Arc::new(row_group_metadata); @@ -1318,6 +1325,7 @@ mod tests { let mut rows: i64 = 0; for (idx, subset) in data.iter().enumerate() { + let row_group_file_offset = file_writer.buf.bytes_written(); let mut row_group_writer = file_writer.next_row_group().unwrap(); if let Some(mut writer) = row_group_writer.next_column().unwrap() { rows += writer @@ -1329,6 +1337,8 @@ mod tests { let last_group = row_group_writer.close().unwrap(); let flushed = file_writer.flushed_row_groups(); assert_eq!(flushed.len(), idx + 1); + assert_eq!(Some(idx), last_group.ordinal()); + assert_eq!(Some(row_group_file_offset), last_group.file_offset()); assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); } let file_metadata = file_writer.close().unwrap(); From eae6621d91949c31b7addf8416bbb46a0da11e34 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 7 Aug 2023 10:55:58 +0800 Subject: [PATCH 5/7] fix comments --- parquet/src/file/metadata.rs | 20 ++++++++++---------- parquet/src/file/writer.rs | 14 ++++++++------ 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index a8ffc05a2b97..8d0c9149e813 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -278,8 +278,8 @@ pub struct RowGroupMetaData { total_byte_size: i64, schema_descr: SchemaDescPtr, // We can't infer from file offset of first column since there may empty columns in row group. - file_offset: Option, - ordinal: Option, + file_offset: Option, + ordinal: Option, } impl RowGroupMetaData { @@ -335,13 +335,13 @@ impl RowGroupMetaData { /// Returns ordinal of this row group in file #[inline(always)] - pub fn ordinal(&self) -> Option { + pub fn ordinal(&self) -> Option { self.ordinal } /// Returns file offset of this row group in file. #[inline(always)] - pub fn file_offset(&self) -> Option { + pub fn file_offset(&self) -> Option { self.file_offset } @@ -365,8 +365,8 @@ impl RowGroupMetaData { sorting_columns, total_byte_size, schema_descr, - file_offset: rg.file_offset.map(|v| v as usize), - ordinal: rg.ordinal.map(|o| o as usize), + file_offset: rg.file_offset, + ordinal: rg.ordinal, }) } @@ -377,9 +377,9 @@ impl RowGroupMetaData { total_byte_size: self.total_byte_size, num_rows: self.num_rows, sorting_columns: self.sorting_columns().cloned(), - file_offset: self.file_offset().map(|v| v as i64), + file_offset: self.file_offset(), total_compressed_size: Some(self.compressed_size()), - ordinal: self.ordinal.map(|v| v as i16), + ordinal: self.ordinal, } } @@ -431,12 +431,12 @@ impl RowGroupMetaDataBuilder { } /// Sets ordinal for this row group. - pub fn set_ordinal(mut self, value: usize) -> Self { + pub fn set_ordinal(mut self, value: i16) -> Self { self.0.ordinal = Some(value); self } - pub fn set_file_offset(mut self, value: usize) -> Self { + pub fn set_file_offset(mut self, value: i64) -> Self { self.0.file_offset = Some(value); self } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 14de80bc8029..3136a78a7fed 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -183,8 +183,10 @@ impl SerializedFileWriter { /// previous row group must be finalised and closed using `RowGroupWriter::close` method. pub fn next_row_group(&mut self) -> Result> { self.assert_previous_writer_closed()?; - self.row_group_index += 1; let file_offset = self.buf.bytes_written(); + let ordinal = self.row_group_index; + + self.row_group_index += 1; let row_groups = &mut self.row_groups; let row_bloom_filters = &mut self.bloom_filters; @@ -205,7 +207,7 @@ impl SerializedFileWriter { self.descr.clone(), self.props.clone(), &mut self.buf, - self.row_group_index - 1, + ordinal, file_offset, Some(Box::new(on_close)), ); @@ -614,8 +616,8 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_total_byte_size(self.total_uncompressed_bytes) .set_num_rows(self.total_rows_written.unwrap_or(0) as i64) .set_sorting_columns(self.props.sorting_columns().cloned()) - .set_ordinal(self.row_group_index) - .set_file_offset(self.file_offset) + .set_ordinal(self.row_group_index as i16) + .set_file_offset(self.file_offset as i64) .build()?; let metadata = Arc::new(row_group_metadata); @@ -1337,8 +1339,8 @@ mod tests { let last_group = row_group_writer.close().unwrap(); let flushed = file_writer.flushed_row_groups(); assert_eq!(flushed.len(), idx + 1); - assert_eq!(Some(idx), last_group.ordinal()); - assert_eq!(Some(row_group_file_offset), last_group.file_offset()); + assert_eq!(Some(idx as i16), last_group.ordinal()); + assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); } let file_metadata = file_writer.close().unwrap(); From c1393c6438471457c178bd05c91909860620cf49 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 7 Aug 2023 19:33:13 +0800 Subject: [PATCH 6/7] fix comment --- parquet/src/file/writer.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 3136a78a7fed..781a212c48b5 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -207,8 +207,7 @@ impl SerializedFileWriter { self.descr.clone(), self.props.clone(), &mut self.buf, - ordinal, - file_offset, + ordinal as i16, Some(Box::new(on_close)), ); Ok(row_group_writer) @@ -414,8 +413,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { bloom_filters: Vec>, column_indexes: Vec>, offset_indexes: Vec>, - row_group_index: usize, - file_offset: usize, + row_group_index: i16, + file_offset: i64, on_close: Option>, } @@ -432,11 +431,11 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { schema_descr: SchemaDescPtr, properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, - row_group_index: usize, - file_offset: usize, + row_group_index: i16, on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); + let file_offset = buf.bytes_written() as i64; Self { buf, row_group_index, From e9885c3bd893aefee451a811299fb34fece27fd0 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 8 Aug 2023 10:05:03 +0800 Subject: [PATCH 7/7] fix clippy --- parquet/src/file/writer.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 781a212c48b5..c374c7ce37e9 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -183,7 +183,6 @@ impl SerializedFileWriter { /// previous row group must be finalised and closed using `RowGroupWriter::close` method. pub fn next_row_group(&mut self) -> Result> { self.assert_previous_writer_closed()?; - let file_offset = self.buf.bytes_written(); let ordinal = self.row_group_index; self.row_group_index += 1; @@ -615,8 +614,8 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_total_byte_size(self.total_uncompressed_bytes) .set_num_rows(self.total_rows_written.unwrap_or(0) as i64) .set_sorting_columns(self.props.sorting_columns().cloned()) - .set_ordinal(self.row_group_index as i16) - .set_file_offset(self.file_offset as i64) + .set_ordinal(self.row_group_index) + .set_file_offset(self.file_offset) .build()?; let metadata = Arc::new(row_group_metadata);