Skip to content

Commit 1025c44

Browse files
MAPREDUCE-7474. Improve Manifest committer resilience (#6716) (#6825)
Improve task commit resilience everywhere and add an option to reduce delete IO requests on job cleanup (relevant for ABFS and HDFS). Task Commit Resilience ---------------------- Task manifest saving is re-attempted on failure; the number of attempts made is configurable with the option: mapreduce.manifest.committer.manifest.save.attempts * The default is 5. * The minimum is 1; a lower value is logged at WARN and replaced with 1. * A retry policy adds 500ms of sleep per attempt. * Move from classic rename() to commitFile() to rename the file, after calling getFileStatus() to get its length and possibly etag. This becomes a rename() on gcs/hdfs anyway, but on abfs it does reach the ResilientCommitByRename callbacks, which report on the outcome to the caller; that outcome is then logged at WARN. * New statistic task_stage_save_summary_file to distinguish from other saving operations (job success/report file). This is only saved to the manifest on task commit retries, and provides statistics on all previous unsuccessful attempts to save the manifests. * Test changes to match the codepath changes, including improvements in fault injection. Directory size for deletion --------------------------- New option mapreduce.manifest.committer.cleanup.parallel.delete.base.first This makes an initial attempt at deleting the base dir, only falling back to parallel deletes if there's a timeout. This option is disabled by default; consider enabling it for abfs to reduce IO load. Consult the documentation for more details. Success file printing --------------------- The command to print a JSON _SUCCESS file from this committer and any S3A committer can now be invoked through the mapred command: mapred successfile <path to file> Contributed by Steve Loughran
1 parent e6a3bbc commit 1025c44

34 files changed

+1438
-262
lines changed

hadoop-mapreduce-project/bin/mapred

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ function hadoop_usage
3737
hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload"
3838
hadoop_add_subcommand "version" client "print the version"
3939
hadoop_add_subcommand "minicluster" client "CLI MiniCluster"
40+
hadoop_add_subcommand "successfile" client "Print a _SUCCESS manifest from the manifest and S3A committers"
4041
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
4142
}
4243

@@ -102,6 +103,9 @@ function mapredcmd_case
102103
version)
103104
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
104105
;;
106+
successfile)
107+
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
108+
;;
105109
minicluster)
106110
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice"'/*'
107111
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/test"'/*'

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import java.io.IOException;
2222
import java.util.Objects;
2323

