diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java index da95c687ee71c..301b4260f35cd 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java @@ -151,6 +151,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException { // if we use the nodeFile this could have been not initialized yet. if (stjp == null) { stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0])); + slsRunner.setStjp(stjp); } SynthJob job; diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java new file mode 100644 index 0000000000000..dbded4b306e19 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.TableMapping; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; +import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; +import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; +import java.util.HashMap; +import java.util.Map; + +public class RMRunner { + private ResourceManager rm; + private String metricsOutputDir; + private Configuration conf; + private SLSRunner slsRunner; + private String tableMapping; + private Map queueAppNumMap; + + public RMRunner(Configuration conf, SLSRunner slsRunner) { + this.conf = conf; + this.slsRunner = slsRunner; + this.queueAppNumMap = new HashMap<>(); + } + + public void startRM() throws ClassNotFoundException, YarnException { + Configuration rmConf = new YarnConfiguration(conf); + String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); + + if (Class.forName(schedulerClass) == 
CapacityScheduler.class) { + rmConf.set(YarnConfiguration.RM_SCHEDULER, + SLSCapacityScheduler.class.getName()); + rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class.getName()); + } else if (Class.forName(schedulerClass) == FairScheduler.class) { + rmConf.set(YarnConfiguration.RM_SCHEDULER, + SLSFairScheduler.class.getName()); + } else if (Class.forName(schedulerClass) == FifoScheduler.class) { + // TODO add support for FifoScheduler + throw new YarnException("Fifo Scheduler is not supported yet."); + } + rmConf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + TableMapping.class, DNSToSwitchMapping.class); + rmConf.set( + CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, + tableMapping); + rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); + + rm = new ResourceManager() { + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new MockAMLauncher(slsRunner, this.rmContext); + } + }; + + // Across runs of parametrized tests, the JvmMetrics object is retained, + // but is not registered correctly + JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null); + jvmMetrics.registerIfNeeded(); + + // Init and start the actual ResourceManager + rm.init(rmConf); + rm.start(); + } + + public void increaseQueueAppNum(String queue) throws YarnException { + SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); + String queueName = wrapper.getRealQueueName(queue); + Integer appNum = queueAppNumMap.get(queueName); + if (appNum == null) { + appNum = 1; + } else { + appNum = appNum + 1; + } + + queueAppNumMap.put(queueName, appNum); + SchedulerMetrics metrics = wrapper.getSchedulerMetrics(); + if (metrics != null) { + metrics.trackQueue(queueName); + } + } + + public void setMetricsOutputDir(String metricsOutputDir) { + 
this.metricsOutputDir = metricsOutputDir; + } + + public String getTableMapping() { + return tableMapping; + } + + public void setTableMapping(String tableMapping) { + this.tableMapping = tableMapping; + } + + public void stop() { + rm.stop(); + } + + public ResourceManager getRm() { + return rm; + } + + public Map getQueueAppNumMap() { + return queueAppNumMap; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 2110e3c196ca3..4d1c6714f039c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -53,11 +53,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.metrics2.source.JvmMetrics; -import org.apache.hadoop.net.DNSToSwitchMapping; -import org.apache.hadoop.net.TableMapping; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -66,24 +62,14 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; -import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; -import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; -import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; -import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.scheduler.Tracker; import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -109,11 +95,8 @@ @Private @Unstable public class SLSRunner extends Configured implements Tool { - // RM, Runner - private ResourceManager rm; private static TaskRunner runner = new TaskRunner(); private String[] inputTraces; - private Map queueAppNumMap; private int poolSize; // NM simulator @@ -122,12 +105,10 @@ public class SLSRunner extends Configured implements Tool { private String nodeFile; // metrics - private String metricsOutputDir; private boolean printSimulation; // other simulation information private int numNMs, numRacks; - private String tableMapping; private final static Map simulateInfoMap = new HashMap<>(); @@ -136,6 +117,7 @@ public class SLSRunner extends Configured implements Tool { private static boolean exitAtTheFinish = false; private AMRunner amRunner; + private RMRunner rmRunner; /** * The type of trace in input. 
@@ -179,8 +161,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException { setConf(tempConf); nmMap = new ConcurrentHashMap<>(); - queueAppNumMap = new HashMap<>(); amRunner = new AMRunner(runner, this); + rmRunner = new RMRunner(tempConf, this); // runner poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, @@ -225,12 +207,12 @@ public static Map getSimulateInfoMap() { * @param inType * @param inTraces * @param nodes - * @param outDir + * @param metricsOutputDir * @param trackApps * @param printsimulation */ public void setSimulationParams(TraceType inType, String[] inTraces, - String nodes, String outDir, Set trackApps, + String nodes, String metricsOutputDir, Set trackApps, boolean printsimulation) { this.inputType = inType; @@ -240,8 +222,8 @@ public void setSimulationParams(TraceType inType, String[] inTraces, this.amRunner.setTrackedApps(trackApps); this.nodeFile = nodes; this.printSimulation = printsimulation; - metricsOutputDir = outDir; - tableMapping = outDir + "/tableMapping.csv"; + this.rmRunner.setMetricsOutputDir(metricsOutputDir); + this.rmRunner.setTableMapping(metricsOutputDir + "/tableMapping.csv"); } public void start() throws IOException, ClassNotFoundException, YarnException, @@ -250,17 +232,19 @@ public void start() throws IOException, ClassNotFoundException, YarnException, enableDNSCaching(getConf()); // start resource manager - startRM(); - amRunner.setResourceManager(rm); + rmRunner.startRM(); + amRunner.setResourceManager(rmRunner.getRm()); // start node managers startNM(); // start application masters amRunner.startAM(); + // set queue & tracked apps information - ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setQueueSet(this.queueAppNumMap.keySet()); - ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setTrackedAppSet(amRunner.getTrackedApps()); + SchedulerWrapper resourceScheduler = + (SchedulerWrapper) rmRunner.getRm().getResourceScheduler(); + Tracker tracker = 
resourceScheduler.getTracker(); + tracker.setQueueSet(rmRunner.getQueueAppNumMap().keySet()); + tracker.setTrackedAppSet(amRunner.getTrackedApps()); // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING @@ -286,49 +270,6 @@ static void enableDNSCaching(Configuration conf) { } } - private void startRM() throws ClassNotFoundException, YarnException { - Configuration rmConf = new YarnConfiguration(getConf()); - String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); - - if (Class.forName(schedulerClass) == CapacityScheduler.class) { - rmConf.set(YarnConfiguration.RM_SCHEDULER, - SLSCapacityScheduler.class.getName()); - rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); - rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, - ProportionalCapacityPreemptionPolicy.class.getName()); - } else if (Class.forName(schedulerClass) == FairScheduler.class) { - rmConf.set(YarnConfiguration.RM_SCHEDULER, - SLSFairScheduler.class.getName()); - } else if (Class.forName(schedulerClass) == FifoScheduler.class) { - // TODO add support for FifoScheduler - throw new YarnException("Fifo Scheduler is not supported yet."); - } - rmConf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - TableMapping.class, DNSToSwitchMapping.class); - rmConf.set( - CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, - tableMapping); - rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); - - final SLSRunner se = this; - rm = new ResourceManager() { - @Override - protected ApplicationMasterLauncher createAMLauncher() { - return new MockAMLauncher(se, this.rmContext); - } - }; - - // Across runs of parametrized tests, the JvmMetrics objects is retained, - // but is not registered correctly - JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null); - jvmMetrics.registerIfNeeded(); - - // Init and start the actual ResourceManager - rm.init(rmConf); - 
rm.start(); - } - private void startNM() throws YarnException, IOException, InterruptedException { // nm configuration @@ -368,7 +309,7 @@ private void startNM() throws YarnException, IOException, throw new YarnException("No node! Please configure nodes."); } - SLSUtils.generateNodeTableMapping(nodeSet, tableMapping); + SLSUtils.generateNodeTableMapping(nodeSet, rmRunner.getTableMapping()); // create NM simulators Random random = new Random(); @@ -391,7 +332,7 @@ private void startNM() throws YarnException, IOException, Set nodeLabels = nodeDetails.getLabels(); nm.init(hostName, nmResource, random.nextInt(heartbeatInterval), - heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels); + heartbeatInterval, rmRunner.getRm(), resourceUtilizationRatio, nodeLabels); nmMap.put(nm.getNode().getNodeID(), nm); runner.schedule(nm); rackSet.add(nm.getNode().getRackName()); @@ -411,7 +352,7 @@ private void waitForNodesRunning() throws InterruptedException { long startTimeMS = System.currentTimeMillis(); while (true) { int numRunningNodes = 0; - for (RMNode node : rm.getRMContext().getRMNodes().values()) { + for (RMNode node : rmRunner.getRm().getRMContext().getRMNodes().values()) { if (node.getState() == NodeState.RUNNING) { numRunningNodes++; } @@ -435,21 +376,8 @@ Resource getDefaultContainerResource() { return Resources.createResource(containerMemory, containerVCores); } - void increaseQueueAppNum(String queue) throws YarnException { - SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); - String queueName = wrapper.getRealQueueName(queue); - Integer appNum = queueAppNumMap.get(queueName); - if (appNum == null) { - appNum = 1; - } else { - appNum = appNum + 1; - } - - queueAppNumMap.put(queueName, appNum); - SchedulerMetrics metrics = wrapper.getSchedulerMetrics(); - if (metrics != null) { - metrics.trackQueue(queueName); - } + public void increaseQueueAppNum(String queue) throws YarnException { + rmRunner.increaseQueueAppNum(queue); } private void 
printSimulationInfo() { @@ -457,6 +385,7 @@ private void printSimulationInfo() { final int numTasks = amRunner.getNumTasks(); final long maxRuntime = amRunner.getMaxRuntime(); Map amMap = amRunner.getAmMap(); + Map queueAppNumMap = rmRunner.getQueueAppNumMap(); if (printSimulation) { // node @@ -523,7 +452,7 @@ public static void exitSLSRunner() { } public void stop() throws InterruptedException { - rm.stop(); + rmRunner.stop(); runner.stop(); } @@ -696,14 +625,14 @@ public int hashCode() { } } - public ResourceManager getRm() { - return rm; - } - public SynthTraceJobProducer getStjp() { return stjp; } + public void setStjp(SynthTraceJobProducer stjp) { + this.stjp = stjp; + } + public AMSimulator getAMSimulatorByAppId(ApplicationId appId) { return amRunner.getAMSimulator(appId); }