
Commit 803cd39

add position delete writer support
1 parent f3a571d commit 803cd39

File tree

crates/iceberg/src/writer/base_writer/mod.rs
crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs

2 files changed: +285 -0 lines changed


crates/iceberg/src/writer/base_writer/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -19,3 +19,4 @@
 
 pub mod data_file_writer;
 pub mod equality_delete_writer;
+pub mod position_delete_file_writer;
crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs

Lines changed: 284 additions & 0 deletions

@@ -0,0 +1,284 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Position delete file writer.
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use arrow_array::builder::{PrimitiveBuilder, StringBuilder};
use arrow_array::types::Int64Type;
use arrow_array::RecordBatch;
use once_cell::sync::Lazy;

use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataContentType, DataFile, NestedField, PrimitiveType, Schema, Struct, Type};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// The config for `PositionDeleteWriter`.
pub struct PositionDeleteWriterConfig {
    /// The partition value of the position delete file.
    pub partition_value: Struct,
}

impl PositionDeleteWriterConfig {
    /// Create a new `PositionDeleteWriterConfig`.
    pub fn new(partition_value: Option<Struct>) -> Self {
        Self {
            partition_value: partition_value.unwrap_or(Struct::empty()),
        }
    }
}

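// 2147483546 and 2147483545 are the field IDs that the Iceberg spec reserves
// for the `file_path` and `pos` columns of position delete files.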
static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {
    Schema::builder()
        .with_fields(vec![
            Arc::new(NestedField::required(
                2147483546,
                "file_path",
                Type::Primitive(PrimitiveType::String),
            )),
            Arc::new(NestedField::required(
                2147483545,
                "pos",
                Type::Primitive(PrimitiveType::Long),
            )),
        ])
        .build()
        .unwrap()
});

/// Position delete input.
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)]
pub struct PositionDeleteInput<'a> {
    /// The path of the data file.
    path: &'a str,
    /// The 0-based row position of the deleted row within that file.
    offset: i64,
}

impl<'a> PositionDeleteInput<'a> {
    /// Create a new `PositionDeleteInput`.
    pub fn new(path: &'a str, offset: i64) -> Self {
        PositionDeleteInput { path, offset }
    }
}

/// Builder for `PositionDeleteWriter`.
#[derive(Clone)]
pub struct PositionDeleteWriterBuilder<B: FileWriterBuilder> {
    inner: B,
}

impl<B: FileWriterBuilder> PositionDeleteWriterBuilder<B> {
    /// Create a new `PositionDeleteWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B) -> Self {
        Self { inner }
    }
}

#[async_trait::async_trait]
impl<'a, B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput<'a>>>
    for PositionDeleteWriterBuilder<B>
{
    type R = PositionDeleteWriter<B>;
    type C = PositionDeleteWriterConfig;

    async fn build(self, config: Self::C) -> Result<Self::R> {
        // Eagerly build the inner file writer, so failures surface here
        // rather than on the first write.
        Ok(PositionDeleteWriter {
            inner_writer: Some(self.inner.build().await?),
            partition_value: config.partition_value,
        })
    }
}

/// Position delete writer.
pub struct PositionDeleteWriter<B: FileWriterBuilder> {
    inner_writer: Option<B::R>,
    partition_value: Struct,
}

/// Implement `IcebergWriter` for `PositionDeleteWriter`.
impl<'a, B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput<'a>>>
    for PositionDeleteWriter<B>
{
    fn write<'life0, 'async_trait>(
        &'life0 mut self,
        input: Vec<PositionDeleteInput<'a>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // Construct a record batch from the input.
        let mut path_column_builder = StringBuilder::new();
        let mut offset_column_builder = PrimitiveBuilder::<Int64Type>::new();
        for input in input.into_iter() {
            path_column_builder.append_value(input.path);
            offset_column_builder.append_value(input.offset);
        }
        let record_batch = RecordBatch::try_new(
            Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap()),
            vec![
                Arc::new(path_column_builder.finish()),
                Arc::new(offset_column_builder.finish()),
            ],
        )
        .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()));

        Box::pin(async move {
            if let Some(inner_writer) = &mut self.inner_writer {
                inner_writer.write(&record_batch?).await?;
            } else {
                return Err(Error::new(ErrorKind::Unexpected, "writer has been closed"));
            }
            Ok(())
        })
    }
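    // Note: the Iceberg spec requires rows in a position delete file to be
    // sorted by file_path, then position. This writer emits inputs in the
    // order given and does not sort them itself.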
    fn close<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<DataFile>>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        Box::pin(async move {
            let writer = self.inner_writer.take().unwrap();
            Ok(writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
                    res.content(DataContentType::PositionDeletes);
                    res.partition(self.partition_value.clone());
                    res.build().expect("Guaranteed to be valid")
                })
                .collect())
        })
    }
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::{Int64Array, StringArray};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use crate::io::FileIOBuilder;
    use crate::spec::{DataContentType, DataFileFormat, Struct};
    use crate::writer::base_writer::position_delete_file_writer::{
        PositionDeleteInput, PositionDeleteWriterBuilder, PositionDeleteWriterConfig,
        POSITION_DELETE_SCHEMA,
    };
    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
    use crate::Result;

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new("memory").build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let pw = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(POSITION_DELETE_SCHEMA.clone()),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut position_delete_writer = PositionDeleteWriterBuilder::new(pw)
            .build(PositionDeleteWriterConfig::new(None))
            .await?;

        // Write some position delete inputs.
        let inputs: Vec<PositionDeleteInput> = vec![
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 2,
            },
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 1,
            },
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 3,
            },
            PositionDeleteInput {
                path: "file3.parquet",
                offset: 2,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 5,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 4,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 1,
            },
        ];
        position_delete_writer.write(inputs.clone()).await?;

        let data_files = position_delete_writer.close().await.unwrap();
        assert_eq!(data_files.len(), 1);
        assert_eq!(data_files[0].file_format, DataFileFormat::Parquet);
        assert_eq!(data_files[0].content, DataContentType::PositionDeletes);
        assert_eq!(data_files[0].partition, Struct::empty());

        let parquet_file = file_io
            .new_input(&data_files[0].file_path)?
            .read()
            .await
            .unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap();
        let reader = builder.build().unwrap();
        let batches = reader.map(|x| x.unwrap()).collect::<Vec<_>>();

        let path_column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let offset_column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();

        // The writer preserves input order, so read-back rows match 1:1.
        for (i, input) in inputs.iter().enumerate() {
            assert_eq!(path_column.value(i), input.path);
            assert_eq!(offset_column.value(i), input.offset);
        }

        Ok(())
    }
}
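For reference, a minimal usage sketch of the new writer; `pw` stands in for any configured `FileWriterBuilder`, such as the `ParquetWriterBuilder` assembled in the test above:

    let mut writer = PositionDeleteWriterBuilder::new(pw)
        .build(PositionDeleteWriterConfig::new(None))
        .await?;
    writer
        .write(vec![PositionDeleteInput::new("file1.parquet", 0)])
        .await?;
    // close() returns the finished delete files, marked as position deletes.
    let delete_files = writer.close().await?;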
