diff --git a/.gitignore b/.gitignore index a5d69d094c8cf..3c104ad89716a 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ .settings target build +*.swp # External tool builders */.externalToolBuilders @@ -31,3 +32,6 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml patchprocess/ +/bin/ +ID +tags diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 0a3afb781f018..bd15cccc6582e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -115,6 +115,23 @@ public class CommonConfigurationKeysPublic { public static final String NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY = "net.topology.dependency.script.file.name"; + // The Link cost related keys + /** + * @see TODO: Add html link + */ + public static final String NET_LINK_SCRIPT_FILE_NAME_KEY = + "net.link.script.file.name"; + + public static final String NET_LINK_DEFAULT_COST_KEY = + "net.link.default.cost"; + /** Default value for NET_LINK_DEFAULT_COST_KEY */ + public static final int NET_LINK_DEFAULT_COST_DEFAULT = 1; + + public static final String NET_LINK_SAME_RACK_PENALTY_KEY = + "net.link.samerack.penalty"; + /** Default value for NET_LINK_SAME_RACK_PENALTY_KEY */ + public static final int NET_LINK_SAME_RACK_PENALTY_DEFAULT = 5; + /** * @see * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index cf5b17678b0c9..adb7f16d0b502 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -24,10 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -389,6 +389,10 @@ public boolean equals(Object to) { private int depthOfAllLeaves = -1; /** rack counter */ protected int numOfRacks = 0; + /** Known rack names */ + protected Map rackNames; + /** Link costs between racks */ + protected Map> rackCosts; /** * Whether or not this cluster has ever consisted of more than 1 rack, @@ -400,6 +404,8 @@ public boolean equals(Object to) { protected ReadWriteLock netlock = new ReentrantReadWriteLock(); public NetworkTopology() { + rackNames = new HashMap(); + rackCosts = new HashMap>(); clusterMap = new InnerNode(InnerNode.ROOT); } @@ -435,6 +441,7 @@ public void add(Node node) { LOG.info("Adding a new node: "+NodeBase.getPath(node)); if (rack == null) { incrementRacks(); + addRackName(node.getNetworkLocation()); } if (!(node instanceof InnerNode)) { if (depthOfAllLeaves == -1) { @@ -448,6 +455,63 @@ public void add(Node node) { } } + /** Set this rack's cost to other rack + * @param rack one rack location + * @param 
otherRack the other rack location + * @param cost the link cost between the two racks + */ + public void setRackCost(String rack, String otherRack, int cost) { + netlock.writeLock().lock(); + try { + if (!rackCosts.containsKey(rack)) { + rackCosts.put(rack, new HashMap()); + } + Map costs = rackCosts.get(rack); + costs.put(otherRack, cost); + } finally { + netlock.writeLock().unlock(); + } + } + /** Delete this rack's cost to other rack + * @param rack one rack location + * @param otherRack the other rack location + */ + public void deleteRackCost(String rack, String otherRack) { + netlock.writeLock().lock(); + try { + if (rackCosts.containsKey(rack)) { + Map costs = rackCosts.get(rack); + costs.remove(otherRack); + if (costs.size() == 0) { + rackCosts.remove(rack); + } + } + } finally { + netlock.writeLock().unlock(); + } + } + /** @return rack link cost between two nodes */ + public int getRackCost(String rack, String otherRack) { + netlock.readLock().lock(); + int cost = -1; + try { + if (rackCosts.containsKey(rack)) { + cost = rackCosts.get(rack).get(otherRack); + } + } finally { + netlock.readLock().unlock(); + } + return cost; + } + + protected void addRackName(String rackName) { + LOG.info("Adding a new rack: " + rackName); + rackNames.put(rackName, true); + } + protected void removeRackName(String rackName) { + LOG.info("Removing a new rack: " + rackName); + rackNames.remove(rackName); + } protected void incrementRacks() { numOfRacks++; if (!clusterEverBeenMultiRack && numOfRacks > 1) { @@ -510,9 +574,11 @@ public void remove(Node node) { netlock.writeLock().lock(); try { if (clusterMap.remove(node)) { - InnerNode rack = (InnerNode)getNode(node.getNetworkLocation()); + String rackName = node.getNetworkLocation(); + InnerNode rack = (InnerNode)getNode(rackName); if (rack == null) { numOfRacks--; + removeRackName(rackName); } } LOG.debug("NetworkTopology became:\n{}", this.toString()); @@ -583,6 +649,16 @@ public String getRack(String loc) { return loc; } + /** @return the total number of rack names */ + public Set getRackNames() { + netlock.readLock().lock(); + try { + return rackNames.keySet(); + } finally { + netlock.readLock().unlock(); + } + } + /** @return the total number of racks */ public int getNumOfRacks() { netlock.readLock().lock(); @@ -1030,4 +1106,48 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen) { Preconditions.checkState(idx == activeLen, "Sorted the wrong number of nodes!"); } + + /** + * Sort racks array by network distance to rack containing reader. + *

+ * In a geo-distributed topology, racks can be separated by links with very + * different costs. Sorting the candidate racks by their link cost from the + * reader's rack reduces traffic over expensive links and improves + * performance. + *

+ * As an additional twist, we also randomize the nodes at each network + * distance. This helps with load balancing when there is data skew. + * + * @param reader Node where data will be read + * @param racks Available racks containing replicas with the requested data + * @param activeLen Number of active nodes at the front of the array + */ + public TreeMap> sortRacksByDistance(Node reader, + List racks) { + /** Sort weights for the nodes array */ + int[] weights = new int[racks.size()]; + String readerRack = reader.getNetworkLocation(); + for (int i = 0; i < racks.size(); i++) { + weights[i] = getRackCost(readerRack, racks.get(i)); + } + // Add weight/rack pairs to a TreeMap to sort + // NOTE - TreeMap keys are sorted in their natural order + TreeMap> tree = new TreeMap>(); + for (int i = 0; i < racks.size(); i++) { + int weight = weights[i]; + String rack = racks.get(i); + List list = tree.get(weight); + if (list == null) { + list = Lists.newArrayListWithExpectedSize(1); + tree.put(weight, list); + } + list.add(rack); + } + for (List list: tree.values()) { + if (list != null) { + Collections.shuffle(list, r); + } + } + return tree; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerantGDA.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerantGDA.java new file mode 100644 index 0000000000000..c6c201e711c7f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerantGDA.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import java.util.*; + +/** + * The class is responsible for choosing the desired number of targets + * for placing block replicas in a GDA-aware fashion. + * The strategy is that it tries its best to place the replicas to most racks + * while keeping in mind the link costs associated with geo-distributed nodes. 
+ */ +@InterfaceAudience.Private +public class BlockPlacementPolicyRackFaultTolerantGDA extends BlockPlacementPolicyDefault { + + private int sameRackPenalty; + + @Override + public void initialize(Configuration conf, FSClusterStats stats, + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { + this.sameRackPenalty = conf.getInt( + DFSConfigKeys.NET_LINK_SAME_RACK_PENALTY_KEY, + DFSConfigKeys.NET_LINK_SAME_RACK_PENALTY_DEFAULT); + super.initialize(conf, stats, clusterMap, host2datanodeMap); + } + + @Override + protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) { + int clusterSize = clusterMap.getNumOfLeaves(); + int totalNumOfReplicas = numOfChosen + numOfReplicas; + if (totalNumOfReplicas > clusterSize) { + numOfReplicas -= (totalNumOfReplicas-clusterSize); + totalNumOfReplicas = clusterSize; + } + // No calculation needed when there is only one rack or picking one node. + int numOfRacks = clusterMap.getNumOfRacks(); + if (numOfRacks == 1 || totalNumOfReplicas <= 1) { + return new int[] {numOfReplicas, totalNumOfReplicas}; + } + // Don't set any restrictions. Let GDA decide the limit. + return new int[] {numOfReplicas, totalNumOfReplicas}; + } + + /** + * Choose numOfReplicas in order: + * TODO kbavishi - Fill this up + * @return local node of writer + */ + @Override + protected Node chooseTargetInOrder(int numOfReplicas, + Node writer, + final Set excludedNodes, + final long blocksize, + final int maxNodesPerRack, + final List results, + final boolean avoidStaleNodes, + final boolean newBlock, + EnumMap storageTypes) + throws NotEnoughReplicasException { + final int numOfResults = results.size(); + LOG.warn("Writer: " + writer + ", rack: " + writer.getNetworkLocation()); + if (numOfResults == 0) { + writer = chooseLocalStorage(writer, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes, storageTypes, true) + .getDatanodeDescriptor(); + if (--numOfReplicas == 0) { + return writer; + } + } + chooseRandomGDA(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes, storageTypes); + return writer; + } + + /** + * TODO kbavishi - Fill this up + */ + protected DatanodeStorageInfo chooseRandomGDA(int numOfReplicas, + String scope, + Set excludedNodes, + long blocksize, + int maxNodesPerRack, + List results, + boolean avoidStaleNodes, + EnumMap storageTypes) + throws NotEnoughReplicasException { + StringBuilder builder = null; + if (LOG.isDebugEnabled()) { + } + boolean badTarget = false; + DatanodeStorageInfo firstChosen = null; + + // Assume that one replica has already been placed at local storage + final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); + + // 1. Find all racks + ArrayList racks = new ArrayList(clusterMap.getRackNames()); + + // 2. Sort all racks by distance from rack hosting DN0 + TreeMap> tree; + tree = clusterMap.sortRacksByDistance(dn0, racks); + + // 3. Greedily pick the closest racks and pawn off one replica to a randomly + // selected node on it. + while (numOfReplicas > 0) { + // Pick the rack with the least cost. 
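+ // Note: pollFirstEntry() removes and returns the lowest-cost entry of the
+ // TreeMap. A rack that yields a datanode is re-inserted further below at
+ // cost + sameRackPenalty, so later replicas prefer other, cheaper racks
+ // before a second replica lands on this one.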
+ Map.Entry> treeEntry = tree.pollFirstEntry(); + int cost = treeEntry.getKey(); + List rackNames = treeEntry.getValue(); + String chosenRack = rackNames.remove(0); + LOG.warn("GDA: Picked rack " + chosenRack + " with cost " + cost); + + DatanodeDescriptor chosenNode = chooseDataNode(chosenRack, + excludedNodes); + LOG.warn("GDA: Yielded datanode " + chosenNode); + if (chosenNode == null) { + // Chosen rack is no good because it did not yield any data nodes. + // Do not consider it again for selection. + if (tree.size() == 0 && rackNames.size() == 0) { + // No more racks left. Quit + break; + } else if (rackNames.size() == 0) { + // No racks exist at the same cost. Don't add back cost entry + } else { + // Other racks exist with the same cost. Add them back to the tree. + tree.put(cost, rackNames); + } + // Try again + continue; + + } else { + // Chosen rack generated a datanode. Reconsider it for selection after + // updating with the same rack penalty. + int updatedCost = cost + this.sameRackPenalty; + List list = tree.get(updatedCost); + if (list == null) { + list = Lists.newArrayListWithExpectedSize(1); + tree.put(updatedCost, list); + } + list.add(chosenRack); + + if (rackNames.size() == 0) { + // No racks exist at the previous cost. Don't add back cost entry + } else { + // Other racks exist with the same cost. Add them back to the tree. + tree.put(cost, rackNames); + } + } + + Preconditions.checkState(excludedNodes.add(chosenNode), "chosenNode " + + chosenNode + " is already in excludedNodes " + excludedNodes); + + DatanodeStorageInfo storage = null; + if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad, + results, avoidStaleNodes)) { + for (Iterator> iter = storageTypes + .entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = iter.next(); + storage = chooseStorage4Block( + chosenNode, blocksize, results, entry.getKey()); + if (storage != null) { + numOfReplicas--; + LOG.warn("GDA: Finally chosen node " + chosenNode); + if (firstChosen == null) { + firstChosen = storage; + } + + // add node (subclasses may also add related nodes) to excludedNode + addToExcludedNodes(chosenNode, excludedNodes); + int num = entry.getValue(); + if (num == 1) { + iter.remove(); + } else { + entry.setValue(num - 1); + } + break; + } + } + // If no candidate storage was found on this DN then set badTarget. + badTarget = (storage == null); + } + } + if (numOfReplicas > 0) { + throw new NotEnoughReplicasException("Could not find enough GDA replicas"); + } + return firstChosen; + } + + @Override + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, + int numberOfReplicas) { + if (locs == null) + locs = DatanodeDescriptor.EMPTY_ARRAY; + if (!clusterMap.hasClusterEverBeenMultiRack()) { + // only one rack + return new BlockPlacementStatusDefault(1, 1, 1); + } + // 1. Check that all locations are different. + // 2. Count locations on different racks. + Set racks = new TreeSet<>(); + for (DatanodeInfo dn : locs) { + racks.add(dn.getNetworkLocation()); + } + return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas, + clusterMap.getNumOfRacks()); + } + + @Override + protected Collection pickupReplicaSet( + Collection moreThanOne, + Collection exactlyOne, + Map> rackMap) { + return moreThanOne.isEmpty() ? 
exactlyOne : moreThanOne; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index da02a9035b5a1..7c9ae82c3fe55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -50,7 +51,10 @@ import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; @@ -65,6 +69,16 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class DatanodeManager { + + /** + * Default number of arguments: {@value} + */ + static final int DEFAULT_ARG_COUNT = + CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT; + + private final String netLinkScriptName; + private final int defaultLinkCost; + static final Log LOG = LogFactory.getLog(DatanodeManager.class); private final Namesystem namesystem; @@ -277,6 +291,10 @@ public class DatanodeManager { this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong( DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY, DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT); + this.netLinkScriptName = conf.get(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY); + this.defaultLinkCost = conf.getInt( + DFSConfigKeys.NET_LINK_DEFAULT_COST_KEY, + DFSConfigKeys.NET_LINK_DEFAULT_COST_DEFAULT); } private static long getStaleIntervalFromConf(Configuration conf, @@ -635,6 +653,7 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) { heartbeatManager.removeDatanode(nodeInfo); blockManager.removeBlocksAssociatedTo(nodeInfo); networktopology.remove(nodeInfo); + removeRackCosts(nodeInfo); decrementVersionCount(nodeInfo.getSoftwareVersion()); blockManager.getBlockReportLeaseManager().unregister(nodeInfo); @@ -1012,6 +1031,7 @@ nodes with its data cleared (or user can just remove the StorageID getNetworkDependenciesWithDefault(nodeS)); } getNetworkTopology().add(nodeS); + addRackCosts(nodeS); resolveUpgradeDomain(nodeS); // also treat the registration message as a heartbeat @@ -1044,6 +1064,7 @@ nodes with its data cleared (or user can just remove the StorageID getNetworkDependenciesWithDefault(nodeDescr)); } networktopology.add(nodeDescr); + addRackCosts(nodeDescr); nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion()); resolveUpgradeDomain(nodeDescr); @@ -1078,6 +1099,76 @@ nodes with its data cleared (or user can just remove the StorageID throw e; } } + /** + * Build and execute the resolution command. 
The command is + * executed in the directory specified by the system property + * "user.dir" if set; otherwise the current working directory is used + * @param args a list of arguments + * @return null if the number of arguments is out of range, + * or the output of the command. + */ + protected int runLinkCostCommand(List cmdList) { + if (cmdList.size() == 0) { + return defaultLinkCost; + } + File dir = null; + String userDir; + if ((userDir = System.getProperty("user.dir")) != null) { + dir = new File(userDir); + } + ShellCommandExecutor s = new ShellCommandExecutor( + cmdList.toArray(new String[cmdList.size()]), dir); + try { + s.execute(); + String output = StringUtils.strip(s.getOutput(), "\n"); + return Integer.parseInt(output); + } catch (Exception e) { + LOG.warn("Exception running link cost command " + e); + return defaultLinkCost; + } + } + + protected int getLinkCost(String rack1, String rack2) { + if (netLinkScriptName == null) { + LOG.warn("Could not find link cost script"); + return defaultLinkCost; + } + List m = new ArrayList(3); + m.add(netLinkScriptName); + m.add(rack1); + m.add(rack2); + return runLinkCostCommand(m); + } + + protected void addRackCosts(Node node) { + Set rackNames = getNetworkTopology().getRackNames(); + String thisRack = node.getNetworkLocation(); + LOG.debug("Add rack costs for " + thisRack); + + for (String otherRack : rackNames) { + LOG.debug("Check cost between " + thisRack + " and " + otherRack); + int cost = getLinkCost(thisRack, otherRack); + getNetworkTopology().setRackCost(thisRack, otherRack, cost); + getNetworkTopology().setRackCost(otherRack, thisRack, cost); + } + } + + protected void removeRackCosts(Node node) { + String thisRack = node.getNetworkLocation(); + Set rackNames = getNetworkTopology().getRackNames(); + if (rackNames.contains(thisRack)) { + // The node may have been deleted, but it seems like the rack contains + // other nodes. So no need to remove the link costs associated with this + // rack just yet. + return; + } + LOG.debug("Delete rack costs for " + thisRack); + + for (String otherRack : rackNames) { + getNetworkTopology().deleteRackCost(thisRack, otherRack); + getNetworkTopology().deleteRackCost(otherRack, thisRack); + } + } /** * Rereads conf to get hosts and exclude list file names. 
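Note on the hunk above: the link cost script configured via net.link.script.file.name is run with exactly two arguments (the two rack paths) and must print a single integer cost on stdout; if the script is unset, fails, or prints something unparsable, defaultLinkCost (net.link.default.cost) is used instead. A hypothetical sketch of such a script, assuming a two-level /dcN/rackM naming scheme that this patch does not itself require:

#!/usr/bin/env bash
# Hypothetical link cost script: same rack -> 0, same datacenter -> 1,
# different datacenter -> 10. Assumes rack paths such as /dc1/rack0.
dc() { echo "$1" | cut -d/ -f2; }
if [ "$1" == "$2" ]; then
  echo "0"
elif [ "$(dc "$1")" == "$(dc "$2")" ]; then
  echo "1"
else
  echo "10"
fi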
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index be8a0f06b406d..a51a66bbf5a55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -370,6 +371,133 @@ public void HelperFunction(String scriptFileName) is(DatanodeInfo.AdminStates.DECOMMISSIONED)); } + /** + * Execute a functional link cost script and make sure that helper + * function works correctly + * + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testGoodLinkCostScript() throws IOException, URISyntaxException { + LinkCostHelperFunction("/" + Shell.appendScriptExtension("link-script"), + false); + } + + + /** + * Run a broken link cost script and verify that helper function is able to + * ignore the broken script and work correctly + * + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testBadLinkCostScript() throws IOException, URISyntaxException { + LinkCostHelperFunction("/"+ Shell.appendScriptExtension("link-broken-script"), + true); + } + /** + * Helper function that tests the DatanodeManagers updateRackCosts function + * we invoke this function with and without topology scripts + * + * @param scriptFileName - Script Name or null + * + * @throws URISyntaxException + * @throws IOException + */ + public void LinkCostHelperFunction(String linkScriptFileName, + Boolean brokenScript) + throws URISyntaxException, IOException { + // create the DatanodeManager which will be tested + Configuration conf = new Configuration(); + FSNamesystem fsn = Mockito.mock(FSNamesystem.class); + Mockito.when(fsn.hasWriteLock()).thenReturn(true); + + // Setup topology script so that DM can distinguish between racks + String topoScriptFileName = "/"+ Shell.appendScriptExtension("topology-script"); + assertEquals(true, topoScriptFileName != null && !topoScriptFileName.isEmpty()); + URL shellScript = getClass().getResource(topoScriptFileName); + Path resourcePath = Paths.get(shellScript.toURI()); + FileUtil.setExecutable(resourcePath.toFile(), true); + conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + resourcePath.toString()); + + // Setup link cost script so that DM can set link costs between racks + assertEquals(true, linkScriptFileName != null && !linkScriptFileName.isEmpty()); + shellScript = getClass().getResource(linkScriptFileName); + resourcePath = Paths.get(shellScript.toURI()); + FileUtil.setExecutable(resourcePath.toFile(), true); + conf.set(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY, + resourcePath.toString()); + + // Start DatanodeManager + DatanodeManager dm = mockDatanodeManager(fsn, conf); + + // Register 5 datanodes each in a different rack + for (int i = 0; i < 5; i++) { + // register new datanode + String uuid = "KUUID-" + i; + String ip = "KIP-" + i; + DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid); + Mockito.when(dr.getIpAddr()).thenReturn(ip); + Mockito.when(dr.getXferAddr()).thenReturn(ip + 
":9000"); + Mockito.when(dr.getXferPort()).thenReturn(9000); + Mockito.when(dr.getSoftwareVersion()).thenReturn("version1"); + dm.registerDatanode(dr); + } + + // Verify that we store the correct number of rack names + Set racks = dm.getNetworkTopology().getRackNames(); + Assert.assertEquals(5, racks.size()); + + // Verify that rack costs have been updated as expected + // We have written the working script such that it returns a value of 10 + // everytime. If the script doesn't work, we assume a default value of 1. + int expectedCost = brokenScript ? 1 : 10; + for (String rack : racks) { + for (String otherRack : racks) { + if (!brokenScript && rack.equals(otherRack)) { + // Cost between a rack and itself must be zero. + assertEquals(0, dm.getNetworkTopology().getRackCost(rack, otherRack)); + } else { + assertEquals(expectedCost, + dm.getNetworkTopology().getRackCost(rack, otherRack)); + } + } + } + + // Remove the last datanode. + String uuid = "KUUID-4"; + String ip = "KIP-4"; + DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid); + Mockito.when(dr.getIpAddr()).thenReturn(ip); + Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000"); + Mockito.when(dr.getXferPort()).thenReturn(9000); + Mockito.when(dr.getSoftwareVersion()).thenReturn("version1"); + dm.removeDatanode(dr); + + // Verify that the rack lists are updated + racks = dm.getNetworkTopology().getRackNames(); + Assert.assertEquals(4, racks.size()); + + // Verify that rack costs have also been updated + expectedCost = brokenScript ? 1 : 10; + for (String rack : racks) { + for (String otherRack : racks) { + if (!brokenScript && rack.equals(otherRack)) { + // Cost between a rack and itself must be zero. + assertEquals(0, dm.getNetworkTopology().getRackCost(rack, otherRack)); + } else { + assertEquals(expectedCost, + dm.getNetworkTopology().getRackCost(rack, otherRack)); + } + } + } + } + /** * Test whether removing a host from the includes list without adding it to * the excludes list will exclude it from data node reports. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerantGDA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerantGDA.java new file mode 100644 index 0000000000000..e4d4f8d64854f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerantGDA.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerantGDA; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.net.StaticMapping; +import org.apache.hadoop.util.Shell; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.*; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestBlockPlacementPolicyRackFaultTolerantGDA { + + private static final int DEFAULT_BLOCK_SIZE = 1024; + private MiniDFSCluster cluster = null; + private NamenodeProtocols nameNodeRpc = null; + private FSNamesystem namesystem = null; + private PermissionStatus perm = null; + + public Map> defineLinkCosts(int[][] costs) { + Map> linkCosts = + new HashMap>(); + + for (int i = 0; i < costs.length; i++) { + String rack_i = "/rack" + i; + linkCosts.put(rack_i, new HashMap()); + Map costs_i = linkCosts.get(rack_i); + for (int j = 0; j < costs.length; j++) { + String rack_j = "/rack" + j; + costs_i.put(rack_j, costs[i][j]); + } + } + return linkCosts; + } + + public void createLinkCostScript(File src, int[][] costs) throws IOException { + Map> linkCosts = defineLinkCosts(costs); + PrintWriter writer = new PrintWriter(src); + writer.println("#!/bin/bash"); + try { + for (String rack1 : linkCosts.keySet()) { + Map otherRacks = linkCosts.get(rack1); + for (String rack2 : otherRacks.keySet()) { + int cost = otherRacks.get(rack2); + writer.format( + "if [[ \"$1\" == \"%s\" && \"$2\" == \"%s\" ]]; then echo \"%d\"; fi\n", + rack1, rack2, cost); + } + } + } finally { + writer.close(); + } + } + + private MiniDFSCluster testGDASetup(int numRacks, + int numHostsPerRack, int[][] costs) throws URISyntaxException, IOException { + StaticMapping.resetMap(); + Configuration conf = new HdfsConfiguration(); + final ArrayList rackList = new ArrayList(); + final ArrayList hostList = new ArrayList(); + for (int i = 0; i < numRacks; i++) { + for (int j = 0; j < numHostsPerRack; j++) { + rackList.add("/rack" + i); + hostList.add("/host" + i + j); + } + } + + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyRackFaultTolerantGDA.class, + BlockPlacementPolicy.class); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); + + // Setup link cost script. 
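+ // The bundled custom-link-script.sh is only a placeholder (it exits with 1);
+ // createLinkCostScript() below rewrites it from the supplied cost matrix
+ // before its path is handed to the NameNode via NET_LINK_SCRIPT_FILE_NAME_KEY.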
+ String scriptFileName = "/" + + Shell.appendScriptExtension("custom-link-script"); + assertEquals(true, scriptFileName != null && !scriptFileName.isEmpty()); + URL shellScript = getClass().getResource(scriptFileName); + Path resourcePath = Paths.get(shellScript.toURI()); + FileUtil.setExecutable(resourcePath.toFile(), true); + FileUtil.setWritable(resourcePath.toFile(), true); + + // Manually update the script with link cost logic + createLinkCostScript(resourcePath.toFile(), costs); + conf.set(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY, + resourcePath.toString()); + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(hostList.size()) + .racks(rackList.toArray(new String[rackList.size()])) + .hosts(hostList.toArray(new String[hostList.size()])) + .build(); + cluster.waitActive(); + nameNodeRpc = cluster.getNameNodeRpc(); + namesystem = cluster.getNamesystem(); + perm = new PermissionStatus("TestBlockPlacementPolicyEC", null, + FsPermission.getDefault()); + + return cluster; + } + + private void doTestGDA(String filename, int numRacks, short[][] testSuite, + int[][] expectedReplication, int[][] expectedAdditionalReplication, + String clientMachine) throws Exception { + // Test 5 files + int fileCount = 0; + for (int i = 0; i < 5; i++) { + int idx = 0; + for (short[] testCase : testSuite) { + short replication = testCase[0]; + short additionalReplication = testCase[1]; + String src = "/" + filename + (fileCount++); + // Create the file with client machine + HdfsFileStatus fileStatus = namesystem.startFile(src, perm, + clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, + replication, DEFAULT_BLOCK_SIZE, null, false); + + //test chooseTarget for new file + LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, + null, null, fileStatus.getFileId(), null, null); + testBlockLocations(replication, locatedBlock, + expectedReplication, idx, numRacks); + + //test chooseTarget for existing file. + LocatedBlock additionalLocatedBlock = + nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(), + locatedBlock.getBlock(), locatedBlock.getLocations(), + locatedBlock.getStorageIDs(), new DatanodeInfo[0], + additionalReplication, clientMachine); + testBlockLocations(replication + additionalReplication, + additionalLocatedBlock, + expectedAdditionalReplication, idx, numRacks); + idx++; + } + } + } + + @Test + public void doTestGDASimple1() throws Exception { + // Simple GDA testcase. 3 racks, 2 hosts per rack + // Rack0, Rack1 are in the same DC. Rack2 is in another DC + int numRacks = 3; + int numHostsPerRack = 2; + int[][] costs = { + {0, 1, 10}, + {1, 0, 10}, + {10, 10, 0}, + }; + // Setup + MiniDFSCluster cluster = testGDASetup(numRacks, numHostsPerRack, costs); + + String clientMachine = "/host00"; + short[][] testSuite = { + {3,2}, {3,1}, {4,1}, {4,2}, + }; + int[][] expectedReplication = { + {2, 1, 0}, + {2, 1, 0}, + {2, 2, 0}, + {2, 2, 0}, + }; + int[][] expectedAdditionalReplication = { + {2, 2, 1}, + {2, 2, 0}, + {2, 2, 1}, + {2, 2, 2}, + }; + doTestGDA("testfile", numRacks, testSuite, expectedReplication, + expectedAdditionalReplication, clientMachine); + + // Cleanup + cluster.shutdown(); + } + + @Test + public void doTestGDASimple2() throws Exception { + // Simple GDA testcase. 4 racks, 2 hosts per rack + // Rack0, Rack1 are in one DC. 
Rack2, Rack3 are in another DC + int numRacks = 4; + int numHostsPerRack = 2; + int[][] costs = { + {0, 1, 10, 10}, + {1, 0, 10, 10}, + {10, 10, 0, 1}, + {10, 10, 1, 0}, + }; + // Setup + MiniDFSCluster cluster = testGDASetup(numRacks, numHostsPerRack, costs); + + // Test when client is on a host on Rack0. It should try to stick to Rack0 + // and Rack1 as much as possible + String clientMachine1 = "/host00"; + // XXX-kbavishi Can't pick testcases where there is a chance of + // randomization kicking in. How do we fix this? + short[][] testSuite = { + {2,2}, {3,1}, {4,4}, + }; + int[][] expectedReplication1 = { + {2, 0, 0, 0}, + {2, 1, 0, 0}, + {2, 2, 0, 0}, + }; + int[][] expectedAdditionalReplication1 = { + {2, 2, 0, 0}, + {2, 2, 0, 0}, + {2, 2, 2, 2}, + }; + doTestGDA("testfile1", numRacks, testSuite, expectedReplication1, + expectedAdditionalReplication1, clientMachine1); + + // Test when client is on a host on Rack2. It should try to stick to Rack2 + // and Rack3 as much as possible + String clientMachine2 = "/host20"; + int[][] expectedReplication2 = { + {0, 0, 2, 0}, + {0, 0, 2, 1}, + {0, 0, 2, 2}, + }; + int[][] expectedAdditionalReplication2 = { + {0, 0, 2, 2}, + {0, 0, 2, 2}, + {2, 2, 2, 2}, + }; + doTestGDA("testfile2", numRacks, testSuite, expectedReplication2, + expectedAdditionalReplication2, clientMachine2); + + // Cleanup + cluster.shutdown(); + } + + private void testBlockLocations(int replication, LocatedBlock locatedBlock, + int[][] expected, int idx, int numRacks) { + assertEquals(replication, locatedBlock.getLocations().length); + + HashMap racksCount = new HashMap(); + for (DatanodeInfo node : locatedBlock.getLocations()) { + addToRacksCount(node.getNetworkLocation(), racksCount); + } + + for (int i = 0; i < numRacks; i++) { + assertEquals(expected[idx][i], getRacksCount(racksCount, "/rack" + i)); + } + } + + private void addToRacksCount(String rack, HashMap racksCount) { + Integer count = racksCount.get(rack); + if (count == null) { + racksCount.put(rack, 1); + } else { + racksCount.put(rack, count + 1); + } + } + private int getRacksCount(HashMap racksCount, String rack) { + if (racksCount.containsKey(rack)) { + return racksCount.get(rack); + } else { + return 0; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/custom-link-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/custom-link-script.sh new file mode 100644 index 0000000000000..c6cd28a39e64c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/custom-link-script.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Empty link script. 
We expect the test to overwrite this and then use it +exit 1 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-broken-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-broken-script.sh new file mode 100644 index 0000000000000..8e5cf00587d8c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-broken-script.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## yes, this is a broken script, please don't fix this. +## This is used in a test case to verify that we can handle broken +## topology scripts. + +exit 1 + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-script.sh new file mode 100644 index 0000000000000..ab0319ed0f15c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-script.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +if [ "$1" == "$2" ] ; then + echo "0" +else + echo "10" +fi diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh index 2a308e72c7719..582ac794d6f63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh @@ -18,4 +18,3 @@ echo $1 | awk -F'-' '{printf("/rackID-%s",$2)}' -
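A minimal sketch of how the new keys and the GDA policy could be wired together, assuming the defaults introduced above; the script path and example class below are illustrative only, while the key and policy names mirror what testGDASetup in this patch already does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerantGDA;

public class GdaPolicyConfExample {
  public static Configuration newGdaConf() {
    Configuration conf = new HdfsConfiguration();
    // Swap the default placement policy for the GDA-aware one.
    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        BlockPlacementPolicyRackFaultTolerantGDA.class,
        BlockPlacementPolicy.class);
    // Script that prints an integer link cost for a pair of rack paths
    // (path below is an example, not part of the patch).
    conf.set(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/link-cost.sh");
    // Cost assumed when the script is missing or fails (patch default: 1).
    conf.setInt(DFSConfigKeys.NET_LINK_DEFAULT_COST_KEY, 1);
    // Penalty added to a rack's cost after it receives a replica, so the
    // next replica prefers a different rack (patch default: 5).
    conf.setInt(DFSConfigKeys.NET_LINK_SAME_RACK_PENALTY_KEY, 5);
    return conf;
  }
}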