Skip to content

Commit 2b38434

Browse files
committed
MAPREDUCE-7474. review feedback
* and use delay from retry class for sleeping Change-Id: I4f5ea48f6c22412d55ecb1bfd00c82b6cc7e4be5
1 parent cd40e7f commit 2b38434

File tree

3 files changed

+24
-16
lines changed

3 files changed

+24
-16
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.time.Duration;
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicBoolean;
26-
import java.util.concurrent.atomic.AtomicInteger;
2726
import java.util.function.Supplier;
2827

2928
import org.slf4j.Logger;
@@ -653,23 +652,28 @@ protected final <T extends AbstractManifestData> T saveManifest(
653652
final Path finalPath,
654653
String statistic) throws IOException {
655654

656-
AtomicInteger retryCount = new AtomicInteger(0);
655+
int retryCount = 0;
657656
RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
658657
getStageConfig().getManifestSaveAttempts(),
659658
SAVE_SLEEP_INTERVAL,
660659
TimeUnit.MILLISECONDS);
661660

661+
boolean success = false;
662+
T savedManifest = null;
662663
// loop until returning a value or raising an exception
663-
while (true) {
664+
while (!success) {
664665
try {
665-
T manifestData = requireNonNull(manifestSource.get());
666+
// get the latest manifest, which may include updated statistics
667+
final T manifestData = requireNonNull(manifestSource.get());
668+
LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
669+
getName(), tempPath, finalPath, retryCount);
666670
trackDurationOfInvocation(getIOStatistics(), statistic, () -> {
667-
LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
668-
getName(), tempPath, finalPath, retryCount);
669671

670672
// delete temp path.
671673
// even though this is written with overwrite=true, this extra recursive
672674
// delete also handles a directory being there.
675+
// this should not happen as no part of the commit protocol creates a directory
676+
// -this is just a little bit of due diligence.
673677
deleteRecursive(tempPath, OP_DELETE);
674678

675679
// save the temp file, overwriting any which remains from an earlier attempt
@@ -679,21 +683,22 @@ protected final <T extends AbstractManifestData> T saveManifest(
679683
// attempt or from a concurrent task commit.
680684
delete(finalPath, true, OP_DELETE);
681685

682-
// rename temp to final
686+
// rename temporary file to the final path.
683687
renameFile(tempPath, finalPath);
684688
});
685-
// success: exit and return the final manifest data.
686-
return manifestData;
689+
// success: save the manifest and declare success
690+
savedManifest = manifestData;
691+
success = true;
687692
} catch (IOException e) {
688693
// failure.
689694
// log then decide whether to sleep and retry or give up.
690695
LOG.warn("{}: Failed to save and commit file {} renamed to {}; retry count={}",
691696
getName(), tempPath, finalPath, retryCount, e);
692697
// increment that count.
693-
retryCount.incrementAndGet();
698+
retryCount++;
694699
RetryPolicy.RetryAction retryAction;
695700
try {
696-
retryAction = retryPolicy.shouldRetry(e, retryCount.get(), 0, true);
701+
retryAction = retryPolicy.shouldRetry(e, retryCount, 0, true);
697702
} catch (Exception ex) {
698703
// it's not clear why this probe can raise an exception; it is just
699704
// caught and mapped to a fail.
@@ -709,13 +714,14 @@ protected final <T extends AbstractManifestData> T saveManifest(
709714
try {
710715
LOG.info("{}: Sleeping for {} ms before retrying",
711716
getName(), retryAction.delayMillis);
712-
Thread.sleep(SAVE_SLEEP_INTERVAL);
717+
Thread.sleep(retryAction.delayMillis);
713718
} catch (InterruptedException ie) {
714719
Thread.currentThread().interrupt();
715720
}
716721
}
717722
}
718-
723+
// success: return the manifest which was saved.
724+
return savedManifest;
719725
}
720726

721727
/**
@@ -1040,7 +1046,7 @@ protected final TaskPool.Submitter getIOProcessors(int size) {
10401046
}
10411047

10421048
/**
1043-
* Delete a directory (or a file)
1049+
* Delete a directory (or a file).
10441050
* @param dir directory.
10451051
* @param statistic statistic to use
10461052
* @return true if the path is no longer present.

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ protected Result executeStage(
189189
} else {
190190
// failure: log and continue
191191
LOG.warn("{}: Exception on initial attempt at deleting base dir {}"
192-
+ " and directory count {}. Falling back to parallel delete",
192+
+ " with directory count {}. Falling back to parallel delete",
193193
getName(), baseDir, directoryCount, exception);
194194
}
195195
}
@@ -241,6 +241,9 @@ protected Result executeStage(
241241
exception = deleteOneDir(baseDir);
242242
if (exception != null) {
243243
// failure, report and continue
244+
LOG.warn("{}: Exception on final attempt at deleting base dir {}"
245+
+ " with directory count {}",
246+
getName(), baseDir, directoryCount, exception);
244247
// assume failure.
245248
outcome = Outcome.FAILURE;
246249
} else {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterTestSupport.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.hadoop.util.functional.RemoteIterators;
5757

5858
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
59-
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
6059
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_CLASSNAME;
6160
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
6261
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO.toPath;

0 commit comments

Comments
 (0)