From e57e08b7de9e294e062882385624a93acd3ac3df Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 4 Jun 2025 14:29:18 +0200 Subject: [PATCH] I/O: Adapter for Apache Iceberg --- cratedb_toolkit/cli.py | 6 +- cratedb_toolkit/cluster/core.py | 28 +++++++ cratedb_toolkit/io/cli.py | 61 ++++++++++++++- cratedb_toolkit/io/iceberg.py | 134 ++++++++++++++++++++++++++++++++ doc/io/iceberg/index.md | 12 +++ pyproject.toml | 1 + 6 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 cratedb_toolkit/io/iceberg.py create mode 100644 doc/io/iceberg/index.md diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index 5a2af225..784f83f8 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -9,7 +9,8 @@ from .cmd.tail.cli import cli as tail_cli from .docs.cli import cli as docs_cli from .info.cli import cli as info_cli -from .io.cli import cli as io_cli +from .io.cli import cli_load as io_cli_load +from .io.cli import cli_save as io_cli_save from .query.cli import cli as query_cli from .settings.cli import cli as settings_cli from .shell.cli import cli as shell_cli @@ -30,7 +31,8 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): cli.add_command(cfr_cli, name="cfr") cli.add_command(cloud_cli, name="cluster") cli.add_command(docs_cli, name="docs") -cli.add_command(io_cli, name="load") +cli.add_command(io_cli_load, name="load") +cli.add_command(io_cli_save, name="save") cli.add_command(query_cli, name="query") cli.add_command(rockset_cli, name="rockset") cli.add_command(shell_cli, name="shell") diff --git a/cratedb_toolkit/cluster/core.py b/cratedb_toolkit/cluster/core.py index d579bcd0..79152e50 100644 --- a/cratedb_toolkit/cluster/core.py +++ b/cratedb_toolkit/cluster/core.py @@ -20,6 +20,7 @@ DatabaseAddressMissingError, OperationFailed, ) +from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress from 
def save_table(
    self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
) -> "StandaloneCluster":
    """
    Export data from a database table on a standalone CrateDB server.

    The cluster itself is the data source; `target` designates the output
    resource. Currently, only Apache Iceberg targets are implemented.

    :param source: Schema/table selector on this cluster.
    :param target: Output resource (URL, format, compression).
    :param transformation: Optional path to a Zyp transformation file.
        NOTE(review): not forwarded to the Iceberg exporter yet — confirm
        whether transformations should apply on export.
    :raises NotImplementedError: For any non-Iceberg target scheme.
    :returns: ``self``, to keep the fluent interface.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo

    ctk save table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """
    # Derive the SQLAlchemy URL of this cluster; it is the export source.
    source_url = self.address.dburi
    target_url_obj = URL(target.url)

    # Accept schemes like `iceberg://...` and `file+iceberg:...`.
    if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
        # Do not `return to_iceberg(...)`: it returns `None`, which would
        # break the fluent `-> StandaloneCluster` contract declared above.
        to_iceberg(source_url, target.url)
    else:
        raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")

    return self
