
Commit 75372e6

jonathanc-n authored and Li0k committed
refactor: Split transaction module (apache#1080)
- Closes apache#980. Split the transaction module.
1 parent d0fd9be commit 75372e6

File tree

4 files changed: +1140 −0 lines changed

Lines changed: 373 additions & 0 deletions
@@ -0,0 +1,373 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::{HashMap, HashSet};

use arrow_array::StringArray;
use futures::TryStreamExt;
use uuid::Uuid;

use crate::error::Result;
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
use crate::transaction::snapshot::{
    DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
};
use crate::transaction::Transaction;
use crate::writer::file_writer::ParquetWriter;
use crate::{Error, ErrorKind};

/// FastAppendAction is a transaction action for fast-appending data files to the table.
pub struct FastAppendAction<'a> {
    snapshot_produce_action: SnapshotProduceAction<'a>,
    check_duplicate: bool,
}

impl<'a> FastAppendAction<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        tx: Transaction<'a>,
        snapshot_id: i64,
        commit_uuid: Uuid,
        key_metadata: Vec<u8>,
        snapshot_properties: HashMap<String, String>,
    ) -> Result<Self> {
        Ok(Self {
            snapshot_produce_action: SnapshotProduceAction::new(
                tx,
                snapshot_id,
                key_metadata,
                commit_uuid,
                snapshot_properties,
            )?,
            check_duplicate: true,
        })
    }

    /// Set whether to check duplicate files
    pub fn with_check_duplicate(mut self, v: bool) -> Self {
        self.check_duplicate = v;
        self
    }

    /// Add data files to the snapshot.
    pub fn add_data_files(
        &mut self,
        data_files: impl IntoIterator<Item = DataFile>,
    ) -> Result<&mut Self> {
        self.snapshot_produce_action.add_data_files(data_files)?;
        Ok(self)
    }

    /// Adds existing parquet files
    #[allow(dead_code)]
    async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction<'a>> {
        if !self
            .snapshot_produce_action
            .tx
            .table
            .metadata()
            .default_spec
            .is_unpartitioned()
        {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Appending to partitioned tables is not supported",
            ));
        }

        let table_metadata = self.snapshot_produce_action.tx.table.metadata();

        let data_files = ParquetWriter::parquet_files_to_data_files(
            self.snapshot_produce_action.tx.table.file_io(),
            file_path,
            table_metadata,
        )
        .await?;

        self.add_data_files(data_files)?;

        self.apply().await
    }

    /// Finish building the action and apply it to the transaction.
    pub async fn apply(self) -> Result<Transaction<'a>> {
        // Check for duplicate files
        if self.check_duplicate {
            let new_files: HashSet<&str> = self
                .snapshot_produce_action
                .added_data_files
                .iter()
                .map(|df| df.file_path.as_str())
                .collect();

            let mut manifest_stream = self
                .snapshot_produce_action
                .tx
                .table
                .inspect()
                .manifests()
                .scan()
                .await?;
            let mut referenced_files = Vec::new();

            while let Some(batch) = manifest_stream.try_next().await? {
                let file_path_array = batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        Error::new(
                            ErrorKind::DataInvalid,
                            "Failed to downcast file_path column to StringArray",
                        )
                    })?;

                for i in 0..batch.num_rows() {
                    let file_path = file_path_array.value(i);
                    if new_files.contains(file_path) {
                        referenced_files.push(file_path.to_string());
                    }
                }
            }

            if !referenced_files.is_empty() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add files that are already referenced by table, files: {}",
                        referenced_files.join(", ")
                    ),
                ));
            }
        }

        self.snapshot_produce_action
            .apply(FastAppendOperation, DefaultManifestProcess)
            .await
    }
}

struct FastAppendOperation;

impl SnapshotProduceOperation for FastAppendOperation {
    fn operation(&self) -> Operation {
        Operation::Append
    }

    async fn delete_entries(
        &self,
        _snapshot_produce: &SnapshotProduceAction<'_>,
    ) -> Result<Vec<ManifestEntry>> {
        Ok(vec![])
    }

    async fn existing_manifest(
        &self,
        snapshot_produce: &SnapshotProduceAction<'_>,
    ) -> Result<Vec<ManifestFile>> {
        let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
            return Ok(vec![]);
        };

        let manifest_list = snapshot
            .load_manifest_list(
                snapshot_produce.tx.table.file_io(),
                &snapshot_produce.tx.table.metadata_ref(),
            )
            .await?;

        Ok(manifest_list
            .entries()
            .iter()
            .filter(|entry| entry.has_added_files() || entry.has_existing_files())
            .cloned()
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use crate::scan::tests::TableTestFixture;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, MAIN_BRANCH,
    };
    use crate::transaction::tests::make_v2_minimal_table;
    use crate::transaction::Transaction;
    use crate::{TableRequirement, TableUpdate};

    #[tokio::test]
    async fn test_fast_append_action() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);
        let mut action = tx.fast_append(None, vec![]).unwrap();

        // check add data file with incompatible partition value
        let data_file = DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path("test/3.parquet".to_string())
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(1)
            .partition(Struct::from_iter([Some(Literal::string("test"))]))
            .build()
            .unwrap();
        assert!(action.add_data_files(vec![data_file.clone()]).is_err());

        let data_file = DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path("test/3.parquet".to_string())
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(1)
            .partition(Struct::from_iter([Some(Literal::long(300))]))
            .build()
            .unwrap();
        action.add_data_files(vec![data_file.clone()]).unwrap();
        let tx = action.apply().await.unwrap();

        // check updates and requirements
        assert!(
            matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH)
        );
        assert_eq!(
            vec![
                TableRequirement::UuidMatch {
                    uuid: tx.table.metadata().uuid()
                },
                TableRequirement::RefSnapshotIdMatch {
                    r#ref: MAIN_BRANCH.to_string(),
                    snapshot_id: tx.table.metadata().current_snapshot_id
                }
            ],
            tx.requirements
        );

        // check manifest list
        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] {
            snapshot
        } else {
            unreachable!()
        };
        let manifest_list = new_snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(1, manifest_list.entries().len());
        assert_eq!(
            manifest_list.entries()[0].sequence_number,
            new_snapshot.sequence_number()
        );

        // check manifest
        let manifest = manifest_list.entries()[0]
            .load_manifest(table.file_io())
            .await
            .unwrap();
        assert_eq!(1, manifest.entries().len());
        assert_eq!(
            new_snapshot.sequence_number(),
            manifest.entries()[0]
                .sequence_number()
                .expect("Inherit sequence number by load manifest")
        );

        assert_eq!(
            new_snapshot.snapshot_id(),
            manifest.entries()[0].snapshot_id().unwrap()
        );
        assert_eq!(data_file, *manifest.entries()[0].data_file());
    }

    #[tokio::test]
    async fn test_add_existing_parquet_files_to_unpartitioned_table() {
        let mut fixture = TableTestFixture::new_unpartitioned();
        fixture.setup_unpartitioned_manifest_files().await;
        let tx = crate::transaction::Transaction::new(&fixture.table);

        let file_paths = vec![
            format!("{}/1.parquet", &fixture.table_location),
            format!("{}/2.parquet", &fixture.table_location),
            format!("{}/3.parquet", &fixture.table_location),
        ];

        let fast_append_action = tx.fast_append(None, vec![]).unwrap();

        // Attempt to add the existing Parquet files with fast append.
        let new_tx = fast_append_action
            .add_parquet_files(file_paths.clone())
            .await
            .expect("Adding existing Parquet files should succeed");

        let mut found_add_snapshot = false;
        let mut found_set_snapshot_ref = false;
        for update in new_tx.updates.iter() {
            match update {
                TableUpdate::AddSnapshot { .. } => {
                    found_add_snapshot = true;
                }
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    found_set_snapshot_ref = true;
                    assert_eq!(ref_name, MAIN_BRANCH);
                    assert!(reference.snapshot_id > 0);
                }
                _ => {}
            }
        }
        assert!(found_add_snapshot);
        assert!(found_set_snapshot_ref);

        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] {
            snapshot
        } else {
            panic!("Expected the first update to be an AddSnapshot update");
        };

        let manifest_list = new_snapshot
            .load_manifest_list(fixture.table.file_io(), fixture.table.metadata())
            .await
            .expect("Failed to load manifest list");

        assert_eq!(
            manifest_list.entries().len(),
            2,
            "Expected 2 manifest list entries, got {}",
            manifest_list.entries().len()
        );

        // Load the manifest from the manifest list
        let manifest = manifest_list.entries()[0]
            .load_manifest(fixture.table.file_io())
            .await
            .expect("Failed to load manifest");

        // Check that the manifest contains three entries.
        assert_eq!(manifest.entries().len(), 3);

        // Verify each file path appears in manifest.
        let manifest_paths: Vec<String> = manifest
            .entries()
            .iter()
            .map(|entry| entry.data_file().file_path.clone())
            .collect();
        for path in file_paths {
            assert!(manifest_paths.contains(&path));
        }
    }
}
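
For readers skimming the diff: a minimal sketch of how the fast-append path split out here is driven, mirroring test_fast_append_action above. make_v2_minimal_table and the DataFileBuilder-produced data_file are the test fixtures from this module, and an async context is assumed.

// Sketch only, based on the test above.
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let mut action = tx.fast_append(None, vec![]).unwrap();
action.add_data_files(vec![data_file]).unwrap(); // stage DataFile entries on the action
let tx = action.apply().await.unwrap(); // folds AddSnapshot + SetSnapshotRef updates back into the transaction

This is the flow the tests exercise: build the action from a Transaction, stage files, then apply to collect the snapshot-producing updates and requirements on the returned transaction.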

0 commit comments
