Commit 68dff78

MAPREDUCE-7474. Use commitFile() to rename the temporary task manifest

Move from the classic rename() to commitFile() to rename the temporary task manifest, after calling getFileStatus() to get its length and possibly its etag. This is still a plain rename() on gcs/hdfs, but on abfs it reaches the ResilientCommitByRename callbacks, which report the outcome to the caller; a recovered rename is then logged at WARN. Test changes match the codepath changes, including improvements in fault injection.

Change-Id: I757a77c8d2b7a7f1cf2ce32d109ce1baa6a90ec2
1 parent 2b38434 commit 68dff78
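For readability, the commit sequence described above, assembled as an illustrative sketch from the patch to AbstractJobOrTaskStage.saveManifest() shown below; all names (operations, getFileStatus(), getEtag(), commitFile(), FileEntry, CommitOutcome, LOG) are those of that class, and this is not the complete method:

    // save the temp file.
    operations.save(manifestData, tempPath, true);
    // get the length and (where the store supports it) the etag of the file just written.
    final FileStatus st = getFileStatus(tempPath);
    // commit the rename of the temporary file to the final path,
    // deleting any existing destination first.
    final CommitOutcome outcome = commitFile(
        new FileEntry(tempPath, finalPath, st.getLen(), getEtag(st)),
        true);
    if (outcome.recovered) {
      // the resilient commit recovered from a rename failure; note it.
      LOG.warn("Task manifest file {} committed using rename recovery",
          manifestData);
    }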

File tree: 6 files changed (+69, -40 lines changed)


hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java

Lines changed: 22 additions & 8 deletions
@@ -676,15 +676,20 @@ protected final <T extends AbstractManifestData> T saveManifest(
       // -this is just a little bit of due diligence.
       deleteRecursive(tempPath, OP_DELETE);

-      // save the temp file, overwriting any which remains from an earlier attempt
+      // save the temp file.
       operations.save(manifestData, tempPath, true);
+      // get the length and etag.
+      final FileStatus st = getFileStatus(tempPath);
+
+      // commit rename of temporary file to the final path; deleting the destination first.
+      final CommitOutcome outcome = commitFile(
+          new FileEntry(tempPath, finalPath, st.getLen(), getEtag(st)),
+          true);
+      if (outcome.recovered) {
+        LOG.warn("Task manifest file {} committed using rename recovery",
+            manifestData);
+      }

-      // delete the destination in case it exists either from a failed previous
-      // attempt or from a concurrent task commit.
-      delete(finalPath, true, OP_DELETE);
-
-      // rename temporary file to the final path.
-      renameFile(tempPath, finalPath);
     });
     // success: save the manifest and declare success
     savedManifest = manifestData;

@@ -797,13 +802,14 @@ protected final CommitOutcome commitFile(FileEntry entry,
         // note any delay which took place
         noteAnyRateLimiting(STORE_IO_RATE_LIMITED, result.getWaitTime());
       }
+      return new CommitOutcome(result.recovered());
     } else {
       // commit with a simple rename; failures will be escalated.
       executeRenamingOperation("renameFile", source, dest,
           OP_COMMIT_FILE_RENAME, () ->
               operations.renameFile(source, dest));
+      return new CommitOutcome(false);
     }
-    return new CommitOutcome();
   }

   /**
@@ -923,6 +929,14 @@ private PathIOException escalateRenameFailure(String operation,
    */
   public static final class CommitOutcome {

+    /**
+     * Did the commit recover from a failure?
+     */
+    public final boolean recovered;
+
+    public CommitOutcome(final boolean recovered) {
+      this.recovered = recovered;
+    }
   }

   /**

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java

Lines changed: 2 additions & 1 deletion
@@ -184,7 +184,8 @@ protected Result executeStage(
     if (exception == null) {
       // success: record this as the outcome,
       outcome = Outcome.DELETED;
-      // and will skip the parallel delete
+      // and flag that the parallel delete should be skipped because the
+      // base directory is already deleted.
       baseDirDeleted = true;
     } else {
       // failure: log and continue

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md

Lines changed: 20 additions & 15 deletions
@@ -34,10 +34,15 @@ The protocol and its correctness are covered in
 [Manifest Committer Protocol](manifest_committer_protocol.html).

 It was added in March 2022.
-As of April 2024, the only problems have been scale related, rather than
-algorithm correctness.
+As of April 2024, the problems which have surfaced are:
+* Memory use at scale.
+* Directory deletion scalability.
+* Resilience of task commit to rename failures.

-<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+That is: the core algorithm is correct, but task commit
+robustness was insufficient against some failure conditions.
+And scale is always a challenge, even with components tested through
+large TPC-DS test runs.

 ## Problem:

@@ -566,12 +571,12 @@ may surface in cloud storage.
 * General resilience to cleanup issues escalating to job failures.


-| Option | Meaning | Default Value |
-|--------|---------|---------------|
-| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory| `false` |
-| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` |
-| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete task attempt directories in parallel | `true` |
-| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `false` |
+| Option                                                             | Meaning                                                             | Default Value |
+|-------------------------------------------------------------------|--------------------------------------------------------------------|---------------|
+| `mapreduce.fileoutputcommitter.cleanup.skipped`                    | Skip cleanup of `_temporary` directory                              | `false`       |
+| `mapreduce.fileoutputcommitter.cleanup-failures.ignored`           | Ignore errors during cleanup                                        | `false`       |
+| `mapreduce.manifest.committer.cleanup.parallel.delete`             | Delete task attempt directories in parallel                         | `true`        |
+| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first`  | Attempt to delete the base directory before parallel task attempts  | `false`       |

 The algorithm is:

@@ -790,12 +795,12 @@ spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: U
 There are some advanced options which are intended for development and testing,
 rather than production use.

-| Option | Meaning | Default Value |
-|--------|----------------------------------------------|---------------|
-| `mapreduce.manifest.committer.manifest.save.attempts` | How many attempts should be made to commit a task manifest? | `5` |
-| `mapreduce.manifest.committer.store.operations.classname` | Classname for Manifest Store Operations | `""` |
-| `mapreduce.manifest.committer.validate.output` | Perform output validation? | `false` |
-| `mapreduce.manifest.committer.writer.queue.capacity` | Queue capacity for writing intermediate file | `32` |
+| Option                                                     | Meaning                                                      | Default Value |
+|-----------------------------------------------------------|-------------------------------------------------------------|---------------|
+| `mapreduce.manifest.committer.manifest.save.attempts`      | How many attempts should be made to commit a task manifest? | `5`           |
+| `mapreduce.manifest.committer.store.operations.classname`  | Classname for Manifest Store Operations                     | `""`          |
+| `mapreduce.manifest.committer.validate.output`             | Perform output validation?                                  | `false`       |
+| `mapreduce.manifest.committer.writer.queue.capacity`       | Queue capacity for writing intermediate file                | `32`          |

 ### `mapreduce.manifest.committer.manifest.save.attempts`
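Not part of the patch: a minimal sketch of how a job might set the cleanup and save-attempt options documented in the tables above, using org.apache.hadoop.conf.Configuration. The option names come from the documentation; the class and method here are purely illustrative:

    import org.apache.hadoop.conf.Configuration;

    public final class ManifestCommitterCleanupSettings {
      // Applies the documented cleanup and save-attempt options to a job configuration.
      public static Configuration apply(Configuration conf) {
        // delete task attempt directories in parallel (default: true)
        conf.setBoolean("mapreduce.manifest.committer.cleanup.parallel.delete", true);
        // try deleting the base directory first; if that succeeds,
        // the per-task-attempt parallel delete is skipped (default: false)
        conf.setBoolean("mapreduce.manifest.committer.cleanup.parallel.delete.base.first", true);
        // keep cleanup enabled and keep escalating cleanup failures (the defaults)
        conf.setBoolean("mapreduce.fileoutputcommitter.cleanup.skipped", false);
        conf.setBoolean("mapreduce.fileoutputcommitter.cleanup-failures.ignored", false);
        // how many attempts to save/commit a task manifest (default: 5)
        conf.setInt("mapreduce.manifest.committer.manifest.save.attempts", 5);
        return conf;
      }
    }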

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCommitTaskStage.java

Lines changed: 10 additions & 9 deletions
@@ -52,7 +52,8 @@
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;

 /**
- * Test committing a task.
+ * Test committing a task, with lots of fault injection to validate
+ * resilience to transient failures.
  */
 public class TestCommitTaskStage extends AbstractManifestCommitterTest {

@@ -303,8 +304,8 @@ public void testManifestRenameLateTimeoutsRecovery() throws Throwable {
   }

   @Test
-  public void testManifestDeleteInRenameErrorHandlerFailure() throws Throwable {
-    describe("Testing failure in the delete call made during cleanup");
+  public void testFailureToDeleteManifestPath() throws Throwable {
+    describe("Testing failure in the delete call made before renaming the manifest");

     UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
     StageConfig stageConfig = createStageConfig();
@@ -315,6 +316,9 @@ public void testManifestDeleteInRenameErrorHandlerFailure() throws Throwable {
     // final manifest file is by task ID
     Path manifestFile = manifestPathForTask(manifestDir,
         stageConfig.getTaskId());
+    // put a file in as there is a check for it before the delete
+    ContractTestUtils.touch(getFileSystem(), manifestFile);
+    /* and the delete shall fail */
     failures.addDeletePathToFail(manifestFile);
     Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
         stageConfig.getTaskAttemptId());
@@ -332,10 +336,10 @@ public void testManifestDeleteInRenameErrorHandlerFailure() throws Throwable {


   /**
-   * Failure of delete before rename.
+   * Failure of delete before saving the manifest to a temporary path.
    */
   @Test
-  public void testFailureOfDeleteBeforeRename() throws Throwable {
+  public void testFailureOfDeleteBeforeSavingTemporaryFile() throws Throwable {
     describe("Testing failure in the delete call made before rename");

     UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
@@ -344,14 +348,11 @@ public void testFailureOfDeleteBeforeRename() throws Throwable {
     new SetupTaskStage(stageConfig).apply("setup");

     final Path manifestDir = stageConfig.getTaskManifestDir();
-    // final manifest file is by task ID
-    Path manifestFile = manifestPathForTask(manifestDir,
-        stageConfig.getTaskId());
     Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
         stageConfig.getTaskAttemptId());

     // delete will fail
-    failures.addDeletePathToFail(manifestFile);
+    failures.addDeletePathToFail(manifestTempFile);

     // first verify that if too many attempts fail, the task will fail
     failures.setFailureLimit(SAVE_ATTEMPTS + 1);

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java

Lines changed: 10 additions & 4 deletions
@@ -452,13 +452,19 @@ public boolean storeSupportsResilientCommit() {
   @Override
   public CommitFileResult commitFile(final FileEntry entry)
       throws IOException {
+    final String op = "commitFile";
+    final Path source = entry.getSourcePath();
+    maybeTimeout(op, source, renamePathsToTimeoutBeforeRename);
     if (renameToFailWithException) {
-      maybeRaiseIOE("commitFile",
-          entry.getSourcePath(), renameSourceFilesToFail);
-      maybeRaiseIOE("commitFile",
+      maybeRaiseIOE(op,
+          source, renameSourceFilesToFail);
+      maybeRaiseIOE(op,
           entry.getDestPath().getParent(), renameDestDirsToFail);
     }
-    return wrappedOperations.commitFile(entry);
+    final CommitFileResult result = wrappedOperations.commitFile(entry);
+    // post rename timeout.
+    maybeTimeout(op, source, renamePathsToTimeoutAfterRename);
+    return result;
   }

   @Override
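A condensed sketch of how TestCommitTaskStage (above) drives these fault-injection hooks; the method and constant names are those visible in the test diff, while the enclosing test class and its assertions are assumed and not shown:

    UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
    StageConfig stageConfig = createStageConfig();
    new SetupTaskStage(stageConfig).apply("setup");

    // the temporary manifest path for this task attempt
    Path manifestTempFile = manifestTempPathForTaskAttempt(
        stageConfig.getTaskManifestDir(),
        stageConfig.getTaskAttemptId());

    // deletes of that path will now fail...
    failures.addDeletePathToFail(manifestTempFile);
    // ...and keep failing past the retry budget of the save/commit loop,
    // so the task commit is expected to fail rather than recover.
    failures.setFailureLimit(SAVE_ATTEMPTS + 1);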

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java

Lines changed: 5 additions & 3 deletions
@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants;

 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST;
 import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;

 /**
@@ -51,9 +52,10 @@ static Configuration prepareTestConfiguration(
     final String size = Integer.toString(192);
     conf.setIfUnset(ManifestCommitterConstants.OPT_IO_PROCESSORS, size);
     conf.setIfUnset(ManifestCommitterConstants.OPT_WRITER_QUEUE_CAPACITY, size);
-    // no need for parallel delete here as we aren't at the scale where unified delete
-    // is going to time out
-    conf.setBooleanIfUnset(ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE, false);
+    // enable parallel delete but ask for base deletion first,
+    // which is now our recommended azure option
+    conf.setBoolean(ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE, true);
+    conf.setBoolean(OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST, true);

     return conf;
   }
