diff --git a/bindings/python/Cargo.lock b/bindings/python/Cargo.lock index 03f1a4ca5..30581723d 100644 --- a/bindings/python/Cargo.lock +++ b/bindings/python/Cargo.lock @@ -533,9 +533,9 @@ checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" [[package]] name = "bitflags" -version = "2.9.0" +version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" [[package]] name = "bitvec" @@ -1893,10 +1893,11 @@ dependencies = [ [[package]] name = "generator" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" +checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827" dependencies = [ + "cc", "cfg-if", "libc", "log", @@ -2152,7 +2153,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.61.0", + "windows-core", ] [[package]] @@ -3165,6 +3166,7 @@ dependencies = [ "iceberg", "iceberg-datafusion", "pyo3", + "serde_json", "tokio", ] @@ -4739,25 +4741,24 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows" -version = "0.58.0" +version = "0.61.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +checksum = "c5ee8f3d025738cb02bad7868bbb5f8a6327501e870bf51f1b455b0a2454a419" dependencies = [ - "windows-core 0.58.0", - "windows-targets 0.52.6", + "windows-collections", + "windows-core", + "windows-future", + "windows-link", + "windows-numerics", ] [[package]] -name = "windows-core" -version = "0.58.0" +name = "windows-collections" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" dependencies = [ - "windows-implement 0.58.0", - "windows-interface 0.58.0", - "windows-result 0.2.0", - "windows-strings 0.1.0", - "windows-targets 0.52.6", + "windows-core", ] [[package]] @@ -4766,22 +4767,21 @@ version = "0.61.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" dependencies = [ - "windows-implement 0.60.0", - "windows-interface 0.59.1", + "windows-implement", + "windows-interface", "windows-link", - "windows-result 0.3.2", + "windows-result", "windows-strings 0.4.0", ] [[package]] -name = "windows-implement" -version = "0.58.0" +name = "windows-future" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +checksum = "7a1d6bbefcb7b60acd19828e1bc965da6fcf18a7e39490c5f8be71e54a19ba32" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", + "windows-core", + "windows-link", ] [[package]] @@ -4795,17 +4795,6 @@ dependencies = [ "syn 2.0.101", ] -[[package]] -name = "windows-interface" -version = "0.58.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "windows-interface" version = "0.59.1" @@ -4824,23 +4813,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" [[package]] -name = "windows-registry" -version = "0.4.0" +name = "windows-numerics" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3" +checksum = 
"9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ - "windows-result 0.3.2", - "windows-strings 0.3.1", - "windows-targets 0.53.0", + "windows-core", + "windows-link", ] [[package]] -name = "windows-result" -version = "0.2.0" +name = "windows-registry" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3" dependencies = [ - "windows-targets 0.52.6", + "windows-result", + "windows-strings 0.3.1", + "windows-targets 0.53.0", ] [[package]] @@ -4852,16 +4842,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-strings" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" -dependencies = [ - "windows-result 0.2.0", - "windows-targets 0.52.6", -] - [[package]] name = "windows-strings" version = "0.3.1" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 7d126650a..bb7254eb9 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -34,6 +34,7 @@ crate-type = ["cdylib"] arrow = { version = "55", features = ["pyarrow", "chrono-tz"] } iceberg = { path = "../../crates/iceberg" } pyo3 = { version = "0.24", features = ["extension-module", "abi3-py39"] } +serde_json = "1.0.138" iceberg-datafusion = { path = "../../crates/integrations/datafusion" } datafusion-ffi = { version = "47" } tokio = { version = "1.44", default-features = false } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 283dc1b14..dd8d1f408 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -21,10 +21,12 @@ mod datafusion_table_provider; mod error; mod runtime; mod transform; +mod manifest; #[pymodule] fn pyiceberg_core_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> 
{ datafusion_table_provider::register_module(py, m)?; transform::register_module(py, m)?; + manifest::register_module(py, m)?; Ok(()) } diff --git a/bindings/python/src/manifest.rs b/bindings/python/src/manifest.rs new file mode 100644 index 000000000..64ebbe103 --- /dev/null +++ b/bindings/python/src/manifest.rs @@ -0,0 +1,401 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+//! Python bindings for reading Iceberg manifests and manifest lists.
+//!
+//! The wrappers in this module expose read-only views over the `iceberg::spec`
+//! types. Parsing failures are reported to Python as `ValueError` instead of
+//! panicking inside the extension module.
+
+use iceberg::spec::{
+    DataFile, DataFileFormat, FieldSummary, FormatVersion, Literal, Manifest, ManifestEntry,
+    ManifestFile, ManifestList, ManifestStatus, PrimitiveLiteral, StructType, Type,
+};
+use iceberg::{Error, ErrorKind};
+use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
+use pyo3::types::PyAny;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Converts any displayable error into a Python `ValueError`.
+fn to_py_value_err(err: impl std::fmt::Display) -> PyErr {
+    PyValueError::new_err(err.to_string())
+}
+
+/// Opaque Python handle around an Iceberg [`Literal`].
+#[pyclass]
+pub struct PyLiteral {
+    inner: Literal,
+}
+
+/// Opaque Python handle around an Iceberg [`PrimitiveLiteral`].
+#[pyclass]
+pub struct PyPrimitiveLiteral {
+    inner: PrimitiveLiteral,
+}
+
+/// Read-only Python view over a [`DataFile`].
+#[pyclass]
+pub struct PyDataFile {
+    inner: DataFile,
+}
+
+#[pymethods]
+impl PyDataFile {
+    /// Integer discriminant of the file's content type.
+    #[getter]
+    fn content(&self) -> i32 {
+        self.inner.content_type() as i32
+    }
+
+    /// Path of the data file.
+    #[getter]
+    fn file_path(&self) -> &str {
+        self.inner.file_path()
+    }
+
+    /// Lower-case name of the file format.
+    #[getter]
+    fn file_format(&self) -> &str {
+        match self.inner.file_format() {
+            DataFileFormat::Avro => "avro",
+            DataFileFormat::Orc => "orc",
+            DataFileFormat::Parquet => "parquet",
+            DataFileFormat::Puffin => "puffin",
+        }
+    }
+
+    /// Partition tuple, one slot per partition field (`None` where the slot is empty).
+    #[getter]
+    fn partition(&self) -> Vec<Option<PyLiteral>> {
+        self.inner
+            .partition()
+            .fields()
+            .iter()
+            .map(|slot| slot.as_ref().map(|lit| PyLiteral { inner: lit.clone() }))
+            .collect()
+    }
+
+    #[getter]
+    fn record_count(&self) -> u64 {
+        self.inner.record_count()
+    }
+
+    #[getter]
+    fn file_size_in_bytes(&self) -> u64 {
+        self.inner.file_size_in_bytes()
+    }
+
+    // NOTE(review): the key/value types of the metric maps below mirror the
+    // `DataFile` accessors (presumably field id -> count) — confirm against the crate.
+
+    #[getter]
+    fn column_sizes(&self) -> &HashMap<i32, u64> {
+        self.inner.column_sizes()
+    }
+
+    #[getter]
+    fn value_counts(&self) -> &HashMap<i32, u64> {
+        self.inner.value_counts()
+    }
+
+    #[getter]
+    fn null_value_counts(&self) -> &HashMap<i32, u64> {
+        self.inner.null_value_counts()
+    }
+
+    #[getter]
+    fn nan_value_counts(&self) -> &HashMap<i32, u64> {
+        self.inner.nan_value_counts()
+    }
+
+    /// Upper bounds serialized to bytes, keyed like the underlying map.
+    ///
+    /// Raises `ValueError` if a bound cannot be serialized.
+    #[getter]
+    fn upper_bounds(&self) -> PyResult<HashMap<i32, Vec<u8>>> {
+        self.inner
+            .upper_bounds()
+            .iter()
+            .map(|(field_id, bound)| {
+                bound
+                    .to_bytes()
+                    .map(|bytes| (*field_id, bytes.to_vec()))
+                    .map_err(to_py_value_err)
+            })
+            .collect()
+    }
+
+    /// Lower bounds serialized to bytes, keyed like the underlying map.
+    ///
+    /// Raises `ValueError` if a bound cannot be serialized.
+    #[getter]
+    fn lower_bounds(&self) -> PyResult<HashMap<i32, Vec<u8>>> {
+        self.inner
+            .lower_bounds()
+            .iter()
+            .map(|(field_id, bound)| {
+                bound
+                    .to_bytes()
+                    .map(|bytes| (*field_id, bytes.to_vec()))
+                    .map_err(to_py_value_err)
+            })
+            .collect()
+    }
+
+    #[getter]
+    fn key_metadata(&self) -> Option<&[u8]> {
+        self.inner.key_metadata()
+    }
+
+    #[getter]
+    fn split_offsets(&self) -> &[i64] {
+        self.inner.split_offsets()
+    }
+
+    #[getter]
+    fn equality_ids(&self) -> &[i32] {
+        self.inner.equality_ids()
+    }
+
+    #[getter]
+    fn sort_order_id(&self) -> Option<i32> {
+        self.inner.sort_order_id()
+    }
+}
+
+/// Python view over a parsed [`Manifest`].
+#[pyclass]
+pub struct PyManifest {
+    inner: Manifest,
+}
+
+#[pymethods]
+impl PyManifest {
+    /// Returns every entry of the manifest, regardless of its status.
+    fn entries(&self) -> Vec<PyManifestEntry> {
+        // TODO: Most of the time, we're only interested in 'alive' entries,
+        // that are the ones that are either ADDED or EXISTING
+        // so we can add a boolean to skip the DELETED entries right away before
+        // moving it into the Python world
+        self.inner
+            .entries()
+            .iter()
+            .map(|entry| PyManifestEntry {
+                // Entries are reference-counted, so this only bumps the refcount.
+                inner: Arc::clone(entry),
+            })
+            .collect()
+    }
+}
+
+/// Read-only Python view over a [`FieldSummary`].
+#[pyclass]
+pub struct PyFieldSummary {
+    inner: FieldSummary,
+}
+
+#[pymethods]
+impl PyFieldSummary {
+    #[getter]
+    fn contains_null(&self) -> bool {
+        self.inner.contains_null
+    }
+
+    #[getter]
+    fn contains_nan(&self) -> Option<bool> {
+        self.inner.contains_nan
+    }
+
+    #[getter]
+    fn lower_bound(&self) -> Option<PyPrimitiveLiteral> {
+        self.inner.lower_bound.as_ref().map(|bound| PyPrimitiveLiteral {
+            inner: bound.literal().clone(),
+        })
+    }
+
+    #[getter]
+    fn upper_bound(&self) -> Option<PyPrimitiveLiteral> {
+        self.inner.upper_bound.as_ref().map(|bound| PyPrimitiveLiteral {
+            inner: bound.literal().clone(),
+        })
+    }
+}
+
+/// Read-only Python view over a [`ManifestFile`] (one row of a manifest list).
+#[pyclass]
+pub struct PyManifestFile {
+    inner: ManifestFile,
+}
+
+#[pymethods]
+impl PyManifestFile {
+    #[getter]
+    fn manifest_path(&self) -> &str {
+        self.inner.manifest_path.as_str()
+    }
+
+    #[getter]
+    fn manifest_length(&self) -> i64 {
+        self.inner.manifest_length
+    }
+
+    #[getter]
+    fn partition_spec_id(&self) -> i32 {
+        self.inner.partition_spec_id
+    }
+
+    /// Integer discriminant of the manifest's content type.
+    #[getter]
+    fn content(&self) -> i32 {
+        self.inner.content as i32
+    }
+
+    #[getter]
+    fn sequence_number(&self) -> i64 {
+        self.inner.sequence_number
+    }
+
+    #[getter]
+    fn min_sequence_number(&self) -> i64 {
+        self.inner.min_sequence_number
+    }
+
+    #[getter]
+    fn added_snapshot_id(&self) -> i64 {
+        self.inner.added_snapshot_id
+    }
+
+    // NOTE(review): the integer widths of the count getters below are assumed to
+    // match the corresponding `ManifestFile` fields — confirm against the crate.
+
+    #[getter]
+    fn added_files_count(&self) -> Option<u32> {
+        self.inner.added_files_count
+    }
+
+    #[getter]
+    fn existing_files_count(&self) -> Option<u32> {
+        self.inner.existing_files_count
+    }
+
+    #[getter]
+    fn deleted_files_count(&self) -> Option<u32> {
+        self.inner.deleted_files_count
+    }
+
+    #[getter]
+    fn added_rows_count(&self) -> Option<u64> {
+        self.inner.added_rows_count
+    }
+
+    #[getter]
+    fn existing_rows_count(&self) -> Option<u64> {
+        self.inner.existing_rows_count
+    }
+
+    #[getter]
+    fn deleted_rows_count(&self) -> Option<u64> {
+        self.inner.deleted_rows_count
+    }
+
+    #[getter]
+    fn partitions(&self) -> Vec<PyFieldSummary> {
+        self.inner
+            .partitions
+            .iter()
+            .map(|summary| PyFieldSummary {
+                inner: summary.clone(),
+            })
+            .collect()
+    }
+
+    #[getter]
+    fn key_metadata(&self) -> Vec<u8> {
+        self.inner.key_metadata.clone()
+    }
+}
+
+/// Read-only Python view over a shared [`ManifestEntry`].
+#[pyclass]
+pub struct PyManifestEntry {
+    inner: Arc<ManifestEntry>,
+}
+
+#[pymethods]
+impl PyManifestEntry {
+    /// Integer discriminant of this entry's own status.
+    #[getter]
+    fn status(&self) -> i32 {
+        // Report the entry's real status; a constant here would make deleted
+        // and added entries indistinguishable from existing ones.
+        let status: ManifestStatus = self.inner.status();
+        status as i32
+    }
+
+    #[getter]
+    fn snapshot_id(&self) -> Option<i64> {
+        self.inner.snapshot_id
+    }
+
+    #[getter]
+    fn sequence_number(&self) -> Option<i64> {
+        self.inner.sequence_number
+    }
+
+    #[getter]
+    fn file_sequence_number(&self) -> Option<i64> {
+        self.inner.file_sequence_number
+    }
+
+    /// Returns a copy of the data file described by this entry.
+    #[getter]
+    fn data_file(&self) -> PyDataFile {
+        PyDataFile {
+            inner: self.inner.data_file.clone(),
+        }
+    }
+}
+
+/// Parses the Avro-encoded bytes of a manifest file.
+///
+/// Raises `ValueError` if the bytes cannot be parsed.
+#[pyfunction]
+pub fn read_manifest_entries(bs: &[u8]) -> PyResult<PyManifest> {
+    Manifest::parse_avro(bs)
+        .map(|inner| PyManifest { inner })
+        .map_err(to_py_value_err)
+}
+
+/// Holds a Python callable that is invoked with an `i32` id and must return a `str`.
+#[pyclass]
+pub struct PartitionSpecProviderCallbackHolder {
+    callback: Py<PyAny>,
+}
+
+#[pymethods]
+impl PartitionSpecProviderCallbackHolder {
+    #[new]
+    fn new(callback: Py<PyAny>) -> Self {
+        Self { callback }
+    }
+
+    /// Invokes the wrapped callback from Python and returns its string result.
+    fn trigger_from_python(&self, id: i32) -> PyResult<String> {
+        self.do_the_callback(id)
+    }
+}
+
+impl PartitionSpecProviderCallbackHolder {
+    /// Calls the Python callback with `id` and extracts the result as a `String`.
+    ///
+    /// Acquires the GIL for the duration of the call. Any exception raised by
+    /// the callback, or a non-string return value, is returned as the error.
+    pub fn do_the_callback(&self, id: i32) -> PyResult<String> {
+        Python::with_gil(|py| self.callback.call1(py, (id,))?.extract::<String>(py))
+    }
+}
+
+/// Python view over a parsed [`ManifestList`].
+#[pyclass]
+pub struct PyManifestList {
+    inner: ManifestList,
+}
+
+#[pymethods]
+impl PyManifestList {
+    /// Returns one wrapper per manifest file in the list.
+    fn entries(&self) -> Vec<PyManifestFile> {
+        self.inner
+            .entries()
+            .iter()
+            .map(|file| PyManifestFile {
+                inner: file.clone(),
+            })
+            .collect()
+    }
+}
+
+/// Parses the bytes of a V2 manifest list.
+///
+/// `cb` is called with an `i32` id and must return the JSON of a struct type.
+/// Raises `ValueError` if parsing fails, if the callback raises, or if the
+/// returned JSON does not describe a struct type.
+#[pyfunction]
+pub fn read_manifest_list(
+    bs: &[u8],
+    cb: &PartitionSpecProviderCallbackHolder,
+) -> PyResult<PyManifestList> {
+    let provider = move |spec_id: i32| -> Result<Option<StructType>, Error> {
+        // A failing Python callback becomes an iceberg error instead of a panic.
+        let json = cb.do_the_callback(spec_id).map_err(|err| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Partition spec callback failed for id {}: {}", spec_id, err),
+            )
+        })?;
+
+        // The JSON is deserialized as a generic `Type` and then narrowed to the
+        // struct variant; anything else (including a parse failure) is rejected.
+        match serde_json::from_str::<Type>(&json) {
+            Ok(Type::Struct(partition_type)) => Ok(Some(partition_type)),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid JSON: {}", json),
+            )),
+        }
+    };
+
+    ManifestList::parse_with_version(bs, FormatVersion::V2, provider)
+        .map(|inner| PyManifestList { inner })
+        .map_err(to_py_value_err)
+}
+
+/// Registers the `manifest` submodule on `m` and in `sys.modules` as
+/// `pyiceberg_core.manifest`.
+pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
+    let this = PyModule::new(py, "manifest")?;
+
+    this.add_function(wrap_pyfunction!(read_manifest_entries, &this)?)?;
+    this.add_function(wrap_pyfunction!(read_manifest_list, &this)?)?;
+    this.add_class::<PartitionSpecProviderCallbackHolder>()?;
+
+    m.add_submodule(&this)?;
+    py.import("sys")?
+        .getattr("modules")?
+        .set_item("pyiceberg_core.manifest", this)
+}