Core: Group binpack fileGroup by output partitionSpec #14281
base: 1.10.x
Conversation
```java
  });
}

private static Object convertPartitionValue(
```
It's not exhaustive; it only handles the common year/month/day/hour case.
Can we have an exhaustive implementation?
Adding something half-finished seems problematic to me.
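For context, a minimal sketch of the limited conversion under discussion, covering only the common time transforms; the class and method names are hypothetical, not the PR's actual code. Iceberg stores year/month/day/hour transform values as integer offsets from the Unix epoch, which is what makes this narrow case straightforward:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class TimeTransformConversion {
  private static final LocalDate EPOCH = LocalDate.ofEpochDay(0);

  // Coarsen an hour-transform partition value (hours from 1970-01-01T00:00)
  // into a day/month/year transform value. Everything else returns null,
  // which is exactly the "not exhaustive" limitation raised above.
  static Integer coarsenFromHours(int hoursFromEpoch, String toTransform) {
    LocalDate date = LocalDate.ofEpochDay(Math.floorDiv(hoursFromEpoch, 24));
    switch (toTransform) {
      case "day":
        return (int) date.toEpochDay(); // days from epoch
      case "month":
        return (int) ChronoUnit.MONTHS.between(EPOCH, date.withDayOfMonth(1)); // months from epoch
      case "year":
        return date.getYear() - 1970; // years from 1970
      default:
        return null; // identity, bucket, truncate, etc. are not handled
    }
  }
}
```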
```java
groups,
group ->
    enoughInputFiles(group)
    (group.size() >= 1
```
Mostly for the single-file case.
Are we rewriting a single file just so we can change the specId?
@pvary @Parth-Brahmbhatt could you help review it? Thanks.
What happens if the file contains data for multiple new partitions? Say the new output spec is more detailed than the original one, or maybe even contains absolutely different partitions? I didn't have time to go through your code, but there are existing tools to get the partition value for a row with a given spec. You might want to use them. Also, please fix the test issues.
Let's say we have an event/date partition,
I think you mean coercePartition(); it does not deal with hidden partitioning correctly, e.g. extracting date from hour, and it needs to recompute the extraction from the spec every time. Let me see if I can just use coercePartition().
Will do.
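For reference, Iceberg already ships a helper that computes a row's partition value under a given spec: org.apache.iceberg.PartitionKey. Whether this is one of the "existing tools" pvary meant is my assumption; a rough usage sketch, with `table`, `outputSpec`, and `row` assumed to be in scope:

```java
// PartitionKey applies each partition field's transform to its source column
// and materializes the row's partition tuple under the given spec.
PartitionKey key = new PartitionKey(outputSpec, table.schema());
key.partition(row); // row is a StructLike matching the table schema
Integer day = key.get(0, Integer.class); // e.g. the ordinal of a day(ts) field
```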
Iceberg 1.8+ introduces support in `rewrite_data_files` for writing to an output spec that differs from the table’s current partition spec. In our use case, the current table is partitioned by event/date/hour/batchId, where `batchId` serves as a work unit identifier. To address the small-files problem, we want to roll up mature data to a coarser partition layout (event/date). This feature enables an efficient in-table rollup.

We observed two issues:

1. Files are grouped by the current table’s partition spec, which places each small file into its own group, effectively preventing any bin-packing.
2. When running `rewrite_data_files` again, because the files no longer conform to the table’s current output partition spec, Spark places all files into a single group, causing a large and unnecessary shuffle.

This patch resolves the issue by grouping files according to the *output* partition spec instead. As an additional benefit, when the output spec is only partially compatible with the current one, the grouping logic will still align on shared partitions. For example, when rolling up from event/date/hour to event/server/date, it will group data primarily by event/date.
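A minimal sketch of the grouping change described above, assuming file groups are keyed by a StructLike partition value; `projectToOutputSpec` is a hypothetical helper standing in for the PR's actual projection logic:

```java
// Key file groups by each file's partition re-expressed in the OUTPUT spec,
// not by the file's own (possibly outdated) spec, so files destined for the
// same output partition can be bin-packed together.
StructLikeMap<List<FileScanTask>> groups = StructLikeMap.create(outputSpec.partitionType());
for (FileScanTask task : tasks) {
  StructLike outputPartition = projectToOutputSpec(task); // hypothetical helper
  groups.computeIfAbsent(outputPartition, p -> new ArrayList<>()).add(task);
}
```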
```java
tasks,
task ->
    outsideDesiredFileSizeRange(task) || tooManyDeletes(task) || tooHighDeleteRatio(task));
    (task.file() != null && task.file().specId() != outputSpecId())
```
Are we rewriting a single file just so we can change the specId?
It's more to make the storage layout match the spec. E.g., in our case we want to rewrite the file from
s3://bucket/datalake/event=xxx/team=xxx/batchid=xxx/date=xxx/hour=xxx/xxx.parquet (current spec)
to
s3://bucket/datalake/event=xxx/team=xxx/date=xxx/xxx.parquet (rollup spec)
Given that a lot of policy is based on the S3 prefix, it's better to rewrite even when there is only one file.
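Putting the two conditions together, the predicate being debated looks roughly like this; the shape is assumed from the diff above, not copied from the patch:

```java
// Rewrite a task when its file was written under a different spec than the
// output spec, even as a single-file group, so the storage layout (the S3
// prefix) matches the rollup spec; otherwise fall back to the usual
// size and delete heuristics.
private boolean shouldRewrite(FileScanTask task) {
  return (task.file() != null && task.file().specId() != outputSpecId())
      || outsideDesiredFileSizeRange(task)
      || tooManyDeletes(task)
      || tooHighDeleteRatio(task);
}
```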
Could we move this to a different PR?
I think this one is a bit questionable, and other community members might have different opinions about this.
What does
In the hidden partition case, date(ts) and hour(ts) are two different fields with different fieldIds, so it will be null. It's better to compare the sourceId() and check that they are "compatible" (satisfiesOrderOf?), but without the spec, that information is lost.
Thanks for the explanation. This makes sense.
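A rough illustration of the compatibility check suggested above; the helper itself is hypothetical, but Transform#satisfiesOrderOf is the existing Iceberg API for this kind of ordering relationship:

```java
// Two partition fields can be treated as "compatible" when they read the same
// source column and the finer transform's ordering satisfies the coarser one's,
// e.g. hour(ts) satisfies day(ts) even though their fieldIds differ.
boolean compatible(PartitionField finer, PartitionField coarser) {
  return finer.sourceId() == coarser.sourceId()
      && finer.transform().satisfiesOrderOf(coarser.transform());
}
```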