@@ -499,8 +499,8 @@ impl ParquetWriter {
             .lower_bounds(lower_bounds)
             .upper_bounds(upper_bounds)
             .nan_value_counts(nan_value_counts)
-            // # TODO(#417)
-            // - distinct_counts
+            // # NOTE:
+            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
             .key_metadata(metadata.footer_signing_key_metadata)
             .split_offsets(
                 metadata
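For orientation, the bounds maps fed to this builder are keyed by Iceberg field id. A minimal sketch of their shape, matching the assertions in the new test added below; the exact `HashMap<i32, Datum>` signature and the `crate::spec::Datum` import path are assumptions (mirroring `crate::spec::DataContentType` used later in this diff):

```rust
use std::collections::HashMap;
// Assumed import path; Datum::long is what the new test's assertions use.
use crate::spec::Datum;

// Sketch only: field id -> bound value, for a single column with field id 0
// holding the values 0..1024, as in the simple writer test added below.
fn example_bounds() -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
    let lower_bounds = HashMap::from([(0, Datum::long(0))]);
    let upper_bounds = HashMap::from([(0, Datum::long(1023))]);
    (lower_bounds, upper_bounds)
}
```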
@@ -639,17 +639,6 @@ impl ParquetWriter {
                     };
                 }
             }
-            DataType::List(arrow_field) => {
-                handle_list_type!(ListArray, col, self, field, arrow_field);
-            }
-            // NOTE: iceberg to arrow schema conversion does not form these types,
-            // meaning these branches never get called right now.
-            DataType::LargeList(_) => {
-                // handle_list_type!(LargeListArray, col, self, field, arrow_field);
-            }
-            DataType::FixedSizeList(_, _) => {
-                // handle_list_type!(FixedSizeList, col, self, field, arrow_field);
-            }
             DataType::Map(_, _) => {
                 let map_arr = col.as_any().downcast_ref::<MapArray>().unwrap();
@@ -664,6 +653,17 @@ impl ParquetWriter {
                 let values_col = map_arr.values();
                 self.transverse_batch(values_col, &map_ty.value_field);
             }
+            DataType::List(arrow_field) => {
+                handle_list_type!(ListArray, col, self, field, arrow_field);
+            }
+            // NOTE: iceberg to arrow schema conversion does not form these types,
+            // meaning these branches never get called right now.
+            DataType::LargeList(_) => {
+                // handle_list_type!(LargeListArray, col, self, field, arrow_field);
+            }
+            DataType::FixedSizeList(_, _) => {
+                // handle_list_type!(FixedSizeList, col, self, field, arrow_field);
+            }
             _ => {}
         };
     }
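The `handle_list_type!` macro body is not part of this diff. As a rough sketch of what the List arm is doing, under the assumption that the macro simply downcasts and recurses the way the Map arm above does:

```rust
// Hypothetical expansion of handle_list_type!; the real macro is defined
// elsewhere in this file and may differ, e.g. in which child field it
// passes to the recursive call.
macro_rules! handle_list_type {
    ($arr_ty:ty, $col:expr, $this:expr, $field:expr, $arrow_field:expr) => {{
        // Downcast the type-erased Arrow column to the concrete list array.
        let list_arr = $col.as_any().downcast_ref::<$arr_ty>().unwrap();
        // Recurse into the element column, mirroring the Map arm's
        // transverse_batch calls on keys and values.
        $this.transverse_batch(list_arr.values(), $field);
    }};
}
```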
@@ -972,12 +972,6 @@ mod tests {
         // prepare data
         let schema = {
             let fields = vec![
-                // TODO(feniljain):
-                // Types:
-                // [X] Primitive
-                // [ ] Struct
-                // [ ] List
-                // [ ] Map
                 arrow_schema::Field::new("col", arrow_schema::DataType::Float32, true)
                     .with_metadata(HashMap::from([(
                         PARQUET_FIELD_ID_META_KEY.to_string(),
@@ -1634,6 +1628,73 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_parquet_writer() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // prepare data
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
+        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
+        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
+
+        // write data
+        let mut pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        )
+        .build()
+        .await?;
+        pw.write(&to_write).await?;
+        pw.write(&to_write_null).await?;
+        let res = pw.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res
+            .into_iter()
+            .next()
+            .unwrap()
+            // Put in dummy fields so the build succeeds.
+            .content(crate::spec::DataContentType::Data)
+            .partition(Struct::empty())
+            .build()
+            .unwrap();
+
+        // check data file
+        assert_eq!(data_file.record_count(), 2048);
+        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
+        assert_eq!(
+            *data_file.lower_bounds(),
+            HashMap::from([(0, Datum::long(0))])
+        );
+        assert_eq!(
+            *data_file.upper_bounds(),
+            HashMap::from([(0, Datum::long(1023))])
+        );
+        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
+
+        // check the written file
+        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
+        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_parquet_writer_with_complex_schema() -> Result<()> {
         let temp_dir = TempDir::new().unwrap();
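A note on where the new test's expected statistics come from: 1024 non-null values `0..=1023` plus a second batch of 1024 nulls give a record count of 2048, a null count of 1024, and bounds of 0 and 1023. A standalone sanity check of the bounds using arrow's aggregate kernels (the `arrow` crate facade is assumed here; the diff itself imports from the `arrow_array`/`arrow_schema` subcrates):

```rust
use arrow::array::Int64Array;
use arrow::compute::{max, min};

fn main() {
    // The same value column the test writes: 0..1024, no nulls.
    let col = Int64Array::from_iter_values(0..1024);
    // min/max skip nulls, which matches how the Parquet footer statistics
    // behind lower_bounds/upper_bounds treat the all-null second batch.
    assert_eq!(min(&col), Some(0));
    assert_eq!(max(&col), Some(1023));
}
```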