Conversation

arifazmidd

Closes #3703

Description

This PR adds streaming support to the remove_orphan_files Spark procedure to prevent driver OOM issues when dealing with tables that have many orphan files.

This mirrors the existing behavior of expire_snapshots, whose stream_results parameter was added in #4152.

Changes

  • Added stream-results option to DeleteOrphanFilesSparkAction
  • Added deleteFilesStreaming() method that processes files using toLocalIterator()
  • Returns a sample of up to 20,000 file paths, plus a summary row showing the total count
  • Added STREAM_RESULTS_PARAM to RemoveOrphanFilesProcedure
  • Added test coverage (3 new tests)
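
The core streaming idea in the changes above can be sketched as follows. This is a minimal, self-contained illustration with hypothetical names, not the actual `DeleteOrphanFilesSparkAction` code: file paths are consumed from an iterator one at a time (as `toLocalIterator()` would provide them), only a bounded sample is retained in driver memory, and the total is counted separately.

```java
import java.util.Iterator;
import java.util.List;

// Sketch of bounded-memory streaming deletion (hypothetical class/method names).
public class StreamingDeleteSketch {
  // Sample cap taken from the PR description.
  static final int MAX_SAMPLE_SIZE = 20_000;

  // Returns the number of files processed; fills `sample` with at most
  // MAX_SAMPLE_SIZE paths. Driver memory stays bounded regardless of the
  // total number of orphan files.
  static long deleteFilesStreaming(Iterator<String> orphanFiles, List<String> sample) {
    long deletedCount = 0;
    while (orphanFiles.hasNext()) {
      String path = orphanFiles.next();
      if (sample.size() < MAX_SAMPLE_SIZE) {
        sample.add(path); // only a bounded sample is retained
      }
      deletedCount++; // a real implementation would delete `path` here
    }
    return deletedCount;
  }
}
```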

Testing

  • Added testStreamResults() - verifies streaming functionality with multiple orphan files
  • Added testStreamResultsBackwardsCompatibility() - ensures non-streaming mode still works
  • Added testStreamResultsWithDryRun() - tests streaming with dry run mode

Real-World Testing Results

Tested on AWS EMR with a production table containing ~3PB of orphaned data:

| Run | Learnings |
| --- | --- |
| Dry run using original implementation | Driver crashes due to OOM. |
| Dry run using stream results | Completed successfully. Returned a sample of 20k file paths to be deleted and a total count of ~45M files to be deleted. |
| Full run using original implementation | Driver crashes due to OOM. |
| Full run using stream results | Terminated after 4 hours because that was the configured timeout; however, it successfully iterated through and deleted ~31M files. |
| Full run using original implementation (after streaming run) | Completed successfully. Deleted the remaining ~14M orphaned files. |
| Full run using stream results on sps-eta (validation run) | Completed successfully. Deleted ~1200 new orphan files. |

Key Findings:

  • Original implementation consistently crashes with OOM on large-scale orphan file cleanup (~45M files)
  • Streaming implementation successfully handles massive workloads without memory issues
  • Successfully deleted ~31M files in a single streaming run (terminated by timeout, not failure)
  • Streaming approach enables incremental cleanup of large orphan file sets

Backwards Compatibility

Fully backwards compatible. Default behavior unchanged. Streaming is opt-in via stream_results parameter.
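
The opt-in behavior can be illustrated with a small self-contained sketch. This simplified parsing helper is not Iceberg's `PropertyUtil`; the option name mirrors the `stream-results` action option from this PR, and the point is that an absent option falls back to `false`, so existing callers see no change.

```java
import java.util.Map;

// Sketch of opt-in option parsing: streaming is off unless explicitly enabled.
public class StreamResultsOption {
  static final String STREAM_RESULTS = "stream-results";
  static final boolean STREAM_RESULTS_DEFAULT = false; // default behavior unchanged

  static boolean streamResults(Map<String, String> options) {
    String value = options.get(STREAM_RESULTS);
    return value == null ? STREAM_RESULTS_DEFAULT : Boolean.parseBoolean(value);
  }
}
```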

@github-actions github-actions bot added the spark label Oct 8, 2025
@arifazmidd
Author

Hi @RussellSpitzer @pvary @liziyan-lzy @huaxingao, if any of you have time to help review this that would be greatly appreciated!

@pvary
Contributor

pvary commented Oct 14, 2025

@arifazmidd: Could you fix the test please?

@arifazmidd
Author

arifazmidd commented Oct 14, 2025

Thanks for running the CI @pvary; I have fixed the formatting issues.

```java
"max_concurrent_deletes only works with FileIOs that do not support bulk deletes."
    + " Thistable is currently using {} which supports bulk deletes so the"
    + "See that IO's documentation to learn how to adjust parallelism for that particular "
    + "IO's bulk delete.",
```
Contributor

nit space

Contributor

Why do we change this?

Author

The space wasn't there before, but I can add it. When I ran spotlessApply locally, it updated this string concatenation.

Contributor

@pvary pvary Oct 17, 2025

Could you please share the command you used?

When I run:

```shell
./gradlew spotlessApply -DallModules=true
```

I don't see any changes.

```java
"Cannot remove orphan files with an interval less than 24 hours. Executing this procedure"
    + "at the same time. If you are absolutely confident that no concurrent operations will be "
    + "affected by removing orphan files with such a short interval, you can use the Action API "
    + "to remove orphan files with an arbitrary interval.");
```
Contributor

Why do we change this?

Author

Same as above: the content of the string is exactly the same, but running spotlessApply locally updated the string concatenation.

```java
  LOG.warn(
      "Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size(), e);
}

private boolean streamResults() {
```
Contributor

Is this worth its own method? It's only used once.

Author

I was keeping it consistent with how it's already set up in ExpireSnapshotsSparkAction:

```java
private ExpireSnapshots.Result doExecute() {
  if (streamResults()) {
    return deleteFiles(expireFiles().toLocalIterator());
  } else {
    return deleteFiles(expireFiles().collectAsList().iterator());
  }
}

private boolean streamResults() {
  return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}
```

Contributor

@pvary pvary Oct 17, 2025

There are some places where we handle this differently, for example:

```java
long defaultMaxSnapshotAgeMs =
    PropertyUtil.propertyAsLong(
        base.properties(), MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT);
```

I see your point. Still a bit strange to me, but I don't have a strong opinion on this.

```java
 * @param orphanFiles list of file paths to delete (already in driver memory)
 */
private void deleteFilesCollected(List<String> orphanFiles) {
  if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
```
Contributor

Is this change for another feature?

I think this should be an independent PR/change where we start using bulk delete when the io supports bulk operation. This has nothing to do with streaming the results

Author

@arifazmidd arifazmidd Oct 16, 2025

Bulk delete support already existed in the original implementation. This isn't new functionality; it's a refactoring into separate functions for streaming vs. non-streaming deletion.

The current implementation already supports bulk delete:

```java
private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
```

Contributor

Oh.. you inlined the method

Contributor

I see that the logic is now duplicated.

Is there a way to reuse the code?
Maybe reusing the deleteFilesCollected inside the deleteFilesStreaming?
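
One way the reuse suggestion above could look, as a self-contained sketch with hypothetical names (not the PR's actual code): stream the iterator into fixed-size batches and hand each batch to the same collected-delete routine, so the bulk-delete logic lives in one place while driver memory stays bounded by the batch size.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Sketch: reuse a collected-delete routine from the streaming path by batching.
public class BatchedStreamingDelete {
  // `deleteBatch` stands in for something like the existing deleteFilesCollected(...).
  static long deleteInBatches(
      Iterator<String> files, int batchSize, Consumer<List<String>> deleteBatch) {
    long total = 0;
    List<String> batch = new ArrayList<>(batchSize);
    while (files.hasNext()) {
      batch.add(files.next());
      if (batch.size() == batchSize) {
        deleteBatch.accept(batch); // shared deletion logic, bounded memory
        total += batch.size();
        batch = new ArrayList<>(batchSize);
      }
    }
    if (!batch.isEmpty()) { // flush the final partial batch
      deleteBatch.accept(batch);
      total += batch.size();
    }
    return total;
  }
}
```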

```java
assumeThat(usePrefixListing)
    .as(
        "This test verifies default listing behavior and does not require prefix listing to be"
            + " enabled.")
```
Contributor

Why this change?

Author

Same as above, the content of the string is the exact same but running spotlessApply locally updated the string concatenation.

Comment on lines +291 to +296
```java
// Collect sample paths before deleting
for (String path : fileGroup) {
  if (samplePaths.size() < MAX_ORPHAN_FILE_PATHS_TO_RETURN_WHEN_STREAMING) {
    samplePaths.add(path);
  }
}
```
Contributor

Maybe do it after deleting, so that we only put successfully deleted files into the sample?
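
The suggestion above could look roughly like this self-contained sketch (hypothetical helper names, not the PR's code): attempt the delete first, and record a path in the sample only when the delete reports success, so the returned sample never lists files that failed to delete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch: sample only successfully deleted paths.
public class SampleAfterDelete {
  // `tryDelete` stands in for the real per-file delete and returns true on success.
  static List<String> deleteAndSample(
      List<String> fileGroup, Predicate<String> tryDelete, int maxSample) {
    List<String> samplePaths = new ArrayList<>();
    for (String path : fileGroup) {
      boolean deleted = tryDelete.test(path);
      if (deleted && samplePaths.size() < maxSample) {
        samplePaths.add(path); // only record paths that were actually deleted
      }
    }
    return samplePaths;
  }
}
```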


Development

Successfully merging this pull request may close these issues.

DeleteOrphanFiles or ExpireSnapshots outofmemory
