Skip to content

Commit d9f2fe5

Browse files
authored
Expose datafusion table provider as python binding (#1324)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #865 ## What changes are included in this PR? This PR creates a new `IcebergDataFusionTable` python class and exposes it through the new `pyiceberg_core.datafusion` module. ``` from pyiceberg_core.datafusion import IcebergDataFusionTable ``` The goal of exposing `IcebergDataFusionTable` is to be able to register the Iceberg table provider to datafusion-python, using the `register_table_provider` API. See the usage example in `bindings/python/tests/test_datafusion_table_provider.py` The integration relies on the `FFI_TableProvider` API as described in https://datafusion.apache.org/python/user-guide/io/table_provider.html Note that this integration only works for `datafusion >= 45` due to this issue apache/datafusion#13851 <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, unit tests. To build and test locally: ``` cd bindings/python hatch run dev:develop hatch run dev:test ```
1 parent f9061cc commit d9f2fe5

File tree

8 files changed

+2046
-441
lines changed

8 files changed

+2046
-441
lines changed

bindings/python/Cargo.lock

Lines changed: 1739 additions & 439 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bindings/python/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,6 @@ crate-type = ["cdylib"]
3434
arrow = { version = "55", features = ["pyarrow", "chrono-tz"] }
3535
iceberg = { path = "../../crates/iceberg" }
3636
pyo3 = { version = "0.24", features = ["extension-module", "abi3-py39"] }
37+
iceberg-datafusion = { path = "../../crates/integrations/datafusion" }
38+
datafusion-ffi = { version = "47" }
39+
tokio = { version = "1.44", default-features = false }

bindings/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ python-source = "python"
4444
ignore = ["F403", "F405"]
4545

4646
[tool.hatch.envs.dev]
47-
dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "pyarrow>=17.0.0"]
47+
dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "pyarrow>=17.0.0", "datafusion>=45", "pyiceberg[sql-sqlite]>=0.9.1"]
4848

4949
[tool.hatch.envs.dev.scripts]
5050
build = "maturin build --out dist --sdist"
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::ffi::CString;
20+
use std::sync::Arc;
21+
22+
use datafusion_ffi::table_provider::FFI_TableProvider;
23+
use iceberg::io::FileIO;
24+
use iceberg::table::StaticTable;
25+
use iceberg::TableIdent;
26+
use iceberg_datafusion::table::IcebergTableProvider;
27+
use pyo3::exceptions::PyRuntimeError;
28+
use pyo3::prelude::*;
29+
use pyo3::types::PyCapsule;
30+
31+
use crate::runtime::runtime;
32+
33+
#[pyclass(name = "IcebergDataFusionTable")]
34+
pub struct PyIcebergDataFusionTable {
35+
inner: Arc<IcebergTableProvider>,
36+
}
37+
38+
#[pymethods]
39+
impl PyIcebergDataFusionTable {
40+
#[new]
41+
fn new(
42+
identifier: Vec<String>,
43+
metadata_location: String,
44+
file_io_properties: Option<HashMap<String, String>>,
45+
) -> PyResult<Self> {
46+
let runtime = runtime();
47+
48+
let provider = runtime.block_on(async {
49+
let table_ident = TableIdent::from_strs(identifier)
50+
.map_err(|e| PyRuntimeError::new_err(format!("Invalid table identifier: {e}")))?;
51+
52+
let mut builder = FileIO::from_path(&metadata_location)
53+
.map_err(|e| PyRuntimeError::new_err(format!("Failed to init FileIO: {e}")))?;
54+
55+
if let Some(props) = file_io_properties {
56+
builder = builder.with_props(props);
57+
}
58+
59+
let file_io = builder
60+
.build()
61+
.map_err(|e| PyRuntimeError::new_err(format!("Failed to build FileIO: {e}")))?;
62+
63+
let static_table =
64+
StaticTable::from_metadata_file(&metadata_location, table_ident, file_io)
65+
.await
66+
.map_err(|e| {
67+
PyRuntimeError::new_err(format!("Failed to load static table: {e}"))
68+
})?;
69+
70+
let table = static_table.into_table();
71+
72+
IcebergTableProvider::try_new_from_table(table)
73+
.await
74+
.map_err(|e| {
75+
PyRuntimeError::new_err(format!("Failed to create table provider: {e}"))
76+
})
77+
})?;
78+
79+
Ok(Self {
80+
inner: Arc::new(provider),
81+
})
82+
}
83+
84+
fn __datafusion_table_provider__<'py>(
85+
&self,
86+
py: Python<'py>,
87+
) -> PyResult<Bound<'py, PyCapsule>> {
88+
let capsule_name = CString::new("datafusion_table_provider").unwrap();
89+
90+
let ffi_provider = FFI_TableProvider::new(self.inner.clone(), false, Some(runtime()));
91+
92+
PyCapsule::new(py, ffi_provider, Some(capsule_name))
93+
}
94+
}
95+
96+
/// Registers the `datafusion` submodule on the parent extension module and
/// inserts it into `sys.modules` as `pyiceberg_core.datafusion` so that
/// `from pyiceberg_core.datafusion import IcebergDataFusionTable` works.
pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
    let submodule = PyModule::new(py, "datafusion")?;
    submodule.add_class::<PyIcebergDataFusionTable>()?;
    m.add_submodule(&submodule)?;

    // add_submodule alone does not make the dotted import path resolvable;
    // patch sys.modules explicitly.
    let sys_modules = py.import("sys")?.getattr("modules")?;
    sys_modules.set_item("pyiceberg_core.datafusion", submodule)?;

    Ok(())
}

