MAPREDUCE-7474. Improve Manifest committer resilience #6716
Conversation
testing: azure cardiff
Branch force-pushed from 4dc5a60 to c4faf5d.
Reviews invited from @mukund-thakur @anmolanmol1234 @anujmodi2021 @HarshitGupta11
* retries of save()
* split delete into deleteFile and rmdir
* needs tests

Change-Id: Idb6cf0e85c62c973881fdc96a3ded97b1cfc43ff
* Retries of save(): 5 attempts, with 500 millis sleep between them. No configuration. Issue: should we make this configurable?
* Split delete(path, recursive) into deleteFile and rmdir for separate statistics.

Test simulation expands to:
* Support recovery through a countdown of calls to fail.
* Simulate timeout before *and after* rename calls.

Change-Id: I3f86c5a238515955e9b82ed37727d40d2d8d3f96
Change-Id: I039ec6e4dc12f68690ffa977ebb81056ab0d1711
New option mapreduce.manifest.committer.cleanup.parallel.delete.base.first.

This makes an initial attempt at deleting the base dir, only falling back to parallel deletes if there's a timeout. Best for abfs; for gcs it works but is suboptimal. Enabled by default.

Also: changed the default abfs IO rate from 10_000 to 1_000.

Docs and tests updated.

Change-Id: Idd10aecc3cb6747a6367573ef9547675641afe8c
Change-Id: I24778ab4d817a77afbbf1d5b132be270698382a4
Change-Id: Ic67f41449d1e46d9fb81c47012bb41d5fade84a9
Branch force-pushed from a3117cf to 16e1be4.
🎊 +1 overall
The number of attempts to commit a manifest is now configurable with the option:
mapreduce.manifest.committer.manifest.save.attempts

* The default is still 5.
* The minimum is 1; asking for less is ignored.
* A retry policy adds 500ms of sleep per attempt (using the retryUpToMaximumCountWithProportionalSleep policy).

Documented. Making it configurable avoids having to guess what the ideal value should be; instead the default is something which should cope with brief transient failures.

Change-Id: I276aaf39bff73544a633126425cc7ec1e9848ec1
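For illustration, a minimal sketch of how such a policy could be built from the new option, using the Hadoop RetryPolicies helper named above; the class and constant choices are illustrative, not the committer's actual code.

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

/** Sketch only: build the manifest-save retry policy from the new option. */
public class SaveRetryPolicySketch {

  public static RetryPolicy buildSavePolicy(Configuration conf) {
    // minimum of 1 attempt: asking for less is ignored
    int attempts = Math.max(1,
        conf.getInt("mapreduce.manifest.committer.manifest.save.attempts", 5));
    // proportional sleep: 500ms of extra sleep per attempt
    return RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
        attempts, 500, TimeUnit.MILLISECONDS);
  }
}
```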
return trackDuration(getIOStatistics(), statistic, () -> {
  return operations.delete(path, recursive);
});
if (recursive) {
Unable to understand this. How does a recursive flag determine whether it is a dir or a file?
OK, deleteDir will also delete a file; let me highlight that.
I'd done this delete dir/file split to support different capacity requests; without that it is a bit over-complex. It does let us collect different statistics though, which may be useful.
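For illustration, a minimal sketch of what that split might look like, reusing the trackDuration()/operations helpers visible in the hunk above; the method and statistic names here are my own, not necessarily those in the patch.

```java
/** Sketch only: delete a single file, tracked under its own statistic. */
protected boolean deleteFile(Path path, String statistic) throws IOException {
  return trackDuration(getIOStatistics(), statistic, () ->
      operations.delete(path, false));
}

/**
 * Sketch only: recursive directory delete, tracked separately.
 * Like FileSystem.delete(), this will also remove a plain file at the path.
 */
protected boolean deleteRecursiveDir(Path path, String statistic) throws IOException {
  return trackDuration(getIOStatistics(), statistic, () ->
      operations.delete(path, true));
}
```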
/**
 * Default value of option {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}: {@value}.
 */
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT = true;
As it is bad for GCS, shouldn't the default be false?
really don't know here. In the docs I try to cover this
🎊 +1 overall
@@ -143,6 +145,20 @@ public final class ManifestCommitterConstants {
 */
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true;

/**
 * Should parallel cleanup try to delete teh base first?
typo: the
    getName(), tempPath, finalPath, retryCount);

trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
    operations.save(manifestData, tempPath, true));
Also, if the rename failed on the first attempt but succeeded in the backend, will the save operation on tempPath fail with an error, and if yes, how do we recover from that?
So renameFile() has always deleted the destination, because we need to do that to cope with failures of a previous/concurrent task attempt. Whoever commits last wins.
To make this clearer I'm pulling more of the code up into this method and adding comments.
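As a rough sketch of the single-attempt sequence being discussed, stitched together from the hunks quoted in this review (not the exact method in the patch):

```java
/** Sketch only: one attempt at saving and committing the task manifest. */
private void saveManifestOnce(TaskManifest manifestData,
    Path tempPath, Path finalPath) throws IOException {
  // write the manifest to the temp path, overwriting any previous attempt
  trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
      operations.save(manifestData, tempPath, true));
  // delete whatever is at the final path: whoever commits last wins
  delete(finalPath, true, OP_DELETE);
  // rename temp to final; renameFile() raises PathIOException if rename() returns false
  renameFile(tempPath, finalPath);
}
```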
try (DurationInfo info = new DurationInfo(LOG, true,
    "Initial delete of %s", baseDir)) {
  exception = deleteOneDir(baseDir);
  if (exception == null) {
As added by you in this logic, when the directory tree is very large and the account uses OAuth authentication, Azure cloud could fail the baseDir delete due to exhaustive ACL permission checks. But this delete will enter the retry loop, as it was a request timeout, and in this scenario all the retries might also fail; it can take a while to report failure, with backoff and retry attempts as per AZURE_MAX_IO_RETRIES (default value 30).
The default max retry count is 30 today just to ensure any 5-10 min network/service transient failures do not lead to failures of long-running workloads.
If this logic of attempting the basedir delete before falling back to parallel deletes is optimal only for Azure cloud, we could look for ways to fail fast for recursive delete.
Would this work: add a new config MAX_RETRIES_RECURSIVE_DELETE, which by default will be the same as AZURE_MAX_IO_RETRIES in the ABFS driver. AzureManifestCommitterFactory could probably set this config to 0 before the FileSystem.get() call happens.
If this sounds OK, we can look into the changes needed in AbfsClient, AbfsRestOperation and ExponentialRetry to make the MAX_RETRIES_RECURSIVE_DELETE config effective.
Ooh, so it's going to be quite a long time to fall back.
I'm going to make the option default to false for now.

> AzureManifestCommitterFactory could probably set this config to 0 before FileSystem.get() call happens.

It'll come from the cache, so we don't want to set it for everything else; but a low MAX_RETRIES_RECURSIVE_DELETE might make sense everywhere. Something to consider later.
Simulating more failure conditions. Still more to explore there, in particular "what if delete of rename target fails?"

Change-Id: Idb84f9c17a195702e6a2345b095f41e72865dd5b
🎊 +1 overall
@snvijaya we actually know the total number of subdirs for the deletion! It is propagated via the manifests: each TA manifest includes the number of dirs as an IOStatistic, and the aggregate summary adds these all up. The number of paths under the job dir is that number (counter committer_task_directory_count) plus any from failed task attempts.
This means we could actually have a threshold of how many subdirectories will trigger an automatic switch to parallel delete. I'm just going to pass this down and log it immediately before the cleanup kicks off, so if there are problems we will get the diagnostics adjacent to the error.
Note that your details on retry timings imply that on a MapReduce job (rather than a Spark one) the progress() callback will not take place, so there's a risk that the job will actually time out. I don't think that's an issue in MR job actions the way it is in task-side actions, where a heartbeat back to the MapReduce AM is required.
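A hedged sketch of that threshold idea, reading the counter from an IOStatistics instance; the method, parameter and threshold names are illustrative, not fields of the cleanup stage.

```java
/** Sketch only: decide whether to skip the initial serial delete. */
static boolean parallelFirst(IOStatistics iostats, long dirCountThreshold) {
  // counter name taken from the comment above; the threshold is illustrative
  long dirCount = iostats.counters()
      .getOrDefault("committer_task_directory_count", 0L);
  return dirCount > dirCountThreshold;
}
```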
Statistics Collection and Printing
----------------------------------

* New statistic task_stage_save_summary_file to distinguish from other saving operations (job success/report file).
* After a failure to save a task attempt, the iostats of the manifest are rebuilt so the stats on failures are updated. This will get into the final job _SUCCESS statistics so we can see if anything happened.
* Make the manifest print command something which can be invoked from the command line: mapred successfile

This is covered in the docs.

The failure stats regeneration is nice; it works by passing down a lambda-expression of the logic to (re)generate the manifest, and invoking this on every attempt. As this is where the stats are aggregated, it includes details on the previous failing attempts.

Directory size for deletion
---------------------------

* Optionally pass down the directory count under the job dir to the cleanup stage.
* This is determined in job commit from aggregate statistics; unknown elsewhere (app abort etc.).
* It is currently only logged; it may be possible to support an option of when to skip the initial serial delete, though it will depend on the abfs login mechanism.

Testing
-------

* More fault injection scenarios.
* Ability to assert that iostats do not contain specific non-zero stats. This is used in ITestAbfsTerasort to assert no task save or rename failures. The stats before this change imply this did happen in a job commit; no other details, hence the new probe.
* Log manifest committer at debug in mapred-core.

Note: if there's a retry process which means the operation can take minutes, the initial operation will block progress() callbacks so MapReduce jobs will fail. Spark is unaffected.

Change-Id: Id423267de89c7f31e4b1283f9c433b729ff0d87b
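A minimal sketch of the lambda-driven regeneration described above, assuming a hypothetical supplier interface and the saveManifestOnce() helper sketched earlier; the real patch uses the committer's own types and statistics plumbing.

```java
/** Sketch only: a supplier that (re)builds the manifest with fresh IOStatistics. */
@FunctionalInterface
interface ManifestSupplier {
  TaskManifest create() throws IOException;
}

/** Sketch only: regenerate the manifest before every save attempt. */
void saveManifestWithRetries(ManifestSupplier supplier, int attempts,
    Path tempPath, Path finalPath) throws IOException {
  int tries = Math.max(1, attempts);   // minimum of one attempt
  IOException last = null;
  for (int attempt = 1; attempt <= tries; attempt++) {
    // rebuilding here means earlier failures show up in the manifest's stats,
    // and so eventually in the job's _SUCCESS statistics
    TaskManifest manifest = supplier.create();
    try {
      saveManifestOnce(manifest, tempPath, finalPath);
      return;
    } catch (IOException e) {
      last = e;
    }
  }
  throw last;
}
```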
💔 -1 overall
Change-Id: I3048e959efdc1fc7707061e137eb9524d795ff90
🎊 +1 overall
Great PR! Some comments.
Thanks!
delete(finalPath, true, OP_DELETE);

// rename temp to final
renameFile(tempPath, finalPath);
There can be a parallel process which might create a directory between lines 680 and 683; should we check, after line 683, whether finalPath is a file or not?
Also, should we check whether the filesystem rename returned true or false?
These checks would help us know that there was no object on the destination and that the rename was completely successful.
From the renameFile() javadocs:

> throws PathIOException – if the rename() call returned false.

So there is no need to check the result here.

Directory deletion, maybe: but what is going to create a directory here? Nothing in the committer will, and if some other process is doing stuff in the job attempt dir you are doomed.
Got your point for the directory case.
For the first point, I now understand that executeRenamingOperation would call escalateRenameFailure on fs.rename() failure, which would raise PathIOException. I was thinking that instead of calling renameFile we could call operations.renameFile() directly and raise the exception from there. The reason being: escalateRenameFailure does a getFileStatus on both src and dst for logging, so we can save two filesystem calls if we know the renameFile for the saveManifest has failed. Would like to know your view. But I am good with this comment.
You mean use commitFile() after creating a file entry, so pushing more of the recovery down? We could do that. We won't have the etag of the created file though.
That would be better: if the recovery also fails, we would not do the additional HEAD calls in escalateRenameFailure. This looks good!
One thing I'm considering here: make that "initial attempt at base dir delete" a numeric threshold. Good: agile for now, leaving a simple switch.
* Use delay from retry class for sleeping.

Change-Id: I4f5ea48f6c22412d55ecb1bfd00c82b6cc7e4be5
🎊 +1 overall
🎊 +1 overall
Move from classic rename() to commitFile() to rename the file, after calling getFileStatus() to get its length and possibly etag. This becomes a rename() on gcs/hdfs anyway, but on abfs it does reach the ResilientCommitByRename callbacks in abfs, which report on the outcome to the caller...which is then logged at WARN.

Test changes to match the codepath changes, including improvements in fault injection.

Change-Id: I757a77c8d2b7a7f1cf2ce32d109ce1baa6a90ec2
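Roughly, the new codepath looks like the following sketch; the FileEntry constructor, CommitFileResult accessor and etagOf() helper are written from the description above and may not match the committer's exact signatures.

```java
/** Sketch only: commitFile()-based rename of the saved task manifest. */
private void commitManifest(Path tempPath, Path finalPath) throws IOException {
  FileStatus st = getFileStatus(tempPath);          // length, plus etag where available
  FileEntry entry = new FileEntry(tempPath, finalPath,
      st.getLen(), etagOf(st));                     // etagOf() is a placeholder helper
  CommitFileResult result = operations.commitFile(entry);
  if (result.recovered()) {
    // the resilient commit had to recover from a failure: surface that at WARN
    LOG.warn("Rename of manifest {} to {} required recovery", tempPath, finalPath);
  }
}
```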
I've now moved to commitFile() to rename the task manifest, after doing a getFileStatus() call first...which means its IO cost is the same as a rename with recovery enabled. It does let us see what happened, which we log at WARN.
Thanks for taking the comment!
@saxenapranav what do you think of the patch now?
From my perspective, the changes look good to me. Thanks for taking all the thoughts!
However, since I am new to the component, it would be great if we can get a +1 from other reviewers as well.
Thanks!
LGTM +1
🎊 +1 overall
Improve task commit resilience everywhere and add an option to reduce delete IO requests on job cleanup (relevant for ABFS and HDFS).

Task Commit Resilience
----------------------

Task manifest saving is re-attempted on failure; the number of attempts made is configurable with the option:

    mapreduce.manifest.committer.manifest.save.attempts

* The default is 5.
* The minimum is 1; asking for less is ignored.
* A retry policy adds 500ms of sleep per attempt.
* Move from classic rename() to commitFile() to rename the file, after calling getFileStatus() to get its length and possibly etag. This becomes a rename() on gcs/hdfs anyway, but on abfs it does reach the ResilientCommitByRename callbacks in abfs, which report on the outcome to the caller...which is then logged at WARN.
* New statistic task_stage_save_summary_file to distinguish from other saving operations (job success/report file). This is only saved to the manifest on task commit retries, and provides statistics on all previous unsuccessful attempts to save the manifests.
* Test changes to match the codepath changes, including improvements in fault injection.

Directory size for deletion
---------------------------

New option mapreduce.manifest.committer.cleanup.parallel.delete.base.first

This makes an initial attempt at deleting the base dir, only falling back to parallel deletes if there's a timeout. This option is disabled by default; consider enabling it for abfs to reduce IO load. Consult the documentation for more details.

Success file printing
---------------------

The command to print a JSON _SUCCESS file from this committer and any S3A committer is now something which can be invoked from the mapred command:

    mapred successfile <path to file>

Contributed by Steve Loughran
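For reference, a hedged example of setting both options from code; the same keys can equally go into mapred-site.xml or the job configuration, and the values shown just repeat the defaults and recommendation above.

```java
Configuration conf = new Configuration();
// number of attempts to save a task manifest (default 5, minimum 1)
conf.setInt("mapreduce.manifest.committer.manifest.save.attempts", 5);
// try a single base-dir delete before parallel cleanup; off by default,
// worth considering on abfs to reduce IO load
conf.setBoolean(
    "mapreduce.manifest.committer.cleanup.parallel.delete.base.first", true);
```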
MAPREDUCE-7474. Improve resilience of task commit save and rename operation with retries.
5 attempts, with 500 millis sleep between them. No configuration.
Issue: should we make this configurable?
Split delete(path, recursive) into deleteFile and rmdir for separate statistics.
New option: mapreduce.manifest.committer.cleanup.parallel.delete.base.first

This attempts to delete the base dir, and only on failure (timeout, permissions) does it attempt the parallel delete and (re)attempt at deleting the base dir. This is to cut back on Azure load while still handling timeouts on deep tree deletion.
Test simulation expands to: support recovery through a countdown of calls to fail, and simulate timeouts before *and after* rename calls.
This is based on #6596 but skips the rate limiting logic spanning common and azure; instead it only contains changes in the manifest committer, which makes it easier to backport.