@@ -100,16 +100,21 @@ impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
 mod test {
     use std::sync::Arc;
 
+    use arrow_array::{Int32Array, StringArray};
+    use arrow_schema::{DataType, Field};
+    use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
     use crate::io::FileIOBuilder;
-    use crate::spec::{DataContentType, DataFileFormat, Schema, Struct};
+    use crate::spec::{
+        DataContentType, DataFileFormat, Literal, NestedField, PrimitiveType, Schema, Struct, Type,
+    };
     use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
     use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
-    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
+    use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
     use crate::Result;
 
     #[tokio::test]
@@ -121,20 +126,124 @@ mod test {
         let file_name_gen =
             DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
 
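+        // Schema with explicit field IDs (3 and 4) so the test can verify below
+        // that Iceberg field IDs are propagated into the Parquet file metadata.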
+        let schema = Schema::builder()
+            .with_schema_id(3)
+            .with_fields(vec![
+                NestedField::required(3, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(4, "bar", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
         let pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
-            Arc::new(Schema::builder().build().unwrap()),
+            Arc::new(schema),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+
+        let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await.unwrap();
+
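+        // Close without writing any batches; the writer is still expected to
+        // flush exactly one (empty) data file for the assertions below.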
+        let data_files = data_file_writer.close().await.unwrap();
+        assert_eq!(data_files.len(), 1);
+
+        let data_file = &data_files[0];
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+        assert_eq!(data_file.content, DataContentType::Data);
+        assert_eq!(data_file.partition, Struct::empty());
+
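+        // Read the written file back and load its Parquet footer metadata.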
+        let input_file = file_io.new_input(data_file.file_path.clone())?;
+        let input_content = input_file.read().await?;
+
+        let parquet_reader =
+            ArrowReaderMetadata::load(&input_content, ArrowReaderOptions::default())
+                .expect("Failed to load Parquet metadata");
+
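+        // Collect the field ID stored on each Parquet column descriptor; they
+        // should match the IDs assigned in the Iceberg schema above.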
+        let field_ids: Vec<i32> = parquet_reader
+            .parquet_schema()
+            .columns()
+            .iter()
+            .map(|col| col.self_type().get_basic_info().id())
+            .collect();
+
+        assert_eq!(field_ids, vec![3, 4]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_parquet_writer_with_partition() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen = DefaultFileNameGenerator::new(
+            "test_partitioned".to_string(),
+            None,
+            DataFileFormat::Parquet,
+        );
+
+        let schema = Schema::builder()
+            .with_schema_id(5)
+            .with_fields(vec![
+                NestedField::required(5, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(6, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
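+        // Single-field partition value; the writer should attach this struct
+        // unchanged to the emitted DataFile.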
+        let partition_value = Struct::from_iter([Some(Literal::int(1))]);
+
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(schema.clone()),
             file_io.clone(),
             location_gen,
             file_name_gen,
         );
-        let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await?;
 
-        let data_file = data_file_writer.close().await.unwrap();
-        assert_eq!(data_file.len(), 1);
-        assert_eq!(data_file[0].file_format, DataFileFormat::Parquet);
-        assert_eq!(data_file[0].content, DataContentType::Data);
-        assert_eq!(data_file[0].partition, Struct::empty());
+        let mut data_file_writer =
+            DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()))
+                .build()
+                .await?;
+
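+        // Build an Arrow batch whose layout mirrors the Iceberg schema above
+        // (Int -> Int32, String -> Utf8) and write it through the data file writer.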
+        let arrow_schema = arrow_schema::Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])?;
+        data_file_writer.write(batch).await?;
+
+        let data_files = data_file_writer.close().await.unwrap();
+        assert_eq!(data_files.len(), 1);
+
+        let data_file = &data_files[0];
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+        assert_eq!(data_file.content, DataContentType::Data);
+        assert_eq!(data_file.partition, partition_value);
+
+        let input_file = file_io.new_input(data_file.file_path.clone())?;
+        let input_content = input_file.read().await?;
+
+        let parquet_reader =
+            ArrowReaderMetadata::load(&input_content, ArrowReaderOptions::default())?;
+
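+        // Field IDs 5 and 6 from the Iceberg schema should survive the round trip
+        // into the Parquet column metadata.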
+        let field_ids: Vec<i32> = parquet_reader
+            .parquet_schema()
+            .columns()
+            .iter()
+            .map(|col| col.self_type().get_basic_info().id())
+            .collect();
+        assert_eq!(field_ids, vec![5, 6]);
+
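+        // Column names should likewise match the Iceberg field names.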
+        let field_names: Vec<&str> = parquet_reader
+            .parquet_schema()
+            .columns()
+            .iter()
+            .map(|col| col.name())
+            .collect();
+        assert_eq!(field_names, vec!["id", "name"]);
 
         Ok(())
     }