Description
Feature Request / Improvement
Problem
When using the create_changelog_view procedure with net_changes=true to remove intermediate changes across multiple snapshots, the current implementation is computationally expensive. The operation performs a repartition and sort on ALL columns in the table, which becomes a significant bottleneck for wide/large tables.
See lines 220 to 227 in 36bb826:

    Column[] repartitionSpec =
        Arrays.stream(df.columns())
            .filter(columnsToKeep)
            .map(CreateChangelogViewProcedure::delimitedName)
            .map(df::col)
            .toArray(Column[]::new);
    return applyCarryoverRemoveIterator(df, repartitionSpec, netChanges);
The compute_updates path does not pay this cost: there, the repartition is performed only on the identifier fields + change_ordinal.
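To make the difference in key width concrete, here is a minimal plain-Java sketch of the two key choices. It is illustrative only: the class and method names are hypothetical, the `_change_ordinal` column name is an assumption, and the real procedure builds Spark `Column` objects rather than lists of names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical illustration of the two repartition keys; not Iceberg code. */
final class RepartitionKeySketch {

  // Assumed name of the change ordinal metadata column.
  static final String CHANGE_ORDINAL = "_change_ordinal";

  /** net_changes path: every column that passes the filter becomes part of the key. */
  static List<String> allColumnKey(List<String> columns, Predicate<String> columnsToKeep) {
    return columns.stream().filter(columnsToKeep).collect(Collectors.toList());
  }

  /** compute_updates path: only the identifier columns plus the change ordinal. */
  static List<String> identifierKey(List<String> identifierColumns) {
    List<String> key = new ArrayList<>(identifierColumns);
    key.add(CHANGE_ORDINAL);
    return key;
  }
}
```

For a table with dozens of data columns and a single identifier column, the first key grows with the schema while the second stays at two columns.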
Current Limitation
The net_changes=true path does not support identifier columns:
- If the table has identifier columns defined in the schema, they are ignored when net_changes=true is used
- If identifier columns are explicitly provided as a parameter along with net_changes=true, the procedure fails
This forces users to choose between:
- Getting net changes (expensive all-column repartition)
- Using identifier columns for optimization (but only getting per-snapshot updates)
Proposed Solution
Expected Behavior
Allow net_changes=true to work with identifier columns:
net_changes | identifier_columns | compute_updates | Behavior |
---|---|---|---|
false | not provided | false | Remove carryovers only (existing) |
false | provided | true (auto) | Per-snapshot pre/post images (existing) |
true | not provided | false | Net changes via all-column repartition (existing) |
true | provided | true | Net changes with pre/post images across the entire snapshot range (NEW - currently fails) |
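The table above can be read as a small decision function. The following is a sketch of that mapping only; the enum and method names are hypothetical and do not correspond to anything in the procedure's code.

```java
import java.util.List;

/** Hypothetical model of the behavior table; not the procedure's actual code. */
final class ChangelogModeSketch {

  enum Mode {
    REMOVE_CARRYOVERS_ONLY,   // existing: net_changes=false, no identifier columns
    PER_SNAPSHOT_UPDATES,     // existing: net_changes=false, identifier columns provided
    NET_CHANGES_ALL_COLUMNS,  // existing: net_changes=true, no identifier columns
    NET_CHANGES_WITH_UPDATES  // proposed: net_changes=true, identifier columns provided
  }

  static Mode resolve(boolean netChanges, List<String> identifierColumns) {
    boolean hasIdentifiers = identifierColumns != null && !identifierColumns.isEmpty();
    if (netChanges) {
      return hasIdentifiers ? Mode.NET_CHANGES_WITH_UPDATES : Mode.NET_CHANGES_ALL_COLUMNS;
    }
    return hasIdentifiers ? Mode.PER_SNAPSHOT_UPDATES : Mode.REMOVE_CARRYOVERS_ONLY;
  }
}
```

Only the last branch of the `netChanges` case is new; the other three combinations keep their current behavior.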
Implementation Details
We implement ComputeNetUpdates, which combines net_changes=true with identifier columns:
- Repartition by identifier_columns and sort within partition by identifier_columns + change_ordinal
- Apply RemoveCarryoverIterator.
- Use window functions to identify first and last changes for each logical row
- Filter to keep only first and last changes (as per change_ordinal) for each logical row
- Calculate pre/post images using DELETE-INSERT pairs (similar to existing ComputeUpdateIterator).
This mirrors the existing compute_updates approach (see), but operates across the entire snapshot range (similar to net_changes) rather than per-snapshot.
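The steps above can be modeled in plain Java (16+, for records) to check the intended semantics on a small input. This is a sketch under stated assumptions, not the proposed Spark implementation: the `Change` record and method names are hypothetical, a single string stands in for the identifier and another for the remaining columns, grouping in memory stands in for the repartition, and the handling of INSERT-then-DELETE rows and of unchanged contents is an assumed reading of "net changes".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the proposed steps; names and semantics are assumptions. */
final class ComputeNetUpdatesSketch {

  /** One changelog row: identifier, remaining data, change type, change ordinal. */
  record Change(String id, String data, String type, int ordinal) {}

  static List<Change> netUpdates(List<Change> changes) {
    // Group by identifier (stands in for the repartition by identifier columns).
    Map<String, List<Change>> byId = new LinkedHashMap<>();
    for (Change c : changes) {
      byId.computeIfAbsent(c.id(), k -> new ArrayList<>()).add(c);
    }
    // Sort by change ordinal; "DELETE" sorts before "INSERT" within one ordinal.
    Comparator<Change> order =
        Comparator.comparingInt(Change::ordinal).thenComparing(Change::type);

    List<Change> result = new ArrayList<>();
    for (List<Change> group : byId.values()) {
      group.sort(order);
      List<Change> rows = removeCarryovers(group);
      if (rows.isEmpty()) {
        continue;
      }
      // Keep only the first and last change of the logical row.
      Change first = rows.get(0);
      Change last = rows.get(rows.size() - 1);
      boolean startsWithDelete = first.type().equals("DELETE");
      boolean endsWithInsert = last.type().equals("INSERT");
      if (startsWithDelete && endsWithInsert) {
        // Assumption: identical contents before and after mean no net change.
        if (!first.data().equals(last.data())) {
          result.add(new Change(first.id(), first.data(), "UPDATE_BEFORE", first.ordinal()));
          result.add(new Change(last.id(), last.data(), "UPDATE_AFTER", last.ordinal()));
        }
      } else if (startsWithDelete) {
        result.add(first); // existed before the range, gone after it: net DELETE
      } else if (endsWithInsert) {
        result.add(last); // absent before the range, present after it: net INSERT
      }
      // INSERT ... DELETE: created and removed inside the range, nothing emitted.
    }
    return result;
  }

  /** Drops DELETE/INSERT pairs in the same ordinal whose contents are identical. */
  static List<Change> removeCarryovers(List<Change> sorted) {
    List<Change> kept = new ArrayList<>();
    for (Change c : sorted) {
      Change prev = kept.isEmpty() ? null : kept.get(kept.size() - 1);
      if (prev != null
          && prev.type().equals("DELETE")
          && c.type().equals("INSERT")
          && prev.ordinal() == c.ordinal()
          && prev.data().equals(c.data())) {
        kept.remove(kept.size() - 1); // the row was only rewritten, not changed
      } else {
        kept.add(c);
      }
    }
    return kept;
  }
}
```

In this model a row that is updated in every snapshot of the range collapses to a single UPDATE_BEFORE/UPDATE_AFTER pair, which is the "pre/post images across the entire snapshot range" behavior from the table.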
Performance Impact
- Reduced shuffle data volume: Only identifier columns are used for partitioning (typically 1-3 columns) instead of all columns (potentially dozens)
- Reduced sort key size: Sorting on fewer columns makes each row comparison cheaper and keeps the sort key small
- Less data to process: Window function pre-filters data before the final iterator logic
This is particularly helpful for tables with:
- Wide schemas (20+ columns)
- High cardinality data
- Multiple snapshots with many intermediate changes
Query engine
Spark
Willingness to contribute
- I can contribute this improvement/feature independently
- I would be willing to contribute this improvement/feature with guidance from the Iceberg community
- I cannot contribute this improvement/feature at this time