Flink: Refactor WriteResult aggregation in DynamicIcebergSink #14312
Conversation
byte[][] writeToManifests(
    String tableName, Collection<WriteResult> writeResults, long checkpointId)
Do we need to create multiple manifest files?
Could it be just a single one for every table+branch?
We need to create a separate manifest file for each partition spec according to the Iceberg spec:
A manifest stores files for a single partition spec. When a table’s partition spec changes, old files remain in the older manifest and newer files are written to a new manifest. This is required because a manifest file’s schema is based on its partition spec (see below).
I attempted to hack this and write a single ManifestFile with multiple DataFiles/DeleteFiles using different partition specs. This approach resulted in incorrect partition specs returned when reading the manifest back using the ManifestReader.
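For illustration, here is a minimal sketch of writing one temporary manifest per partition spec. It only covers DataFiles, the hard-coded format version 2 mirrors what FlinkManifestUtil uses today, and newManifestOutputFile(specId) is a hypothetical helper for allocating a temporary OutputFile; it is not the actual implementation in this PR:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;

class ManifestPerSpecSketch {
  // Sketch: one manifest per partition spec, because a manifest may only reference a single spec.
  List<ManifestFile> writeManifestsPerSpec(Table table, Iterable<DataFile> dataFiles)
      throws IOException {
    // Group the files by the partition spec they were written with.
    Map<Integer, List<DataFile>> filesBySpec = new HashMap<>();
    for (DataFile file : dataFiles) {
      filesBySpec.computeIfAbsent(file.specId(), id -> new ArrayList<>()).add(file);
    }

    List<ManifestFile> manifests = new ArrayList<>();
    for (Map.Entry<Integer, List<DataFile>> entry : filesBySpec.entrySet()) {
      PartitionSpec spec = table.specs().get(entry.getKey());
      OutputFile outputFile = newManifestOutputFile(entry.getKey()); // hypothetical helper
      ManifestWriter<DataFile> writer = ManifestFiles.write(2, spec, outputFile, null);
      try {
        entry.getValue().forEach(writer::add);
      } finally {
        writer.close();
      }
      manifests.add(writer.toManifestFile());
    }
    return manifests;
  }

  // Hypothetical helper: in practice this would allocate a temporary manifest location via the table IO.
  private OutputFile newManifestOutputFile(int specId) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}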
Let me confirm:
- A manifest can only contain files for a single partition spec
- The RowDelta operation can commit files for multiple partition specs?

How does RowDelta do this? Does it create multiple manifest files behind the scenes?
Yes, a manifest can only contain files for a single partition spec.
Yes, the RowDelta writes multiple manifests behind the scenes. It keeps track of DataFiles and DeleteFiles by their partition specs. See the implementation of MergingSnapshotProducer:
// update data
private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
The current implementation of DynamicWriteResultAggregator implicitly writes a new temporary manifest for each unique WriteTarget, which creates multiple DynamicCommittables per (table, branch, checkpoint) triplet (incorrect behaviour).

With this refactor, we write even fewer manifests (only for unique partition specs), which makes the implementation explicit (see the sketch below):
- Create multiple manifests only for different partition specs (similar to RowDelta)
- Create only one DynamicCommittable per checkpoint, and use multiple manifests for serialisation
- Remove all assumptions of multiple commit requests in DynamicCommitter
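As a rough sketch of the per-checkpoint output after the refactor (helper names such as bufferedTableKeys(), resultsFor(...), emit(...), key.tableName() and the DynamicCommittable constructor shape are assumptions for illustration, not the actual API; only writeToManifests(...) comes from this PR's diff):

// Group the buffered WriteResults per (table, branch), write one temporary manifest
// per unique partition spec, and emit exactly one committable for the checkpoint.
for (TableKey key : bufferedTableKeys()) {
  Collection<WriteResult> results = resultsFor(key);
  byte[][] manifests = writeToManifests(key.tableName(), results, checkpointId);
  emit(new DynamicCommittable(key, manifests, checkpointId)); // one committable per triplet
}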
Will take a look soon.
QQ in the meantime: How do we handle Iceberg version change for long running jobs?
Both the current code and the refactor use ManifestReader/Writer with a hard-coded version 2 of the Iceberg format for committable state serialisation in a checkpoint. See FlinkManifestUtil.
But we change the DynamicCommittable to contain TableKey instead of WriteTarget and a byte[][] instead of byte[].
Should we add a new version to the DynamicCommittableSerializer?
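For context, a rough sketch of what the refactored committable could look like. Only TableKey and byte[][] come from this PR; the field set and constructor here are illustrative assumptions, and the real DynamicCommittable carries additional metadata:

// Illustrative shape only: one committable per (table, branch, checkpoint).
class DynamicCommittable {
  private final TableKey key;        // table + branch, replacing the per-spec WriteTarget
  private final byte[][] manifests;  // one serialized temporary manifest per partition spec
  private final long checkpointId;

  DynamicCommittable(TableKey key, byte[][] manifests, long checkpointId) {
    this.key = key;
    this.manifests = manifests;
    this.checkpointId = checkpointId;
  }
}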
Why do we need to maintain checkpoint state backwards compatibility, given this would go into Iceberg 1.11.0?
I can see that the DynamicIcebergSink is annotated with @Experimental, which doesn't promise any compatibility guarantees.
I am also planning to change the committable state serialisation a couple more times in the following commits to replace manifest serialisation completely. If we want to keep backwards compatibility, should we introduce a new version in the last commit?
What do you think?
I need some time to think through the implications, and how we handled these situations in the past.
In the past, we have always made sure that users can upgrade their job to a newer Iceberg version without dropping the state. This is important for long-running jobs, where in-place upgrade is critical.
I think we should follow the same pattern here. If we change how we store data in the state, then we need to make sure that the old state can still be read. This is done by versioning the serializer (see the sketch below). The groundwork is there, and we need to use it.
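A minimal sketch of what versioning the serializer could look like, assuming Flink's SimpleVersionedSerializer is the base interface; the version constants and the writeV2/readV1/readV2 helpers are illustrative placeholders, not the existing implementation:

import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Always write the newest layout, but keep a read path for state produced by older releases.
abstract class VersionedCommittableSerializerSketch
    implements SimpleVersionedSerializer<DynamicCommittable> {
  private static final int VERSION_1 = 1; // old layout: WriteTarget + a single byte[] manifest
  private static final int VERSION_2 = 2; // new layout: TableKey + byte[][] manifests

  @Override
  public int getVersion() {
    return VERSION_2;
  }

  @Override
  public byte[] serialize(DynamicCommittable committable) throws IOException {
    return writeV2(committable);
  }

  @Override
  public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException {
    switch (version) {
      case VERSION_1:
        return readV1(serialized); // restores state written before this refactor
      case VERSION_2:
        return readV2(serialized);
      default:
        throw new IOException("Unknown committable serializer version: " + version);
    }
  }

  // Concrete read/write helpers elided; they are placeholders for this sketch.
  abstract byte[] writeV2(DynamicCommittable committable) throws IOException;
  abstract DynamicCommittable readV1(byte[] serialized) throws IOException;
  abstract DynamicCommittable readV2(byte[] serialized) throws IOException;
}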
I understand it is extra work to do if we want to change the serialization again, but I'm still not convinced that we have a good solution to that problem.
I see 2 options:
- Implement a serialization for the multiple manifests now, and remove it if we change it again before the next release
- Block this PR until we agree upon the next serialization solution.
I'm leaning towards option 1, because I'm a bit skeptical about other serialization methods, and I think we will need more time to agree on a way to move forward.
One argument against it is that the multiple-manifest serialization doesn't add much of a performance gain for us. It "just" helps by simplifying the committer code.
Your thoughts?
DynamicWriteResultAggregator currently produces multiple committables per (table, branch, checkpoint), which get aggregated in the downstream committer. Refactor the commit aggregator to output only one committable per triplet. Clean up DynamicCommitter to remove assumptions of multiple commit requests per table, branch, and checkpoint.

This requires serializing the aggregated WriteResult using multiple temporary manifest files for each unique partition spec, because the Iceberg manifest writer requires a single spec per manifest file. We can improve this later by refactoring serialization in the following changes.

Change-Id: I6fda8d23a01dc5de3e5c9e612a88533a6986fa54
5904b13 to 3b00257
This PR moves the WriteResult aggregation logic from DynamicCommitter to DynamicWriteResultAggregator, as described in this comment: #14182 (comment).

DynamicWriteResultAggregator currently produces multiple DynamicCommittables per (table, branch, checkpoint) triplet. This initially broke the commit recovery of the dynamic Iceberg sink (see #14090), and was later addressed by a hot fix to aggregate WriteResults in the DynamicCommitter.

Refactor the DynamicWriteResultAggregator to output only one committable per triplet. Clean up DynamicCommitter to remove assumptions of multiple commit requests per table, branch, and checkpoint. This requires serializing the aggregated WriteResult using multiple temporary manifests for each unique partition spec, because the Iceberg manifest writer requires a single partition spec per file. We can improve this later by changing how we serialize DataFiles and DeleteFiles for Flink checkpoints in the DynamicSink.