Skip to content

Feature: Write to branches #941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyiceberg/cli/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table import TableProperties
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.properties import property_as_int

Expand Down Expand Up @@ -444,7 +444,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
refs = table.refs()
if type:
type = type.lower()
if type not in {"branch", "tag"}:
if type not in {SnapshotRefType.BRANCH, SnapshotRefType.TAG}:
raise ValueError(f"Type must be either branch or tag, got: {type}")

relevant_refs = [
Expand All @@ -458,7 +458,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:

def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
retention_properties = {}
if ref.snapshot_ref_type == "branch":
if ref.snapshot_ref_type == SnapshotRefType.BRANCH:
default_min_snapshots_to_keep = property_as_int(
table_properties,
TableProperties.MIN_SNAPSHOTS_TO_KEEP,
Expand Down
48 changes: 32 additions & 16 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
from pyiceberg.table.name_mapping import (
NameMapping,
)
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import (
Snapshot,
SnapshotLogEntry,
Expand Down Expand Up @@ -363,21 +363,22 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
name_mapping=self.table_metadata.name_mapping(),
)

def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> UpdateSnapshot:
"""Create a new UpdateSnapshot to produce a new snapshot for the table.

Args:
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch reference handed to the returned UpdateSnapshot (defaults to MAIN_BRANCH)

Returns:
A new UpdateSnapshot
"""
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)
return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties)

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
"""
Shorthand API for appending a PyArrow table to a table transaction.

Args:
df: The Arrow dataframe that will be appended to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the append operation
"""
try:
import pyarrow as pa
Expand Down Expand Up @@ -405,7 +406,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
update_snapshot = self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties)
append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

with append_method() as append_files:
Expand All @@ -422,6 +423,7 @@ def overwrite(
df: pa.Table,
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: str = MAIN_BRANCH,
) -> None:
"""
Shorthand for adding a table overwrite with a PyArrow table to the transaction.
Expand All @@ -437,6 +439,7 @@ def overwrite(
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the overwrite operation
"""
try:
import pyarrow as pa
Expand All @@ -459,9 +462,9 @@ def overwrite(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)

self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)

with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
Expand All @@ -470,7 +473,12 @@ def overwrite(
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def delete(
self,
delete_filter: Union[str, BooleanExpression],
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: str = MAIN_BRANCH,
) -> None:
"""
Shorthand for deleting record from a table.

Expand All @@ -482,6 +490,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
Args:
delete_filter: A boolean expression to delete rows from a table
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the delete operation
"""
from pyiceberg.io.pyarrow import (
ArrowScan,
Expand All @@ -498,15 +507,15 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
if isinstance(delete_filter, str):
delete_filter = _parse_row_filter(delete_filter)

with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).delete() as delete_snapshot:
delete_snapshot.delete_by_predicate(delete_filter)

# Check if there are any files that require an actual rewrite of a data file
if delete_snapshot.rewrites_needed is True:
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

files = self._scan(row_filter=delete_filter).plan_files()
files = self._scan(row_filter=delete_filter).use_ref(branch).plan_files()

commit_uuid = uuid.uuid4()
counter = itertools.count(0)
Expand Down Expand Up @@ -546,7 +555,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
))

if len(replaced_files) > 0:
with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).overwrite(
commit_uuid=commit_uuid
) as overwrite_snapshot:
for original_data_file, replaced_data_files in replaced_files:
Expand Down Expand Up @@ -972,22 +981,24 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
"""
Shorthand API for appending a PyArrow table to the table.

Opens a transaction via self.transaction() and delegates to its append().

Args:
df: The Arrow dataframe that will be appended to the table
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch reference to run the append operation on (defaults to MAIN_BRANCH)
"""
with self.transaction() as tx:
tx.append(df=df, snapshot_properties=snapshot_properties)
tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)

def overwrite(
self,
df: pa.Table,
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: str = MAIN_BRANCH,
) -> None:
"""
Shorthand for overwriting the table with a PyArrow table.
Expand All @@ -1003,22 +1014,27 @@ def overwrite(
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the overwrite operation
"""
with self.transaction() as tx:
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)

def delete(
self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
self,
delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: str = MAIN_BRANCH,
) -> None:
"""
Shorthand for deleting rows from the table.

Opens a transaction via self.transaction() and delegates to its delete().

Args:
delete_filter: The predicate that is used to remove rows, given as a
boolean expression or a string (defaults to ALWAYS_TRUE)
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch reference to run the delete operation on (defaults to MAIN_BRANCH)
"""
with self.transaction() as tx:
tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)

