@@ -168,6 +168,7 @@ impl<'a> SnapshotProduceAction<'a> {
168
168
fn new_manifest_writer (
169
169
& mut self ,
170
170
content_type : & ManifestContentType ,
171
+ partition_spec_id : i32 ,
171
172
) -> Result < ManifestWriter > {
172
173
let new_manifest_path = format ! (
173
174
"{}/{}/{}-m{}.{}" ,
@@ -182,17 +183,26 @@ impl<'a> SnapshotProduceAction<'a> {
182
183
. current_table
183
184
. file_io ( )
184
185
. new_output ( new_manifest_path) ?;
186
+ let partition_spec = self
187
+ . tx
188
+ . current_table
189
+ . metadata ( )
190
+ . partition_spec_by_id ( partition_spec_id)
191
+ . ok_or_else ( || {
192
+ Error :: new (
193
+ ErrorKind :: DataInvalid ,
194
+ "Invalid partition spec id for new manifest writer" ,
195
+ )
196
+ . with_context ( "partition spec id" , partition_spec_id. to_string ( ) )
197
+ } ) ?
198
+ . as_ref ( )
199
+ . clone ( ) ;
185
200
let builder = ManifestWriterBuilder :: new (
186
201
output,
187
202
Some ( self . snapshot_id ) ,
188
203
self . key_metadata . clone ( ) ,
189
204
self . tx . current_table . metadata ( ) . current_schema ( ) . clone ( ) ,
190
- self . tx
191
- . current_table
192
- . metadata ( )
193
- . default_partition_spec ( )
194
- . as_ref ( )
195
- . clone ( ) ,
205
+ partition_spec,
196
206
) ;
197
207
if self . tx . current_table . metadata ( ) . format_version ( ) == FormatVersion :: V1 {
198
208
Ok ( builder. build_v1 ( ) )
@@ -244,29 +254,95 @@ impl<'a> SnapshotProduceAction<'a> {
244
254
builder. build ( )
245
255
}
246
256
} ) ;
247
- let mut writer = self . new_manifest_writer ( & content_type) ?;
257
+ let mut writer = self . new_manifest_writer (
258
+ & content_type,
259
+ self . tx . current_table . metadata ( ) . default_partition_spec_id ( ) ,
260
+ ) ?;
248
261
for entry in manifest_entries {
249
262
writer. add_entry ( entry) ?;
250
263
}
251
264
writer. write_manifest_file ( ) . await
252
265
}
253
266
267
+ async fn write_delete_manifest (
268
+ & mut self ,
269
+ deleted_entries : Vec < ManifestEntry > ,
270
+ ) -> Result < Vec < ManifestFile > > {
271
+ if deleted_entries. is_empty ( ) {
272
+ return Ok ( vec ! [ ] ) ;
273
+ }
274
+
275
+ // Group deleted entries by spec_id
276
+ let mut partition_groups = HashMap :: new ( ) ;
277
+ for entry in deleted_entries {
278
+ partition_groups
279
+ . entry ( entry. data_file ( ) . partition_spec_id )
280
+ . or_insert_with ( Vec :: new)
281
+ . push ( entry) ;
282
+ }
283
+
284
+ // Write a delete manifest per spec_id group
285
+ let mut deleted_manifests = Vec :: new ( ) ;
286
+ for ( spec_id, entries) in partition_groups {
287
+ let mut data_file_writer: Option < ManifestWriter > = None ;
288
+ let mut delete_file_writer: Option < ManifestWriter > = None ;
289
+ for entry in entries {
290
+ match entry. content_type ( ) {
291
+ DataContentType :: Data => {
292
+ if data_file_writer. is_none ( ) {
293
+ data_file_writer = Some (
294
+ self . new_manifest_writer ( & ManifestContentType :: Data , spec_id) ?,
295
+ ) ;
296
+ }
297
+ data_file_writer. as_mut ( ) . unwrap ( ) . add_delete_entry ( entry) ?;
298
+ }
299
+ DataContentType :: EqualityDeletes | DataContentType :: PositionDeletes => {
300
+ if delete_file_writer. is_none ( ) {
301
+ delete_file_writer = Some (
302
+ self . new_manifest_writer ( & ManifestContentType :: Deletes , spec_id) ?,
303
+ ) ;
304
+ }
305
+ delete_file_writer
306
+ . as_mut ( )
307
+ . unwrap ( )
308
+ . add_delete_entry ( entry) ?;
309
+ }
310
+ }
311
+ }
312
+ if let Some ( writer) = data_file_writer {
313
+ deleted_manifests. push ( writer. write_manifest_file ( ) . await ?) ;
314
+ }
315
+ if let Some ( writer) = delete_file_writer {
316
+ deleted_manifests. push ( writer. write_manifest_file ( ) . await ?) ;
317
+ }
318
+ }
319
+
320
+ Ok ( deleted_manifests)
321
+ }
322
+
254
323
async fn manifest_file < OP : SnapshotProduceOperation , MP : ManifestProcess > (
255
324
& mut self ,
256
325
snapshot_produce_operation : & OP ,
257
326
manifest_process : & MP ,
258
327
) -> Result < Vec < ManifestFile > > {
259
328
let mut manifest_files = vec ! [ ] ;
260
329
let data_files = std:: mem:: take ( & mut self . added_data_files ) ;
261
- let delete_files = std:: mem:: take ( & mut self . added_delete_files ) ;
330
+ let added_delete_files = std:: mem:: take ( & mut self . added_delete_files ) ;
262
331
if !data_files. is_empty ( ) {
263
332
let added_manifest = self . write_added_manifest ( data_files) . await ?;
264
333
manifest_files. push ( added_manifest) ;
265
334
}
266
- if !delete_files. is_empty ( ) {
267
- let added_delete_manifest = self . write_added_manifest ( delete_files) . await ?;
335
+
336
+ if !added_delete_files. is_empty ( ) {
337
+ let added_delete_manifest = self . write_added_manifest ( added_delete_files) . await ?;
268
338
manifest_files. push ( added_delete_manifest) ;
269
339
}
340
+
341
+ let delete_manifests = self
342
+ . write_delete_manifest ( snapshot_produce_operation. delete_entries ( self ) . await ?)
343
+ . await ?;
344
+ manifest_files. extend ( delete_manifests) ;
345
+
270
346
let existing_manifests = snapshot_produce_operation. existing_manifest ( self ) . await ?;
271
347
272
348
manifest_files. extend ( existing_manifests) ;
@@ -505,7 +581,7 @@ impl MergeManifestManager {
505
581
Box < dyn Future < Output = Result < Vec < ManifestFile > > > + Send > ,
506
582
> )
507
583
} else {
508
- let writer = snapshot_produce. new_manifest_writer ( & self . content ) ?;
584
+ let writer = snapshot_produce. new_manifest_writer ( & self . content , snapshot_produce . tx . current_table . metadata ( ) . default_partition_spec_id ( ) ) ?;
509
585
let snapshot_id = snapshot_produce. snapshot_id ;
510
586
let file_io = snapshot_produce. tx . current_table . file_io ( ) . clone ( ) ;
511
587
Ok ( ( Box :: pin ( async move {
0 commit comments