Skip to content

Commit d696c83

Browse files
committed
Code complete
1 parent 1cd8b96 commit d696c83

File tree

5 files changed

+86
-47
lines changed

5 files changed

+86
-47
lines changed

crates/iceberg/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ define_from_err!(
325325
"Failed to convert decimal literal to rust decimal"
326326
);
327327

328+
define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
329+
328330
/// Helper macro to check arguments.
329331
///
330332
///

crates/iceberg/src/spec/manifest.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,14 @@ use super::{
2424
};
2525
use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER};
2626
use crate::error::Result;
27-
use crate::io::{InputFile, OutputFile};
27+
use crate::io::OutputFile;
2828
use crate::spec::PartitionField;
2929
use crate::{Error, ErrorKind};
3030
use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
3131
use futures::AsyncWriteExt;
3232
use serde_json::to_vec;
3333
use std::cmp::min;
3434
use std::collections::HashMap;
35-
use std::io::Read;
3635
use std::str::FromStr;
3736
use std::sync::Arc;
3837

@@ -274,14 +273,16 @@ impl ManifestWriter {
274273
self.update_field_summary(&entry);
275274

276275
let value = match manifest.metadata.format_version {
277-
FormatVersion::V1 => {
278-
to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)?
279-
.resolve(&avro_schema)?
280-
}
281-
FormatVersion::V2 => {
282-
to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)?
283-
.resolve(&avro_schema)?
284-
}
276+
FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from(
277+
(*entry).clone(),
278+
&partition_type,
279+
)?)?
280+
.resolve(&avro_schema)?,
281+
FormatVersion::V2 => to_value(_serde::ManifestEntryV2::try_from(
282+
(*entry).clone(),
283+
&partition_type,
284+
)?)?
285+
.resolve(&avro_schema)?,
285286
};
286287

287288
avro_writer.append(value)?;
@@ -803,6 +804,7 @@ impl ManifestMetadata {
803804
}
804805
}
805806

807+
/// Reference to [`ManifestEntry`].
806808
pub type ManifestEntryRef = Arc<ManifestEntry>;
807809

808810
/// A manifest is an immutable Avro file that lists data files or delete
@@ -1455,7 +1457,7 @@ mod tests {
14551457
format_version: FormatVersion::V2,
14561458
},
14571459
entries: vec![
1458-
ManifestEntry {
1460+
Arc::new(ManifestEntry {
14591461
status: ManifestStatus::Added,
14601462
snapshot_id: None,
14611463
sequence_number: None,
@@ -1478,7 +1480,7 @@ mod tests {
14781480
equality_ids: Vec::new(),
14791481
sort_order_id: None,
14801482
}
1481-
}
1483+
})
14821484
]
14831485
};
14841486

@@ -1575,7 +1577,7 @@ mod tests {
15751577
content: ManifestContentType::Data,
15761578
format_version: FormatVersion::V2,
15771579
},
1578-
entries: vec![ManifestEntry {
1580+
entries: vec![Arc::new(ManifestEntry {
15791581
status: ManifestStatus::Added,
15801582
snapshot_id: None,
15811583
sequence_number: None,
@@ -1640,7 +1642,7 @@ mod tests {
16401642
equality_ids: vec![],
16411643
sort_order_id: None,
16421644
},
1643-
}],
1645+
})],
16441646
};
16451647

16461648
let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
@@ -1684,7 +1686,7 @@ mod tests {
16841686
content: ManifestContentType::Data,
16851687
format_version: FormatVersion::V1,
16861688
},
1687-
entries: vec![ManifestEntry {
1689+
entries: vec![Arc::new(ManifestEntry {
16881690
status: ManifestStatus::Added,
16891691
snapshot_id: Some(0),
16901692
sequence_number: None,
@@ -1707,7 +1709,7 @@ mod tests {
17071709
equality_ids: vec![],
17081710
sort_order_id: Some(0),
17091711
}
1710-
}],
1712+
})],
17111713
};
17121714

17131715
let writer =
@@ -1754,7 +1756,7 @@ mod tests {
17541756
format_version: FormatVersion::V1,
17551757
},
17561758
entries: vec![
1757-
ManifestEntry {
1759+
Arc::new(ManifestEntry {
17581760
status: ManifestStatus::Added,
17591761
snapshot_id: Some(0),
17601762
sequence_number: None,
@@ -1795,7 +1797,7 @@ mod tests {
17951797
equality_ids: vec![],
17961798
sort_order_id: Some(0),
17971799
},
1798-
}
1800+
})
17991801
]
18001802
};
18011803

crates/iceberg/src/spec/manifest_list.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use self::{
2929
_serde::{ManifestListEntryV1, ManifestListEntryV2},
3030
};
3131

32-
use super::{FormatVersion, Manifest, ManifestEntry, StructType};
32+
use super::{FormatVersion, Manifest, StructType};
3333
use crate::error::Result;
3434