24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
2427
import org.apache.commons.lang3.tuple.Pair;
2528
import org.apache.hadoop.conf.Configuration;
2629
import org.apache.hadoop.fs.FileSystem;
@@ -51,6 +54,9 @@
5154
*/
5255
public final class ManifestCommitterConfig implements IOStatisticsSource {
5356

57+
private static final Logger LOG = LoggerFactory.getLogger(
58+
ManifestCommitterConfig.class);
59+
5460
/**
5561
* Final destination of work.
5662
* This is <i>unqualified</i>.
@@ -153,6 +159,12 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
153159
*/
154160
private final int writerQueueCapacity;
155161

162+
/**
163+
* How many attempts to save a task manifest by save and rename
164+
* before giving up.
165+
*/
166+
private final int saveManifestAttempts;
167+
156168
/**
157169
* Constructor.
158170
* @param outputPath destination path of the job.
@@ -198,6 +210,14 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
198210
this.writerQueueCapacity = conf.getInt(
199211
OPT_WRITER_QUEUE_CAPACITY,
200212
DEFAULT_WRITER_QUEUE_CAPACITY);
213+
int attempts = conf.getInt(OPT_MANIFEST_SAVE_ATTEMPTS,
214+
OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT);
215+
if (attempts < 1) {
216+
LOG.warn("Invalid value for {}: {}",
217+
OPT_MANIFEST_SAVE_ATTEMPTS, attempts);
218+
attempts = 1;
219+
}
220+
this.saveManifestAttempts = attempts;
201221

202222
// if constructed with a task attempt, build the task ID and path.
203223
if (context instanceof TaskAttemptContext) {
@@ -332,6 +352,10 @@ public String getName() {
332352
return name;
333353
}
334354

355+
public int getSaveManifestAttempts() {
356+
return saveManifestAttempts;
357+
}
358+
335359
/**
336360
* Get writer queue capacity.
337361
* @return the queue capacity

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ public final class ManifestCommitterConstants {
132132
* Should dir cleanup do parallel deletion of task attempt dirs
133133
* before trying to delete the toplevel dirs.
134134
* For GCS this may deliver speedup, while on ABFS it may avoid
135-
* timeouts in certain deployments.
135+
* timeouts in certain deployments, something
136+
* {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}
137+
* can alleviate.
136138
* Value: {@value}.
137139
*/
138140
public static final String OPT_CLEANUP_PARALLEL_DELETE =
@@ -143,6 +145,20 @@ public final class ManifestCommitterConstants {
143145
*/
144146
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true;
145147

148+
/**
149+
* Should parallel cleanup try to delete the base first?
150+
* Best for Azure as it skips the task attempt deletions unless
151+
* the toplevel delete fails.
152+
* Value: {@value}.
153+
*/
154+
public static final String OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST =
155+
OPT_PREFIX + "cleanup.parallel.delete.base.first";
156+
157+
/**
158+
* Default value of option {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}: {@value}.
159+
*/
160+
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT = false;
161+
146162
/**
147163
* Threads to use for IO.
148164
*/
@@ -260,6 +276,19 @@ public final class ManifestCommitterConstants {
260276
*/
261277
public static final int DEFAULT_WRITER_QUEUE_CAPACITY = OPT_IO_PROCESSORS_DEFAULT;
262278

279+
/**
280+
* How many attempts to save a task manifest by save and rename
281+
* before giving up.
282+
* Value: {@value}.
283+
*/
284+
public static final String OPT_MANIFEST_SAVE_ATTEMPTS =
285+
OPT_PREFIX + "manifest.save.attempts";
286+
287+
/**
288+
* Default value of {@link #OPT_MANIFEST_SAVE_ATTEMPTS}: {@value}.
289+
*/
290+
public static final int OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT = 5;
291+
263292
private ManifestCommitterConstants() {
264293
}
265294

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterStatisticNames.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,12 @@ public final class ManifestCommitterStatisticNames {
187187
public static final String OP_SAVE_TASK_MANIFEST =
188188
"task_stage_save_task_manifest";
189189

190+
/**
191+
* Save a summary file: {@value}.
192+
*/
193+
public static final String OP_SAVE_SUMMARY_FILE =
194+
"task_stage_save_summary_file";
195+
190196
/**
191197
* Task abort: {@value}.
192198
*/
@@ -259,6 +265,9 @@ public final class ManifestCommitterStatisticNames {
259265
public static final String OP_STAGE_TASK_SCAN_DIRECTORY
260266
= "task_stage_scan_directory";
261267

268+
/** Delete a directory: {@value}. */
269+
public static final String OP_DELETE_DIR = "op_delete_dir";
270+
262271
private ManifestCommitterStatisticNames() {
263272
}
264273
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestPrinter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
*/
3737
public class ManifestPrinter extends Configured implements Tool {
3838

39-
private static final String USAGE = "ManifestPrinter <success-file>";
39+
private static final String USAGE = "successfile <success-file>";
4040

4141
/**
4242
* Output for printing.
@@ -88,7 +88,7 @@ public ManifestSuccessData loadAndPrintManifest(FileSystem fs, Path path)
8888
return success;
8989
}
9090

91-
private void printManifest(ManifestSuccessData success) {
91+
public void printManifest(ManifestSuccessData success) {
9292
field("succeeded", success.getSuccess());
9393
field("created", success.getDate());
9494
field("committer", success.getCommitter());

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ private InternalConstants() {
7373
OP_CREATE_ONE_DIRECTORY,
7474
OP_DIRECTORY_SCAN,
7575
OP_DELETE,
76+
OP_DELETE_DIR,
7677
OP_DELETE_FILE_UNDER_DESTINATION,
7778
OP_GET_FILE_STATUS,
7879
OP_IS_DIRECTORY,
@@ -85,6 +86,7 @@ private InternalConstants() {
8586
OP_MSYNC,
8687
OP_PREPARE_DIR_ANCESTORS,
8788
OP_RENAME_FILE,
89+
OP_SAVE_SUMMARY_FILE,
8890
OP_SAVE_TASK_MANIFEST,
8991

9092
OBJECT_LIST_REQUEST,
@@ -127,4 +129,11 @@ private InternalConstants() {
127129
/** Schemas of filesystems we know to not work with this committer. */
128130
public static final Set<String> UNSUPPORTED_FS_SCHEMAS =
129131
ImmutableSet.of("s3a", "wasb");
132+
133+
/**
134+
* Interval in milliseconds between save retries.
135+
* Value {@value} milliseconds.
136+
*/
137+
public static final int SAVE_SLEEP_INTERVAL = 500;
138+
130139
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,35 @@ public boolean isFile(Path path) throws IOException {
9797
public abstract boolean delete(Path path, boolean recursive)
9898
throws IOException;
9999

100+
/**
101+
* Forward to {@code delete(Path, false)}
102+
* unless overridden.
103+
* <p>
104+
* If it returns without an error: there is no file at
105+
* the end of the path.
106+
* @param path path
107+
* @return outcome
108+
* @throws IOException failure.
109+
*/
110+
public boolean deleteFile(Path path)
111+
throws IOException {
112+
return delete(path, false);
113+
}
114+
115+
/**
116+
* Call {@code FileSystem#delete(Path, true)} or equivalent.
117+
* <p>
118+
* If it returns without an error: there is nothing at
119+
* the end of the path.
120+
* @param path path
121+
* @return outcome
122+
* @throws IOException failure.
123+
*/
124+
public boolean deleteRecursive(Path path)
125+
throws IOException {
126+
return delete(path, true);
127+
}
128+
100129
/**
101130
* Forward to {@link FileSystem#mkdirs(Path)}.
102131
* Usual "what does 'false' mean" ambiguity.

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ public boolean delete(Path path, boolean recursive)
108108
return fileSystem.delete(path, recursive);
109109
}
110110

111+
@Override
112+
public boolean deleteRecursive(final Path path) throws IOException {
113+
return fileSystem.delete(path, true);
114+
}
115+
111116
@Override
112117
public boolean mkdirs(Path path)
113118
throws IOException {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.hadoop.fs.Path;
2727

28+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR;
2829
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_ABORT_TASK;
2930

3031
/**
@@ -55,7 +56,11 @@ protected Path executeStage(final Boolean suppressExceptions)
5556
final Path dir = getTaskAttemptDir();
5657
if (dir != null) {
5758
LOG.info("{}: Deleting task attempt directory {}", getName(), dir);
58-
deleteDir(dir, suppressExceptions);
59+
if (suppressExceptions) {
60+
deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR);
61+
} else {
62+
deleteRecursive(dir, OP_DELETE_DIR);
63+
}
5964
}
6065
return dir;
6166
}

0 commit comments

Comments
 (0)