""" return boot_click(ctx, verbose, debug) -@make_command(cli, name="table") +@make_command(cli_load, name="table") @click.argument("url") @option_cluster_id @option_cluster_name @@ -67,3 +67,60 @@ def load_table( cluster_url=cluster_url, ) cluster.load_table(source=source, target=target, transformation=transformation) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") +@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") +@click.version_option() +@click.pass_context +def cli_save(ctx: click.Context, verbose: bool, debug: bool): + """ + Export data from CrateDB. + """ + return boot_click(ctx, verbose, debug) + + +@make_command(cli_save, name="table") +@click.argument("url") +@option_cluster_id +@option_cluster_name +@option_cluster_url +@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data") +@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") +@click.option("--format", "format_", type=str, required=False, help="File format of the import resource") +@click.option("--compression", type=str, required=False, help="Compression format of the import resource") +@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file") +@click.pass_context +def save_table( + ctx: click.Context, + url: str, + cluster_id: str, + cluster_name: str, + cluster_url: str, + schema: str, + table: str, + format_: str, + compression: str, + transformation: t.Union[Path, None], +): + """ + Export data from CrateDB and CrateDB Cloud clusters. + """ + + # When `--transformation` is given, but empty, fix it. + if transformation is not None and transformation.name == "": + transformation = None + + # Encapsulate source and target parameters. 
# Streaming chunk size shared by the Polars scan and the SQL bulk insert.
CHUNK_SIZE = 75_000


@dataclasses.dataclass
class IcebergAddress:
    """
    Decoded address of an Apache Iceberg resource.

    Two addressing styles are supported:
    - Catalog-based: ``iceberg://./path/?catalog=default&table=taxi_dataset``
    - Metadata-file: ``file+iceberg:path/to/....metadata.json`` (no catalog)
    """

    # Filesystem path: either the warehouse directory (catalog style)
    # or a table metadata file (catalog-less style).
    path: str
    # Catalog name from the `catalog` query parameter; may be `None`.
    catalog: str
    # Table name from the `table` query parameter; may be `None`.
    table: str

    @classmethod
    def from_url(cls, url: str):
        """Parse an Iceberg URL into path, catalog, and table components."""
        parsed = URL(url)
        # `iceberg://./var/...` puts "." into the host slot; strip the
        # leading slash so the remainder is treated as a relative path.
        if parsed.host == ".":
            parsed.path = parsed.path.lstrip("/")
        params = parsed.query_params
        return cls(path=parsed.path, catalog=params.get("catalog"), table=params.get("table"))

    def load_catalog(self) -> Catalog:
        """Open the SQLite-backed pyiceberg catalog stored below `path`."""
        settings = {
            "type": "sql",
            "uri": f"sqlite:///{self.path}/pyiceberg_catalog.db",
            "warehouse": f"file://{self.path}",
        }
        return load_catalog(self.catalog, **settings)

    @property
    def identifier(self):
        # NOTE(review): the catalog name doubles as the namespace part of
        # the table identifier. That works when both are called e.g.
        # "default" — confirm for layouts where they differ.
        return (self.catalog, self.table)

    def load_table(self) -> pl.LazyFrame:
        """Return the addressed Iceberg table as a Polars LazyFrame."""
        if self.catalog is not None:
            return self.load_catalog().load_table(self.identifier).to_polars()
        # Without a catalog, `path` points directly at a metadata file.
        return pl.scan_iceberg(self.path)
def from_iceberg(source_url, cratedb_url, progress: bool = False):
    """
    Scan an Iceberg table from local filesystem or object store, and load into CrateDB.

    https://docs.pola.rs/api/python/stable/reference/api/polars.scan_iceberg.html

    :param source_url: Iceberg source address (catalog or metadata-file style).
    :param cratedb_url: CrateDB target URL; must include a table name.
    :param progress: Accepted for interface symmetry; not used yet.
    :raises ValueError: If the CrateDB URL does not carry a table name.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
    ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """
    iceberg_address = IcebergAddress.from_url(source_url)
    logger.info(
        "Iceberg address: Path: %s, catalog: %s, table: %s",
        iceberg_address.path,
        iceberg_address.catalog,
        iceberg_address.table,
    )

    # Decode the CrateDB target address. Use a distinct local name for the
    # decoded SQLAlchemy URL instead of shadowing the `cratedb_url` parameter.
    cratedb_address = DatabaseAddress.from_string(cratedb_url)
    sqlalchemy_url, cratedb_table = cratedb_address.decode()
    if cratedb_table.table is None:
        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
    logger.info("Target address: %s", cratedb_address)

    # Invoke copy operation.
    logger.info("Running Iceberg copy")
    engine = sa.create_engine(str(sqlalchemy_url))

    pl.Config.set_streaming_chunk_size(CHUNK_SIZE)
    frame = iceberg_address.load_table()

    # This conversion to pandas is zero-copy,
    # so we can utilize their SQL utils for free.
    # https://github.com/pola-rs/polars/issues/7852
    # Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
    # https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
    # NOTE(review): `collect(streaming=True)` is deprecated in newer Polars
    # releases; fine while `polars<1.30` is pinned, revisit on upgrade.
    frame.collect(streaming=True).to_pandas().to_sql(
        name=cratedb_table.table,
        schema=cratedb_table.schema,
        con=engine,
        if_exists="replace",  # drops and recreates an existing target table
        index=False,
        chunksize=CHUNK_SIZE,
        method=insert_bulk,
    )
def to_iceberg(source_url, target_url, progress: bool = False):
    """
    Export a CrateDB table into an Apache Iceberg table.

    :param source_url: CrateDB source URL; must include a table name.
    :param target_url: Iceberg target address; must use the catalog style,
        because writing requires a catalog.
    :param progress: Accepted for interface symmetry; not used yet.
    :raises ValueError: If the CrateDB URL does not carry a table name.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk save table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """
    iceberg_address = IcebergAddress.from_url(target_url)
    logger.info(
        "Iceberg address: Path: %s, catalog: %s, table: %s",
        iceberg_address.path,
        iceberg_address.catalog,
        iceberg_address.table,
    )

    # Decode the CrateDB source address.
    cratedb_address = DatabaseAddress.from_string(source_url)
    sqlalchemy_url, cratedb_table = cratedb_address.decode()
    if cratedb_table.table is None:
        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")

    logger.info("Running Iceberg export")
    engine = sa.create_engine(str(sqlalchemy_url))

    # Read the source table into an Arrow table.
    # NOTE(review): this materializes the whole table in memory; for very
    # large tables, a chunked/streaming export should be implemented.
    source_schema = cratedb_table.schema or "doc"  # CrateDB's default schema
    df = pl.read_database(
        query=f'SELECT * FROM "{source_schema}"."{cratedb_table.table}"',
        connection=engine,
    ).to_arrow()

    # Create namespace and table if needed, then append the data.
    # https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
    catalog = iceberg_address.load_catalog()
    catalog.create_namespace_if_not_exists(iceberg_address.catalog)
    table = catalog.create_table_if_not_exists(
        iceberg_address.identifier,
        schema=df.schema,
    )
    table.append(df)