bindings/python/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717

1818
use pyo3::prelude::*;
1919

20+
mod datafusion_table_provider;
2021
mod error;
22+
mod runtime;
2123
mod transform;
2224

2325
/// Top-level entry point for the `pyiceberg_core` native extension.
///
/// Registers each Rust-backed submodule (datafusion table provider,
/// transforms) onto the root module.
#[pymodule]
fn pyiceberg_core_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
    datafusion_table_provider::register_module(py, m)?;
    transform::register_module(py, m)?;
    Ok(())
}

bindings/python/src/runtime.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::OnceLock;
19+
20+
use tokio::runtime::{Handle, Runtime};
21+
22+
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
23+
24+
pub fn runtime() -> Handle {
25+
match Handle::try_current() {
26+
Ok(h) => h.clone(),
27+
_ => {
28+
let rt = RUNTIME.get_or_init(|| Runtime::new().unwrap());
29+
rt.handle().clone()
30+
}
31+
}
32+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
from datetime import date, datetime
20+
import uuid
21+
import pytest
22+
from pyiceberg_core.datafusion import IcebergDataFusionTable
23+
from datafusion import SessionContext
24+
from pyiceberg.catalog import Catalog, load_catalog
25+
import pyarrow as pa
26+
from pathlib import Path
27+
import datafusion
28+
29+
# The iceberg table provider only works for datafusion >= 45
# (see apache/datafusion#13851). Compare the parsed major version number:
# a lexicographic string comparison would wrongly reject e.g. "100" and
# wrongly accept "5.0".
assert int(datafusion.__version__.split(".")[0]) >= 45, (
    f"datafusion >= 45 required, found {datafusion.__version__}"
)
32+
33+
34+
@pytest.fixture(scope="session")
def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path:
    """Session-scoped temporary directory serving as the warehouse root."""
    warehouse_dir = tmp_path_factory.mktemp("warehouse")
    return warehouse_dir
37+
38+
39+
@pytest.fixture(scope="session")
def catalog(warehouse: Path) -> Catalog:
    """Session-scoped SQLite-backed pyiceberg catalog rooted at `warehouse`."""
    properties = {
        "uri": f"sqlite:///{warehouse}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse}",
    }
    return load_catalog("default", **properties)
49+
50+
51+
@pytest.fixture(scope="session")
def arrow_table_with_null() -> "pa.Table":
    """Pyarrow table covering many column types, each with a null value."""
    import pyarrow as pa

    columns = {
        "bool": [False, None, True],
        "string": ["a", None, "z"],
        # Longer than 16 bytes so truncation kicks in
        "string_long": ["a" * 22, None, "z" * 22],
        "int": [1, None, 9],
        "long": [1, None, 9],
        "float": [0.0, None, 0.9],
        "double": [0.0, None, 0.9],
        # 'time': [1_000_000, None, 3_000_000], # Example times: 1s, none, and 3s past midnight #Spark does not support time fields
        "timestamp": [
            datetime(2023, 1, 1, 19, 25, 00),
            None,
            datetime(2023, 3, 1, 19, 25, 00),
        ],
        # "timestamptz": [
        #     datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
        #     None,
        #     datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
        # ],
        "date": [date(2023, 1, 1), None, date(2023, 3, 1)],
        # Not supported by Spark
        # 'time': [time(1, 22, 0), None, time(19, 25, 0)],
        # Not natively supported by Arrow
        # 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
        "binary": [b"\01", None, b"\22"],
        "fixed": [
            uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
            None,
            uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
        ],
    }
    return pa.Table.from_pydict(columns)
90+
91+
92+
def test_register_iceberg_table_provider(
    catalog: Catalog, arrow_table_with_null: pa.Table
) -> None:
    """Register an IcebergDataFusionTable with DataFusion and read it back."""
    catalog.create_namespace_if_not_exists("default")
    iceberg_table = catalog.create_table_if_not_exists(
        "default.dataset",
        schema=arrow_table_with_null.schema,
    )
    iceberg_table.append(arrow_table_with_null)

    provider = IcebergDataFusionTable(
        identifier=iceberg_table.name(),
        metadata_location=iceberg_table.metadata_location,
        file_io_properties=iceberg_table.io.properties,
    )

    ctx = SessionContext()
    ctx.register_table_provider("test", provider)

    datafusion_table = ctx.table("test")
    assert datafusion_table is not None

    # Schemas must agree once large/small arrow type widths are normalized
    from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_small_types

    actual_schema = _pyarrow_schema_ensure_small_types(datafusion_table.schema())
    expected_schema = _pyarrow_schema_ensure_small_types(
        iceberg_table.schema().as_arrow()
    )
    assert actual_schema == expected_schema
    # large/small type mismatches, fixed in pyiceberg 0.10.0
    # assert datafusion_table.schema() == iceberg_table.schema().as_arrow()

    # Row contents must match what a direct iceberg scan returns
    actual_rows = datafusion_table.to_arrow_table().to_pylist()
    expected_rows = iceberg_table.scan().to_arrow().to_pylist()
    assert actual_rows == expected_rows
    # large/small type mismatches, fixed in pyiceberg 0.10.0
    # assert datafusion_table.to_arrow_table() == iceberg_table.scan().to_arrow()
130+
131+
132+
def test_register_pyiceberg_table(
    catalog: Catalog, arrow_table_with_null: pa.Table
) -> None:
    """Register a pyiceberg Table directly by monkey-patching the provider hook."""
    from types import MethodType

    catalog.create_namespace_if_not_exists("default")
    iceberg_table = catalog.create_table_if_not_exists(
        "default.dataset",
        schema=arrow_table_with_null.schema,
    )
    iceberg_table.append(arrow_table_with_null)

    # Attach __datafusion_table_provider__ so DataFusion accepts the pyiceberg
    # table object itself in register_table_provider.
    def __datafusion_table_provider__(self):
        provider = IcebergDataFusionTable(
            identifier=self.name(),
            metadata_location=self.metadata_location,
            file_io_properties=self.io.properties,
        )
        return provider.__datafusion_table_provider__()

    iceberg_table.__datafusion_table_provider__ = MethodType(
        __datafusion_table_provider__, iceberg_table
    )

    ctx = SessionContext()
    ctx.register_table_provider("test", iceberg_table)

    datafusion_table = ctx.table("test")
    assert datafusion_table is not None

crates/integrations/datafusion/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ pub use error::*;
2323

2424
mod physical_plan;
2525
mod schema;
26-
mod table;
26+
pub mod table;
2727
pub use table::table_provider_factory::IcebergTableProviderFactory;
2828
pub use table::*;

0 commit comments

Comments
 (0)