Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

public class AMRunner {
private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
static int REMAINING_APPS = 0;
static int remainingApps = 0;

private final Configuration conf;
private int AM_ID;
Expand All @@ -63,8 +63,8 @@ public class AMRunner {
private Map<String, Class> amClassMap;
private TraceType inputType;
private String[] inputTraces;
private TaskRunner runner;
private SLSRunner slsRunner;
private final TaskRunner runner;
private final SLSRunner slsRunner;
private int numAMs, numTasks;
private long maxRuntime;
private ResourceManager rm;
Expand All @@ -81,8 +81,8 @@ public void init(Configuration conf) throws ClassNotFoundException {
amClassMap = new HashMap<>();
appIdAMSim = new ConcurrentHashMap<>();
// <AMType, Class> map
for (Map.Entry e : conf) {
String key = e.getKey().toString();
for (Map.Entry<String, String> e : conf) {
String key = e.getKey();
if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
amClassMap.put(amType, Class.forName(conf.get(key)));
Expand Down Expand Up @@ -112,7 +112,7 @@ public void startAM() throws YarnException, IOException {
}

numAMs = amMap.size();
REMAINING_APPS = numAMs;
remainingApps = numAMs;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ public static ReservationSubmissionRequest createMRReservation(
deadline, reservationRequests, name);

// outermost request
ReservationSubmissionRequest request = ReservationSubmissionRequest
return ReservationSubmissionRequest
.newInstance(resDef, queueName, reservationId);

return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class RumenToSLSConverter {
private static Map<String, Set<String>> rackNodeMap =
new TreeMap<String, Set<String>>();

public static void main(String args[]) throws Exception {
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addOption("input", true, "input rumen json file");
options.addOption("outputJobs", true, "output jobs file");
Expand Down Expand Up @@ -121,9 +122,10 @@ public static void main(String args[]) throws Exception {
private static void generateSLSLoadFile(String inputFile, String outputFile)
throws IOException {
try (Reader input =
new InputStreamReader(new FileInputStream(inputFile), "UTF-8")) {
new InputStreamReader(new FileInputStream(inputFile),
StandardCharsets.UTF_8)) {
try (Writer output =
new OutputStreamWriter(new FileOutputStream(outputFile), "UTF-8")) {
new OutputStreamWriter(new FileOutputStream(outputFile), StandardCharsets.UTF_8)) {
ObjectMapper mapper = new ObjectMapper();
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
Iterator<Map> i = mapper.readValues(
Expand All @@ -140,7 +142,7 @@ private static void generateSLSLoadFile(String inputFile, String outputFile)
private static void generateSLSNodeFile(String outputFile)
throws IOException {
try (Writer output =
new OutputStreamWriter(new FileOutputStream(outputFile), "UTF-8")) {
new OutputStreamWriter(new FileOutputStream(outputFile), StandardCharsets.UTF_8)) {
ObjectMapper mapper = new ObjectMapper();
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
for (Map.Entry<String, Set<String>> entry : rackNodeMap.entrySet()) {
Expand Down Expand Up @@ -218,7 +220,7 @@ private static List createSLSTasks(String taskType,
task.put("container.priority", priority);
task.put("container.type", taskType);
array.add(task);
String rackHost[] = SLSUtils.getRackHostName(hostname);
String[] rackHost = SLSUtils.getRackHostName(hostname);
if (rackNodeMap.containsKey(rackHost[0])) {
rackNodeMap.get(rackHost[0]).add(rackHost[1]);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@
package org.apache.hadoop.yarn.sls;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.security.Security;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
Expand All @@ -38,10 +33,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
Expand Down Expand Up @@ -73,20 +64,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.security.Security;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Private
@Unstable
public class SLSRunner extends Configured implements Tool {
private static TaskRunner runner = new TaskRunner();
private static final TaskRunner runner = new TaskRunner();
private String[] inputTraces;

// metrics
Expand All @@ -103,6 +84,7 @@ public class SLSRunner extends Configured implements Tool {
private RMRunner rmRunner;
private NMRunner nmRunner;

private TraceType inputType;
private SynthTraceJobProducer stjp;

/**
Expand All @@ -117,7 +99,7 @@ public enum TraceType {
"networkaddress.cache.negative.ttl";

public static int getRemainingApps() {
return AMRunner.REMAINING_APPS;
return AMRunner.remainingApps;
}

public SLSRunner() throws ClassNotFoundException, YarnException {
Expand Down Expand Up @@ -175,31 +157,32 @@ public static Map<String, Object> getSimulateInfoMap() {

/**
* This is invoked before start.
* @param inType
* @param inTraces
* @param nodes
* @param metricsOutputDir
* @param trackApps
* @param printsimulation
* @param inputType The trace type
* @param inTraces Input traces
* @param nodes The node file
* @param metricsOutputDir Output dir for metrics
* @param trackApps Track these applications
* @param printSimulation Whether to print the simulation
*/
public void setSimulationParams(TraceType inType, String[] inTraces,
public void setSimulationParams(TraceType inputType, String[] inTraces,
String nodes, String metricsOutputDir, Set<String> trackApps,
boolean printsimulation) throws YarnException {
boolean printSimulation) throws YarnException {
this.inputType = inputType;
this.inputTraces = inTraces.clone();
this.amRunner.setInputType(inType);
this.amRunner.setInputType(inputType);
this.amRunner.setInputTraces(this.inputTraces);
this.amRunner.setTrackedApps(trackApps);
this.nmRunner.setNodeFile(nodes);
this.nmRunner.setInputType(inType);
this.nmRunner.setInputType(inputType);
this.nmRunner.setInputTraces(this.inputTraces);
this.printSimulation = printsimulation;
this.printSimulation = printSimulation;
this.rmRunner.setMetricsOutputDir(metricsOutputDir);
String tableMapping = metricsOutputDir + "/tableMapping.csv";
this.rmRunner.setTableMapping(tableMapping);
this.nmRunner.setTableMapping(tableMapping);

//We need this.inputTraces to set before creating SynthTraceJobProducer
if (inType == TraceType.SYNTH) {
if (inputType == TraceType.SYNTH) {
this.stjp = getSynthJobTraceProducer();
}
}
Expand Down Expand Up @@ -319,8 +302,8 @@ public Map<NodeId, NMSimulator> getNmMap() {
}

public static void decreaseRemainingApps() {
AMRunner.REMAINING_APPS--;
if (AMRunner.REMAINING_APPS == 0) {
AMRunner.remainingApps--;
if (AMRunner.remainingApps == 0) {
exitSLSRunner();
}
}
Expand Down Expand Up @@ -359,24 +342,15 @@ public int run(final String[] argv) throws IOException, InterruptedException,
CommandLineParser parser = new GnuParser();
CommandLine cmd = parser.parse(options, argv);

String traceType = null;
String traceLocation = null;

// compatibility with old commandline
if (cmd.hasOption("inputrumen")) {
traceType = "RUMEN";
traceLocation = cmd.getOptionValue("inputrumen");
}
if (cmd.hasOption("inputsls")) {
traceType = "SLS";
traceLocation = cmd.getOptionValue("inputsls");
}

if (cmd.hasOption("tracetype")) {
traceType = cmd.getOptionValue("tracetype");
traceLocation = cmd.getOptionValue("tracelocation");
}

boolean hasInputRumenOption = cmd.hasOption("inputrumen");
boolean hasInputSlsOption = cmd.hasOption("inputsls");
boolean hasTraceTypeOption = cmd.hasOption("tracetype");
TraceType traceType = determineTraceType(cmd, hasInputRumenOption,
hasInputSlsOption, hasTraceTypeOption);
String traceLocation = determineTraceLocation(cmd, hasInputRumenOption,
hasInputSlsOption, hasTraceTypeOption);

String output = cmd.getOptionValue("output");

File outputFile = new File(output);
Expand All @@ -396,32 +370,58 @@ public int run(final String[] argv) throws IOException, InterruptedException,
String tempNodeFile =
cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";

TraceType tempTraceType;
switch (traceType) {
case "SLS":
tempTraceType = TraceType.SLS;
break;
case "RUMEN":
tempTraceType = TraceType.RUMEN;
break;
case "SYNTH":
tempTraceType = TraceType.SYNTH;
break;
default:
printUsage();
throw new YarnException("Misconfigured input");
}

String[] inputFiles = traceLocation.split(",");

setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
setSimulationParams(traceType, inputFiles, tempNodeFile, output,
trackedJobSet, cmd.hasOption("printsimulation"));

start();

return 0;
}

private TraceType determineTraceType(CommandLine cmd, boolean hasInputRumenOption,
boolean hasInputSlsOption, boolean hasTraceTypeOption) throws YarnException {
String traceType = null;
if (hasInputRumenOption) {
traceType = "RUMEN";
}
if (hasInputSlsOption) {
traceType = "SLS";
}
if (hasTraceTypeOption) {
traceType = cmd.getOptionValue("tracetype");
}
if (traceType == null) {
throw new YarnException("Misconfigured input");
}
switch (traceType) {
case "SLS":
return TraceType.SLS;
case "RUMEN":
return TraceType.RUMEN;
case "SYNTH":
return TraceType.SYNTH;
default:
printUsage();
throw new YarnException("Misconfigured input");
}
}

private String determineTraceLocation(CommandLine cmd, boolean hasInputRumenOption,
boolean hasInputSlsOption, boolean hasTraceTypeOption) throws YarnException {
if (hasInputRumenOption) {
return cmd.getOptionValue("inputrumen");
}
if (hasInputSlsOption) {
return cmd.getOptionValue("inputsls");
}
if (hasTraceTypeOption) {
return cmd.getOptionValue("tracelocation");
}
throw new YarnException("Misconfigured input! ");
}

public static void main(String[] argv) throws Exception {
exitAtTheFinish = true;
ToolRunner.run(new Configuration(), new SLSRunner(), argv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
heartBeatInterval);
// create resource
String rackHostName[] = SLSUtils.getRackHostName(nodeIdStr);
String[] rackHostName = SLSUtils.getRackHostName(nodeIdStr);
this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
Resources.clone(nodeResource));
this.rm = pRm;
Expand Down Expand Up @@ -128,7 +128,7 @@ public void firstStep() {
@Override
public void middleStep() throws Exception {
// we check the lifetime for each running containers
ContainerSimulator cs = null;
ContainerSimulator cs;
synchronized(completedContainerList) {
while ((cs = containerQueue.poll()) != null) {
runningContainers.remove(cs.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,8 @@ public static RMNode newNodeInfo(String rackName, String hostName,
final Resource resource, int port) {
final NodeId nodeId = newNodeID(hostName, port);
final String nodeAddr = hostName + ":" + port;
final String httpAddress = hostName;

return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress,

return new FakeRMNodeImpl(nodeId, nodeAddr, hostName,
resource, rackName, "Me good",
port, hostName, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;

import java.util.Map;

public class MockAMLauncher extends ApplicationMasterLauncher
implements EventHandler<AMLauncherEvent> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class SLSSchedulerCommons {
private final Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<>();
private final Tracker tracker;

public SLSSchedulerCommons(AbstractYarnScheduler scheduler) {
public SLSSchedulerCommons(AbstractYarnScheduler<?, ?> scheduler) {
this.scheduler = scheduler;
this.tracker = new Tracker();
}
Expand Down Expand Up @@ -174,7 +174,7 @@ private void updateQueueWithAllocateRequest(Allocation allocation,
}
}
// containers released/preemption from scheduler
Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
Set<ContainerId> preemptionContainers = new HashSet<>();
if (allocation.getContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getContainerPreemptions());
}
Expand Down Expand Up @@ -277,7 +277,7 @@ public void handle(SchedulerEvent schedulerEvent) {
AppAttemptAddedSchedulerEvent appAddEvent =
(AppAttemptAddedSchedulerEvent) schedulerEvent;
SchedulerApplication app =
(SchedulerApplication) scheduler.getSchedulerApplications()
scheduler.getSchedulerApplications()
.get(appAddEvent.getApplicationAttemptId().getApplicationId());
appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
.getQueueName());
Expand Down
Loading