From 7249542417a63f333d1d98bc643524fa3d0f41dc Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 13 May 2025 02:25:33 +0200 Subject: [PATCH 1/4] WIP --- bindings/python/Cargo.lock | 5 +- bindings/python/Cargo.toml | 1 + bindings/python/poetry.lock | 7 + bindings/python/pyproject.toml | 2 +- bindings/python/src/lib.rs | 2 + bindings/python/src/manifest.rs | 410 +++++++++++++++++++++++ crates/iceberg/src/spec/datatypes.rs | 15 + crates/iceberg/src/spec/manifest_list.rs | 10 - 8 files changed, 439 insertions(+), 13 deletions(-) create mode 100644 bindings/python/poetry.lock create mode 100644 bindings/python/src/manifest.rs diff --git a/bindings/python/Cargo.lock b/bindings/python/Cargo.lock index b9250311a..ce61c134d 100644 --- a/bindings/python/Cargo.lock +++ b/bindings/python/Cargo.lock @@ -1312,7 +1312,6 @@ dependencies = [ "arrow-string", "async-trait", "bimap", - "bitvec", "bytes", "chrono", "derive_builder", @@ -2187,6 +2186,7 @@ dependencies = [ "arrow", "iceberg", "pyo3", + "serde_json", ] [[package]] @@ -2568,7 +2568,8 @@ checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" [[package]] name = "roaring" version = "0.10.12" -source = "git+https://github.com/RoaringBitmap/roaring-rs.git#9496afe1e6ad93a8875f901c2579761f7acdc5b0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e8d2cfa184d94d0726d650a9f4a1be7f9b76ac9fdb954219878dc00c1c1e7b" dependencies = [ "bytemuck", "byteorder", diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 32550eaae..e28ed4055 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -34,3 +34,4 @@ crate-type = ["cdylib"] arrow = { version = "54.1.0", features = ["pyarrow", "chrono-tz"] } iceberg = { path = "../../crates/iceberg" } pyo3 = { version = "0.23.3", features = ["extension-module", "abi3-py39"] } +serde_json = "1.0.138" diff --git a/bindings/python/poetry.lock b/bindings/python/poetry.lock new file mode 100644 index 000000000..b47f473d4 --- /dev/null +++ b/bindings/python/poetry.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +package = [] + +[metadata] +lock-version = "2.1" +python-versions = "~=3.9" +content-hash = "1f82eab15b06f4ff12a4f352b757976e812ed1f4ee7485fc0afc331e33ea981b" diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index 1ef9d2750..813b5bb83 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -33,7 +33,7 @@ classifiers = [ name = "pyiceberg-core" readme = "project-description.md" requires-python = "~=3.9" -version = "0.4.0" +version = "0.22123123.0" [tool.maturin] features = ["pyo3/extension-module"] diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index a16bdac4d..f162300e0 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -19,9 +19,11 @@ use pyo3::prelude::*; mod error; mod transform; +mod manifest; #[pymodule] fn pyiceberg_core_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { transform::register_module(py, m)?; + manifest::register_module(py, m)?; Ok(()) } diff --git a/bindings/python/src/manifest.rs b/bindings/python/src/manifest.rs new file mode 100644 index 000000000..0a0e8fc89 --- /dev/null +++ b/bindings/python/src/manifest.rs @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iceberg::spec::{DataContentType, DataFile, DataFileFormat, FieldSummary, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestFile, ManifestList, ManifestStatus, PrimitiveLiteral, StructType}; +use pyo3::prelude::*; +use std::collections::HashMap; +use std::sync::Arc; +use pyo3::types::PyAny; +use iceberg::Error; +use iceberg::ErrorKind; +#[pyclass] +pub struct PyLiteral { + inner: Literal, +} + + +#[pyclass] +pub struct PyPrimitiveLiteral { + inner: PrimitiveLiteral, +} + + +#[pyclass] +pub struct PyDataFile { + inner: DataFile, +} + +#[pymethods] +impl PyDataFile { + + #[getter] + fn content(&self) -> i32 { + // TODO: Maps the enum back to the int + // I have a feeling this can be done more intelligently + match self.inner.content_type() { + DataContentType::Data => 0, + DataContentType::PositionDeletes => 1, + DataContentType::EqualityDeletes => 2, + } + } + + #[getter] + fn file_path(&self) -> &str { + self.inner.file_path() + } + + #[getter] + fn file_format(&self) -> &str { + // TODO: Maps the enum back to the int + // I have a feeling this can be done more intelligently + match self.inner.file_format() { + DataFileFormat::Avro => "avro", + DataFileFormat::Orc => "orc", + DataFileFormat::Parquet => "parquet", + } + } + + #[getter] + fn partition(&self) -> Vec> { + self.inner.partition().fields().iter().map(|p| match p { + Some(lit) => Some(PyLiteral { inner: lit.clone() }), + _ => None + } ).collect() + } + + #[getter] + fn record_count(&self) -> u64 { + self.inner.record_count() + } + + #[getter] + fn file_size_in_bytes(&self) -> u64 { + self.inner.file_size_in_bytes() + } + + #[getter] + fn column_sizes(&self) -> &HashMap { + self.inner.column_sizes() + } + + #[getter] + fn value_counts(&self) -> &HashMap { + self.inner.value_counts() + } + + #[getter] + fn null_value_counts(&self) -> &HashMap { + self.inner.null_value_counts() + } + + #[getter] + fn nan_value_counts(&self) -> &HashMap { + self.inner.nan_value_counts() + } + + #[getter] + fn upper_bounds(&self) -> HashMap> { + self.inner.upper_bounds().into_iter().map(|(k, v)| (k.clone(), v.to_bytes().unwrap().to_vec())).collect() + } + + #[getter] + fn lower_bounds(&self) -> HashMap> { + self.inner.lower_bounds().into_iter().map(|(k, v)| (k.clone(), v.to_bytes().unwrap().to_vec())).collect() + } + + #[getter] + fn key_metadata(&self) -> Option<&[u8]> { + self.inner.key_metadata() + } + + #[getter] + fn split_offsets(&self) -> &[i64] { + self.inner.split_offsets() + } + + #[getter] + fn equality_ids(&self) -> &[i32] { + self.inner.equality_ids() + } + + #[getter] + fn sort_order_id(&self) -> Option { + self.inner.sort_order_id() + } + +} + +#[pyclass] +pub struct PyManifest { + inner: Manifest, +} + + +#[pymethods] +impl PyManifest { + fn entries(&self) -> Vec { + // TODO: Most of the time, we're only interested in 'alive' entries, + // that are the ones that are either ADDED or EXISTING + // so we can add a boolean to skip the DELETED entries right away before + // moving it into the Python world + self.inner.entries().iter().map(|entry| PyManifestEntry { inner: entry.clone() }).collect() + } +} + + +#[pyclass] +pub struct PyFieldSummary { + inner: FieldSummary, +} + + +#[pymethods] +impl crate::manifest::PyFieldSummary { + + #[getter] + fn contains_null(&self) -> bool { + self.inner.contains_null + } + + #[getter] + fn contains_nan(&self) -> Option { + self.inner.contains_nan + } + + #[getter] + fn lower_bound(&self) -> Option { + self.inner.lower_bound.clone().map(|v| PyPrimitiveLiteral{ inner: v.literal().clone() }) + } + + #[getter] + fn upper_bound(&self) -> Option { + self.inner.upper_bound.clone().map(|v| PyPrimitiveLiteral{ inner: v.literal().clone() }) + } + + + +} + +#[pyclass] +pub struct PyManifestFile { + inner: ManifestFile, +} + + +#[pymethods] +impl crate::manifest::PyManifestFile { + #[getter] + fn manifest_path(&self) -> &str { + self.inner.manifest_path.as_str() + } + #[getter] + fn manifest_length(&self) -> i64 { + self.inner.manifest_length + } + #[getter] + fn partition_spec_id(&self) -> i32 { + self.inner.partition_spec_id + } + + #[getter] + fn content(&self) -> i32 { + // TODO: Maps the enum back to the int + // I have a feeling this can be done more intelligently + match self.inner.content { + ManifestContentType::Data => 0, + ManifestContentType::Deletes => 1, + } + } + + #[getter] + fn sequence_number(&self) -> i64 { + self.inner.sequence_number + } + + + #[getter] + fn min_sequence_number(&self) -> i64 { + self.inner.min_sequence_number + } + + #[getter] + fn added_snapshot_id(&self) -> i64 { + self.inner.added_snapshot_id + } + + + #[getter] + fn added_files_count(&self) -> Option { + self.inner.added_files_count + } + + #[getter] + fn existing_files_count(&self) -> Option { + self.inner.existing_files_count + } + + #[getter] + fn deleted_files_count(&self) -> Option { + self.inner.deleted_files_count + } + + #[getter] + fn added_rows_count(&self) -> Option { + self.inner.added_rows_count + } + + #[getter] + fn existing_rows_count(&self) -> Option { + self.inner.existing_rows_count + } + + #[getter] + fn deleted_rows_count(&self) -> Option { + self.inner.deleted_rows_count + } + + #[getter] + fn partitions(&self) -> Vec { + self.inner.partitions.iter().map(|s| PyFieldSummary { + inner: s.clone() + }).collect() + } + + #[getter] + fn key_metadata(&self) -> Vec { + self.inner.key_metadata.clone() + } + +} + +#[pyclass] +pub struct PyManifestEntry { + inner: Arc, +} + +#[pymethods] +impl PyManifestEntry { + + #[getter] + fn status(&self) -> i32 { + // TODO: Maps the enum back to the int + // I have a feeling this can be done more intelligently + match self.inner.status { + ManifestStatus::Existing => 0, + ManifestStatus::Added => 1, + ManifestStatus::Deleted => 2, + } + } + + #[getter] + fn snapshot_id(&self) -> Option { + self.inner.snapshot_id + } + + #[getter] + fn sequence_number(&self) -> Option { + self.inner.sequence_number + } + + #[getter] + fn file_sequence_number(&self) -> Option { + self.inner.file_sequence_number + } + + #[getter] + fn data_file(&self) -> PyDataFile { + PyDataFile { + inner: self.inner.data_file.clone() + } + } +} + + +#[pyfunction] +pub fn read_manifest_entries(bs: &[u8]) -> PyManifest { + // TODO: Some error handling + PyManifest { + inner: Manifest::parse_avro(bs).unwrap() + } +} + +#[pyclass] +// #[derive(Clone)] +pub struct PartitionSpecProviderCallbackHolder { + callback: Py, +} + +#[pymethods] +impl PartitionSpecProviderCallbackHolder { + #[new] + fn new(callback: Py) -> Self { + Self { callback } + } + + fn trigger_from_python(&self, id: i32) -> PyResult { + Self::do_the_callback(self, id) + } +} + +impl PartitionSpecProviderCallbackHolder { + /// Simulate calling the Python callback from "pure Rust" + pub fn do_the_callback(&self, id: i32) -> PyResult { + Python::with_gil(|py| { + let result = self.callback.call1(py, (id,))?; // Call the Python function + let string = result.extract::(py)?; // Try converting the result to a Rust String + Ok(string) + }) + } +} + + +#[pyclass] +pub struct PyManifestList { + inner: ManifestList, +} + + +#[pymethods] +impl crate::manifest::PyManifestList { + fn entries(&self) -> Vec { + self.inner.entries().iter().map(|file| PyManifestFile { inner: file.clone() }).collect() + } +} + + +#[pyfunction] +pub fn read_manifest_list(bs: &[u8], cb: &PartitionSpecProviderCallbackHolder) -> PyManifestList { + // TODO: Some error handling + let provider = move |_id| { + let bound = cb.do_the_callback(_id).unwrap(); + let json = bound.as_str(); + serde_json::from_str::(json).map_err(|_|{ + Error::new( + ErrorKind::DataInvalid, + format!("Invalid JSON: {}", json), + ) + }).map(|v|Some(v)) + }; + + PyManifestList { + inner: ManifestList::parse_with_version(bs, FormatVersion::V2, provider).unwrap() + } +} + +pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { + let this = PyModule::new(py, "manifest")?; + + this.add_function(wrap_pyfunction!(read_manifest_entries, &this)?)?; + this.add_function(wrap_pyfunction!(read_manifest_list, &this)?)?; + this.add_class::()?; + + m.add_submodule(&this)?; + py.import("sys")? + .getattr("modules")? + .set_item("pyiceberg_core.manifest", this) +} diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 5b9ca6c33..a8f2b2d1b 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -942,6 +942,21 @@ mod tests { ) } + #[test] + fn struct_type_empty() { + let record = "{\"type\":\"struct\",\"fields\":[]}"; + + check_type_serde( + record, + Type::Struct(StructType { + fields: vec![], + id_lookup: OnceLock::default(), + name_lookup: OnceLock::default(), + }), + ) + } + + #[test] fn struct_type() { let record = r#" diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 8cf5df8dd..e2d8b29dc 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -883,16 +883,6 @@ pub(super) mod _serde { if let Some(partitions) = partitions { if let Some(partition_type) = partition_type { let partition_types = partition_type.fields(); - if partitions.len() != partition_types.len() { - return Err(Error::new( - crate::ErrorKind::DataInvalid, - format!( - "Invalid partition spec. Expected {} fields, got {}", - partition_types.len(), - partitions.len() - ), - )); - } partitions .into_iter() .zip(partition_types) From cff3d2b2156008112b1f6cd4570fad8dfa23dc98 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 14 May 2025 15:33:05 +0200 Subject: [PATCH 2/4] Expose Avro parsers in Python --- bindings/python/pyproject.toml | 2 +- bindings/python/src/manifest.rs | 24 +++++++++++++++++------- crates/iceberg/src/spec/datatypes.rs | 15 --------------- crates/iceberg/src/spec/manifest_list.rs | 10 ++++++++++ 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index 813b5bb83..1ef9d2750 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -33,7 +33,7 @@ classifiers = [ name = "pyiceberg-core" readme = "project-description.md" requires-python = "~=3.9" -version = "0.22123123.0" +version = "0.4.0" [tool.maturin] features = ["pyo3/extension-module"] diff --git a/bindings/python/src/manifest.rs b/bindings/python/src/manifest.rs index 0a0e8fc89..076624720 100644 --- a/bindings/python/src/manifest.rs +++ b/bindings/python/src/manifest.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use iceberg::spec::{DataContentType, DataFile, DataFileFormat, FieldSummary, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestFile, ManifestList, ManifestStatus, PrimitiveLiteral, StructType}; +use iceberg::spec::{DataContentType, DataFile, DataFileFormat, FieldSummary, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestFile, ManifestList, ManifestStatus, PrimitiveLiteral, StructType, Type}; use pyo3::prelude::*; use std::collections::HashMap; use std::sync::Arc; use pyo3::types::PyAny; -use iceberg::Error; -use iceberg::ErrorKind; +use iceberg::{Error, ErrorKind}; + #[pyclass] pub struct PyLiteral { inner: Literal, @@ -66,6 +66,7 @@ impl PyDataFile { DataFileFormat::Avro => "avro", DataFileFormat::Orc => "orc", DataFileFormat::Parquet => "parquet", + DataFileFormat::Puffin => "puffin", } } @@ -383,12 +384,21 @@ pub fn read_manifest_list(bs: &[u8], cb: &PartitionSpecProviderCallbackHolder) - let provider = move |_id| { let bound = cb.do_the_callback(_id).unwrap(); let json = bound.as_str(); - serde_json::from_str::(json).map_err(|_|{ - Error::new( + + // I don't fully comprehend the deserializer here, + // it works for a Type, but not for a StructType + // So I had to do some awkward stuff to make it work + let res: Result = serde_json::from_str(json); + + let result: Result, Error> = match res { + Ok(Type::Struct(s)) => Ok(Some(s)), + _ => Err(Error::new( ErrorKind::DataInvalid, format!("Invalid JSON: {}", json), - ) - }).map(|v|Some(v)) + )) + }; + + result }; PyManifestList { diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index a8f2b2d1b..5b9ca6c33 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -942,21 +942,6 @@ mod tests { ) } - #[test] - fn struct_type_empty() { - let record = "{\"type\":\"struct\",\"fields\":[]}"; - - check_type_serde( - record, - Type::Struct(StructType { - fields: vec![], - id_lookup: OnceLock::default(), - name_lookup: OnceLock::default(), - }), - ) - } - - #[test] fn struct_type() { let record = r#" diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index e2d8b29dc..8cf5df8dd 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -883,6 +883,16 @@ pub(super) mod _serde { if let Some(partitions) = partitions { if let Some(partition_type) = partition_type { let partition_types = partition_type.fields(); + if partitions.len() != partition_types.len() { + return Err(Error::new( + crate::ErrorKind::DataInvalid, + format!( + "Invalid partition spec. Expected {} fields, got {}", + partition_types.len(), + partitions.len() + ), + )); + } partitions .into_iter() .zip(partition_types) From fb44a0aa1a0e6fc61173fe5f7ec8761ec51dedf4 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 14 May 2025 15:42:07 +0200 Subject: [PATCH 3/4] Cleanup --- bindings/python/poetry.lock | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 bindings/python/poetry.lock diff --git a/bindings/python/poetry.lock b/bindings/python/poetry.lock deleted file mode 100644 index b47f473d4..000000000 --- a/bindings/python/poetry.lock +++ /dev/null @@ -1,7 +0,0 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. -package = [] - -[metadata] -lock-version = "2.1" -python-versions = "~=3.9" -content-hash = "1f82eab15b06f4ff12a4f352b757976e812ed1f4ee7485fc0afc331e33ea981b" From 9bc9baf7b53e6a61e8ac2c118bff3e3b761c3a25 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 14 May 2025 21:42:58 +0200 Subject: [PATCH 4/4] Thanks Scott! --- bindings/python/src/manifest.rs | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/bindings/python/src/manifest.rs b/bindings/python/src/manifest.rs index 076624720..64ebbe103 100644 --- a/bindings/python/src/manifest.rs +++ b/bindings/python/src/manifest.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use iceberg::spec::{DataContentType, DataFile, DataFileFormat, FieldSummary, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestFile, ManifestList, ManifestStatus, PrimitiveLiteral, StructType, Type}; +use iceberg::spec::{DataFile, DataFileFormat, FieldSummary, FormatVersion, Literal, Manifest, ManifestEntry, ManifestFile, ManifestList, ManifestStatus, PrimitiveLiteral, StructType, Type}; +use iceberg::{Error, ErrorKind}; use pyo3::prelude::*; +use pyo3::types::PyAny; use std::collections::HashMap; use std::sync::Arc; -use pyo3::types::PyAny; -use iceberg::{Error, ErrorKind}; #[pyclass] pub struct PyLiteral { @@ -44,13 +44,7 @@ impl PyDataFile { #[getter] fn content(&self) -> i32 { - // TODO: Maps the enum back to the int - // I have a feeling this can be done more intelligently - match self.inner.content_type() { - DataContentType::Data => 0, - DataContentType::PositionDeletes => 1, - DataContentType::EqualityDeletes => 2, - } + self.inner.content_type() as i32 } #[getter] @@ -60,8 +54,6 @@ impl PyDataFile { #[getter] fn file_format(&self) -> &str { - // TODO: Maps the enum back to the int - // I have a feeling this can be done more intelligently match self.inner.file_format() { DataFileFormat::Avro => "avro", DataFileFormat::Orc => "orc", @@ -214,12 +206,7 @@ impl crate::manifest::PyManifestFile { #[getter] fn content(&self) -> i32 { - // TODO: Maps the enum back to the int - // I have a feeling this can be done more intelligently - match self.inner.content { - ManifestContentType::Data => 0, - ManifestContentType::Deletes => 1, - } + self.inner.content as i32 } #[getter] @@ -293,13 +280,7 @@ impl PyManifestEntry { #[getter] fn status(&self) -> i32 { - // TODO: Maps the enum back to the int - // I have a feeling this can be done more intelligently - match self.inner.status { - ManifestStatus::Existing => 0, - ManifestStatus::Added => 1, - ManifestStatus::Deleted => 2, - } + ManifestStatus::Existing as i32 } #[getter]