def add_files(
self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
Expand Down
8 changes: 6 additions & 2 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import (
MetadataLogEntry,
Snapshot,
Expand Down Expand Up @@ -154,7 +154,7 @@ class AddSnapshotUpdate(IcebergBaseModel):
class SetSnapshotRefUpdate(IcebergBaseModel):
action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref")
ref_name: str = Field(alias="ref-name")
type: Literal["tag", "branch"]
type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH]
snapshot_id: int = Field(alias="snapshot-id")
max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)]
max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)]
Expand Down Expand Up @@ -614,6 +614,10 @@ class AssertRefSnapshotId(ValidatableTableRequirement):
def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif len(base_metadata.snapshots) == 0 and self.ref != MAIN_BRANCH:
raise CommitFailedException(
f"Requirement failed: Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."
)
elif snapshot_ref := base_metadata.refs.get(self.ref):
ref_type = snapshot_ref.snapshot_ref_type
if self.snapshot_id is None:
Expand Down
49 changes: 39 additions & 10 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from pyiceberg.partitioning import (
PartitionSpec,
)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType
from pyiceberg.table.snapshots import (
Operation,
Snapshot,
Expand Down Expand Up @@ -103,12 +104,14 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
_branch: str

def __init__(
self,
operation: Operation,
transaction: Transaction,
io: FileIO,
branch: str,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
Expand All @@ -117,9 +120,9 @@ def __init__(
self._io = io
self._operation = operation
self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
# Since we only support the main branch for now
self._branch = branch
self._parent_snapshot_id = (
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._branch)) else None
)
self._added_data_files = []
self._deleted_data_files = set()
Expand Down Expand Up @@ -272,10 +275,20 @@ def _commit(self) -> UpdatesAndRequirements:
(
AddSnapshotUpdate(snapshot=snapshot),
SetSnapshotRefUpdate(
snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
ref_name=self._branch,
type=SnapshotRefType.BRANCH,
),
),
(
AssertRefSnapshotId(
snapshot_id=self._transaction.table_metadata.refs[self._branch].snapshot_id
if self._branch in self._transaction.table_metadata.refs
else self._transaction.table_metadata.current_snapshot_id,
ref=self._branch,
),
),
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
)

@property
Expand Down Expand Up @@ -324,10 +337,11 @@ def __init__(
operation: Operation,
transaction: Transaction,
io: FileIO,
branch: str,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
):
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
self._predicate = AlwaysFalse()

def _commit(self) -> UpdatesAndRequirements:
Expand Down Expand Up @@ -482,12 +496,13 @@ def __init__(
operation: Operation,
transaction: Transaction,
io: FileIO,
branch: str,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
from pyiceberg.table import TableProperties

super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
self._target_size_bytes = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
Expand Down Expand Up @@ -533,7 +548,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
"""Determine if there are any existing manifest files."""
existing_files = []

if snapshot := self._transaction.table_metadata.current_snapshot():
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._branch):
for manifest_file in snapshot.manifests(io=self._io):
entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]
Expand Down Expand Up @@ -603,21 +618,33 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
class UpdateSnapshot:
_transaction: Transaction
_io: FileIO
_branch: str
_snapshot_properties: Dict[str, str]

def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def __init__(
self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH
) -> None:
"""Store the transaction, file IO, snapshot properties and branch on the instance.

Args:
transaction: Transaction kept as self._transaction
io: FileIO kept as self._io
snapshot_properties: Properties kept as self._snapshot_properties
branch: Branch name kept as self._branch (defaults to MAIN_BRANCH)
"""
self._transaction = transaction
self._io = io
self._snapshot_properties = snapshot_properties
self._branch = branch

def fast_append(self) -> _FastAppendFiles:
"""Return a _FastAppendFiles for an APPEND operation built from this object's transaction, IO, branch and snapshot properties."""
return _FastAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
operation=Operation.APPEND,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

def merge_append(self) -> _MergeAppendFiles:
"""Return a _MergeAppendFiles for an APPEND operation built from this object's transaction, IO, branch and snapshot properties."""
return _MergeAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
operation=Operation.APPEND,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
Expand All @@ -628,6 +655,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
else Operation.APPEND,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

Expand All @@ -636,6 +664,7 @@ def delete(self) -> _DeleteFiles:
operation=Operation.DELETE,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

Expand Down
Loading