3535
/// Placeholder for sequence number. The field with this value must be replaced with the actual sequence number before it write.
@@ -60,16 +60,16 @@ impl ManifestList {
6060
bs: &[u8],
6161
version: FormatVersion,
6262
partition_type: &StructType,
63-
) -> Result<ManifestList, Error> {
63+
) -> Result<ManifestList> {
6464
match version {
6565
FormatVersion::V1 => {
6666
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
67-
let values = Value::Array(reader.collect::<Result<Vec<Value>, _>>()?);
67+
let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
6868
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type)
6969
}
7070
FormatVersion::V2 => {
7171
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?;
72-
let values = Value::Array(reader.collect::<Result<Vec<Value>, _>>()?);
72+
let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
7373
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type)
7474
}
7575
}
@@ -169,7 +169,7 @@ impl ManifestListWriter {
169169
pub fn add_manifest_entries(
170170
&mut self,
171171
manifest_entries: impl Iterator<Item = ManifestListEntry>,
172-
) -> Result<(), Error> {
172+
) -> Result<()> {
173173
match self.format_version {
174174
FormatVersion::V1 => {
175175
for manifest_entry in manifest_entries {
@@ -212,7 +212,7 @@ impl ManifestListWriter {
212212
}
213213

214214
/// Write the manifest list to the output file.
215-
pub async fn close(self) -> Result<(), Error> {
215+
pub async fn close(self) -> Result<()> {
216216
let data = self.avro_writer.into_inner()?;
217217
let mut writer = self.output_file.writer().await?;
218218
writer.write_all(&data).await.unwrap();
@@ -591,7 +591,7 @@ pub enum ManifestContentType {
591591
impl FromStr for ManifestContentType {
592592
type Err = Error;
593593

594-
fn from_str(s: &str) -> Result<Self, Error> {
594+
fn from_str(s: &str) -> Result<Self> {
595595
match s {
596596
"data" => Ok(ManifestContentType::Data),
597597
"deletes" => Ok(ManifestContentType::Deletes),
@@ -615,7 +615,7 @@ impl ToString for ManifestContentType {
615615
impl TryFrom<i32> for ManifestContentType {
616616
type Error = Error;
617617

618-
fn try_from(value: i32) -> Result<Self, Self::Error> {
618+
fn try_from(value: i32) -> std::result::Result<Self, Self::Error> {
619619
match value {
620620
0 => Ok(ManifestContentType::Data),
621621
1 => Ok(ManifestContentType::Deletes),
@@ -637,16 +637,17 @@ impl ManifestListEntry {
637637
pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
638638
let mut avro = Vec::new();
639639
file_io
640-
.new_input(&self.manifest_path)
640+
.new_input(&self.manifest_path)?
641641
.reader()
642+
.await?
642643
.read_to_end(&mut avro)
643644
.await?;
644645

645-
let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro).await?;
646+
let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?;
646647

647648
// Let entries inherit values from the manifest list entry.
648649
for entry in &mut entries {
649-
entry.inherit_data(&self);
650+
entry.inherit_data(self);
650651
}
651652

652653
Ok(Manifest::new(metadata, entries))

crates/iceberg/src/spec/snapshot.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
use crate::error::Result;
2222
use chrono::{DateTime, TimeZone, Utc};
2323
use futures::AsyncReadExt;
24-
use itertools::Format;
2524
use serde::{Deserialize, Serialize};
2625
use std::collections::HashMap;
2726
use std::sync::Arc;
2827
use typed_builder::TypedBuilder;
2928

3029
use super::table_metadata::SnapshotLog;
3130
use crate::io::FileIO;
32-
use crate::spec::{FormatVersion, ManifestList, SchemaId, TableMetadata};
31+
use crate::spec::{ManifestList, SchemaId, TableMetadata};
3332
use crate::{Error, ErrorKind};
3433
use _serde::SnapshotV2;
3534

@@ -144,15 +143,16 @@ impl Snapshot {
144143
) -> Result<ManifestList> {
145144
match &self.manifest_list {
146145
ManifestListLocation::ManifestListFile(file) => {
147-
let mut manifest_list_content= Vec::with_capacity(1024);
146+
let mut manifest_list_content= Vec::new();
148147
file_io
149148
.new_input(file)?
149+
.reader().await?
150150
.read_to_end(&mut manifest_list_content)
151151
.await?;
152152

153153
Ok(ManifestList::parse_with_version(&manifest_list_content, table_metadata.format_version(), table_metadata.default_partition_spec().unwrap().partition_type(table_metadata.current_schema()).as_ref().unwrap())?)
154154
}
155-
ManifestListLocation::ManifestFiles(files) => Err(Error::new(
155+
ManifestListLocation::ManifestFiles(_) => Err(Error::new(
156156
ErrorKind::FeatureUnsupported,
157157
"Loading manifests from `manifests` is currently not supported, we only support loading from `manifest-list` file, see https://iceberg.apache.org/spec/#snapshots for more information.",
158158
)),
@@ -176,6 +176,7 @@ pub(super) mod _serde {
176176

177177
use serde::{Deserialize, Serialize};
178178

179+
use crate::spec::SchemaId;
179180
use crate::{Error, ErrorKind};
180181

181182
use super::{ManifestListLocation, Operation, Snapshot, Summary};
@@ -192,7 +193,7 @@ pub(super) mod _serde {
192193
pub manifest_list: String,
193194
pub summary: Summary,
194195
#[serde(skip_serializing_if = "Option::is_none")]
195-
pub schema_id: Option<i64>,
196+
pub schema_id: Option<SchemaId>,
196197
}
197198

198199
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
@@ -210,7 +211,7 @@ pub(super) mod _serde {
210211
#[serde(skip_serializing_if = "Option::is_none")]
211212
pub summary: Option<Summary>,
212213
#[serde(skip_serializing_if = "Option::is_none")]
213-
pub schema_id: Option<i64>,
214+
pub schema_id: Option<SchemaId>,
214215
}
215216

216217
impl From<SnapshotV2> for Snapshot {

0 commit comments

Comments
 (0)