From e16bd98244bc0fa2e767c2946ba0c45452c225e0 Mon Sep 17 00:00:00 2001 From: Karan Bavishi Date: Mon, 21 Nov 2016 09:21:36 -0600 Subject: [PATCH 1/3] Add support for storing link costs by invoking rack awareness script The NetworkTopology can now store costs between a pair of racks. The DatanodeManager invokes the rack awareness script with the "--cost" switch, when a new datanode is registered. This allows it to store the costs between each pair of racks. This basic cost matrix can be used later to make GDA aware decisions --- .gitignore | 4 + .../apache/hadoop/net/NetworkTopology.java | 130 +++++++++++++++++- .../blockmanagement/DatanodeManager.java | 93 +++++++++++++ .../blockmanagement/TestDatanodeManager.java | 115 ++++++++++++++++ .../src/test/resources/topology-script.sh | 14 +- 5 files changed, 353 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index a5d69d094c8cf..3c104ad89716a 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ .settings target build +*.swp # External tool builders */.externalToolBuilders @@ -31,3 +32,6 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml patchprocess/ +/bin/ +ID +tags diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index cf5b17678b0c9..05119ba10014a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -24,10 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -389,6 +389,10 @@ public boolean equals(Object to) { private int depthOfAllLeaves = -1; /** rack counter */ protected int numOfRacks = 0; + /** Known rack names */ + protected Map rackNames; + /** Link costs between racks */ + protected Map> rackCosts; /** * Whether or not this cluster has ever consisted of more than 1 rack, @@ -400,6 +404,8 @@ public boolean equals(Object to) { protected ReadWriteLock netlock = new ReentrantReadWriteLock(); public NetworkTopology() { + rackNames = new HashMap(); + rackCosts = new HashMap>(); clusterMap = new InnerNode(InnerNode.ROOT); } @@ -435,6 +441,7 @@ public void add(Node node) { LOG.info("Adding a new node: "+NodeBase.getPath(node)); if (rack == null) { incrementRacks(); + addRackName(node.getNetworkLocation()); } if (!(node instanceof InnerNode)) { if (depthOfAllLeaves == -1) { @@ -448,6 +455,63 @@ public void add(Node node) { } } + /** Set this rack's cost to other rack + * @param rack one rack location + * @param otherRack the other rack location + * @param cost the link cost between the two racks + */ + public void setRackCost(String rack, String otherRack, int cost) { + netlock.writeLock().lock(); + try { + if (!rackCosts.containsKey(rack)) { + rackCosts.put(rack, new HashMap()); + } + Map costs = rackCosts.get(rack); + costs.put(otherRack, cost); + } finally { + netlock.writeLock().unlock(); + } + } + /** Delete this 
rack's cost to other rack + * @param rack one rack location + * @param otherRack the other rack location + */ + public void deleteRackCost(String rack, String otherRack) { + netlock.writeLock().lock(); + try { + if (rackCosts.containsKey(rack)) { + Map costs = rackCosts.get(rack); + costs.remove(otherRack); + if (costs.size() == 0) { + rackCosts.remove(rack); + } + } + } finally { + netlock.writeLock().unlock(); + } + } + /** @return rack link cost between two nodes */ + public int getRackCost(String rack, String otherRack) { + netlock.readLock().lock(); + int cost = -1; + try { + if (rackCosts.containsKey(rack)) { + cost = rackCosts.get(rack).get(otherRack); + } + } finally { + netlock.readLock().unlock(); + } + return cost; + } + + protected void addRackName(String rackName) { + LOG.info("Adding a new rack: " + rackName); + rackNames.put(rackName, true); + } + protected void removeRackName(String rackName) { + LOG.info("Removing a new rack: " + rackName); + rackNames.remove(rackName); + } protected void incrementRacks() { numOfRacks++; if (!clusterEverBeenMultiRack && numOfRacks > 1) { @@ -510,9 +574,11 @@ public void remove(Node node) { netlock.writeLock().lock(); try { if (clusterMap.remove(node)) { - InnerNode rack = (InnerNode)getNode(node.getNetworkLocation()); + String rackName = node.getNetworkLocation(); + InnerNode rack = (InnerNode)getNode(rackName); if (rack == null) { numOfRacks--; + removeRackName(rackName); } } LOG.debug("NetworkTopology became:\n{}", this.toString()); @@ -583,6 +649,16 @@ public String getRack(String loc) { return loc; } + /** @return the total number of rack names */ + public Set getRackNames() { + netlock.readLock().lock(); + try { + return rackNames.keySet(); + } finally { + netlock.readLock().unlock(); + } + } + /** @return the total number of racks */ public int getNumOfRacks() { netlock.readLock().lock(); @@ -1030,4 +1106,54 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen) { Preconditions.checkState(idx == activeLen, "Sorted the wrong number of nodes!"); } + + /** + * Sort racks array by network distance to rack containing reader. + *

+ * In a three-level topology, a node can be either local, on the same rack, + * or on a different rack from the reader. Sorting the nodes based on network + * distance from the reader reduces network traffic and improves + * performance. + *

+ * As an additional twist, we also randomize the nodes at each network + * distance. This helps with load balancing when there is data skew. + * + * @param reader Node where data will be read + * @param racks Available racks containing replicas with the requested data + * @param activeLen Number of active nodes at the front of the array + */ + public void sortRacksByDistance(Node reader, String[] racks, int activeLen) { + /** Sort weights for the nodes array */ + int[] weights = new int[activeLen]; + String readerRack = reader.getNetworkLocation(); + for (int i=0; i> tree = new TreeMap>(); + for (int i=0; i list = tree.get(weight); + if (list == null) { + list = Lists.newArrayListWithExpectedSize(1); + tree.put(weight, list); + } + list.add(rack); + } + + int idx = 0; + for (List list: tree.values()) { + if (list != null) { + Collections.shuffle(list, r); + for (String rack: list) { + racks[idx] = rack; + idx++; + } + } + } + Preconditions.checkState(idx == activeLen, + "Sorted the wrong number of nodes!"); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index da02a9035b5a1..d743e80ccef26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -50,7 +51,10 @@ import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; @@ -65,6 +69,18 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class DatanodeManager { + + /** + * Default number of arguments: {@value} + */ + static final int DEFAULT_ARG_COUNT = + CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT; + + private final String scriptName; + // Assume a default link cost of 1 + private final int defaultLinkCost = 1; + private final int maxArgs; //max hostnames per call of the script + static final Log LOG = LogFactory.getLog(DatanodeManager.class); private final Namesystem namesystem; @@ -277,6 +293,9 @@ public class DatanodeManager { this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong( DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY, DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT); + this.scriptName = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY); + this.maxArgs = conf.getInt( + DFSConfigKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY, DEFAULT_ARG_COUNT); } private static long getStaleIntervalFromConf(Configuration conf, @@ -635,6 +654,7 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) { heartbeatManager.removeDatanode(nodeInfo); blockManager.removeBlocksAssociatedTo(nodeInfo); networktopology.remove(nodeInfo); + 
removeRackCosts(nodeInfo); decrementVersionCount(nodeInfo.getSoftwareVersion()); blockManager.getBlockReportLeaseManager().unregister(nodeInfo); @@ -1012,6 +1032,7 @@ nodes with its data cleared (or user can just remove the StorageID getNetworkDependenciesWithDefault(nodeS)); } getNetworkTopology().add(nodeS); + addRackCosts(nodeS); resolveUpgradeDomain(nodeS); // also treat the registration message as a heartbeat @@ -1044,6 +1065,7 @@ nodes with its data cleared (or user can just remove the StorageID getNetworkDependenciesWithDefault(nodeDescr)); } networktopology.add(nodeDescr); + addRackCosts(nodeDescr); nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion()); resolveUpgradeDomain(nodeDescr); @@ -1078,6 +1100,77 @@ nodes with its data cleared (or user can just remove the StorageID throw e; } } + /** + * Build and execute the resolution command. The command is + * executed in the directory specified by the system property + * "user.dir" if set; otherwise the current working directory is used + * @param args a list of arguments + * @return null if the number of arguments is out of range, + * or the output of the command. + */ + protected int runLinkCostCommand(List cmdList) { + if (cmdList.size() == 0) { + return defaultLinkCost; + } + File dir = null; + String userDir; + if ((userDir = System.getProperty("user.dir")) != null) { + dir = new File(userDir); + } + ShellCommandExecutor s = new ShellCommandExecutor( + cmdList.toArray(new String[cmdList.size()]), dir); + try { + s.execute(); + String output = StringUtils.strip(s.getOutput(), "\n"); + return Integer.parseInt(output); + } catch (Exception e) { + LOG.warn("Exception running link cost command " + e); + return defaultLinkCost; + } + } + + protected int getLinkCost(String rack1, String rack2) { + if (scriptName == null) { + LOG.warn("Could not find link cost script"); + return defaultLinkCost; + } + List m = new ArrayList(4); + m.add(scriptName); + m.add("--cost"); + m.add(rack1); + m.add(rack2); + return runLinkCostCommand(m); + } + + protected void addRackCosts(Node node) { + Set rackNames = getNetworkTopology().getRackNames(); + String thisRack = node.getNetworkLocation(); + LOG.debug("Add rack costs for " + thisRack); + + for (String otherRack : rackNames) { + LOG.debug("Check cost between " + thisRack + " and " + otherRack); + int cost = getLinkCost(thisRack, otherRack); + getNetworkTopology().setRackCost(thisRack, otherRack, cost); + getNetworkTopology().setRackCost(otherRack, thisRack, cost); + } + } + + protected void removeRackCosts(Node node) { + String thisRack = node.getNetworkLocation(); + Set rackNames = getNetworkTopology().getRackNames(); + if (rackNames.contains(thisRack)) { + // The node may have been deleted, but it seems like the rack contains + // other nodes. So no need to remove the link costs associated with this + // rack just yet. + return; + } + LOG.debug("Delete rack costs for " + thisRack); + + for (String otherRack : rackNames) { + getNetworkTopology().deleteRackCost(thisRack, otherRack); + getNetworkTopology().deleteRackCost(otherRack, thisRack); + } + } /** * Rereads conf to get hosts and exclude list file names. 
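For context on the DatanodeManager changes above: getLinkCost builds the command "<topology-script> --cost <rack1> <rack2>" and runLinkCostCommand parses a single integer from the script's stdout, falling back to the default link cost of 1 when no script is configured or the call fails. A sketch of the expected exchange against the test topology script (the script path and rack names below are placeholders):

    $ /path/to/topology-script.sh --cost /rackID-A /rackID-B
    10
    $ /path/to/topology-script.sh --cost /rackID-A /rackID-A
    0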
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index be8a0f06b406d..c8ad5dd9e3e95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -370,6 +371,120 @@ public void HelperFunction(String scriptFileName) is(DatanodeInfo.AdminStates.DECOMMISSIONED)); } + /** + * Execute a functional link cost script and make sure that helper + * function works correctly + * + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testGoodLinkCostScript() throws IOException, URISyntaxException { + LinkCostHelperFunction("/" + Shell.appendScriptExtension("topology-script"), + false); + } + + + /** + * Run a broken link cost script and verify that helper function is able to + * ignore the broken script and work correctly + * + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testBadLinkCostScript() throws IOException, URISyntaxException { + LinkCostHelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"), + true); + } + /** + * Helper function that tests the DatanodeManagers updateRackCosts function + * we invoke this function with and without topology scripts + * + * @param scriptFileName - Script Name or null + * + * @throws URISyntaxException + * @throws IOException + */ + public void LinkCostHelperFunction(String scriptFileName, Boolean brokenScript) + throws URISyntaxException, IOException { + // create the DatanodeManager which will be tested + Configuration conf = new Configuration(); + FSNamesystem fsn = Mockito.mock(FSNamesystem.class); + Mockito.when(fsn.hasWriteLock()).thenReturn(true); + assertThat(scriptFileName != null && !scriptFileName.isEmpty(), is(true)); + URL shellScript = getClass().getResource(scriptFileName); + Path resourcePath = Paths.get(shellScript.toURI()); + FileUtil.setExecutable(resourcePath.toFile(), true); + conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + resourcePath.toString()); + DatanodeManager dm = mockDatanodeManager(fsn, conf); + + // Register 5 datanodes each in a different rack + for (int i = 0; i < 5; i++) { + // register new datanode + String uuid = "KUUID-" + i; + String ip = "KIP-" + i; + DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid); + Mockito.when(dr.getIpAddr()).thenReturn(ip); + Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000"); + Mockito.when(dr.getXferPort()).thenReturn(9000); + Mockito.when(dr.getSoftwareVersion()).thenReturn("version1"); + dm.registerDatanode(dr); + } + + // Verify that we store the correct number of rack names + Set racks = dm.getNetworkTopology().getRackNames(); + int expectedRacks = brokenScript ? 1 : 5; + assertThat(racks.size(), is(expectedRacks)); + + // Verify that rack costs have been updated as expected + // We have written the working script such that it returns a value of 10 + // everytime. If the script doesn't work, we assume a default value of 1. 
+ int expectedCost = brokenScript ? 1 : 10; + for (String rack : racks) { + for (String otherRack : racks) { + if (!brokenScript && rack.equals(otherRack)) { + assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), is(0)); + } else { + assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), + is(expectedCost)); + } + } + } + + // Remove the last datanode. + String uuid = "KUUID-4"; + String ip = "KIP-4"; + DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid); + Mockito.when(dr.getIpAddr()).thenReturn(ip); + Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000"); + Mockito.when(dr.getXferPort()).thenReturn(9000); + Mockito.when(dr.getSoftwareVersion()).thenReturn("version1"); + dm.removeDatanode(dr); + + // Verify that the rack lists are updated + racks = dm.getNetworkTopology().getRackNames(); + expectedRacks = brokenScript ? 1 : 4; + assertThat(racks.size(), is(expectedRacks)); + + // Verify that rack costs have also been updated + expectedCost = brokenScript ? 1 : 10; + for (String rack : racks) { + for (String otherRack : racks) { + if (!brokenScript && rack.equals(otherRack)) { + // Cost between a rack and itself must be zero. + assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), is(0)); + } else { + assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), + is(expectedCost)); + } + } + } + } + /** * Test whether removing a host from the includes list without adding it to * the excludes list will exclude it from data node reports. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh index 2a308e72c7719..f4e5e6091d5e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh @@ -17,5 +17,17 @@ # limitations under the License. -echo $1 | awk -F'-' '{printf("/rackID-%s",$2)}' +case "$1" in + (-c|--cost) + if [ "$2" == "$3" ] ; then + echo "0" + else + echo "10" + fi + ;; + (*) + echo $1 | awk -F'-' '{printf("/rackID-%s",$2)}' + ;; +esac + From e018e117a8c0ad4c9dda73791f271755993d38ba Mon Sep 17 00:00:00 2001 From: Karan Bavishi Date: Mon, 21 Nov 2016 11:33:46 -0600 Subject: [PATCH 2/3] Added conf key in core-default.xml for setting up link cost script This allows us to configure any link particular link cost script. The earlier approach was to reuse the rack awareness script. This is much cleaner. 
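Concretely, the script named by the new net.link.script.file.name key is now invoked with just the two rack paths (the "--cost" switch is dropped) and is expected to print a single integer cost on stdout. A minimal sketch of such a script, mirroring the link-script.sh test resource added below; a real deployment would presumably look costs up from a site-specific table:

    #!/usr/bin/env bash
    # Link-cost script sketch: $1 and $2 are rack paths, e.g. /rackID-1.
    # Same rack -> cost 0, otherwise a flat cost of 10.
    if [ "$1" == "$2" ]; then
      echo "0"
    else
      echo "10"
    fi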
--- .../fs/CommonConfigurationKeysPublic.java | 11 +++++ .../blockmanagement/DatanodeManager.java | 11 +++-- .../blockmanagement/TestDatanodeManager.java | 43 ++++++++++++------- .../src/test/resources/link-broken-script.sh | 23 ++++++++++ .../src/test/resources/link-script.sh | 24 +++++++++++ .../src/test/resources/topology-script.sh | 15 +------ 6 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-broken-script.sh create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-script.sh diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 0a3afb781f018..2f1fcbfef5450 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -115,6 +115,17 @@ public class CommonConfigurationKeysPublic { public static final String NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY = "net.topology.dependency.script.file.name"; + // The Link cost related keys + /** + * @see TODO: Add html link + */ + public static final String NET_LINK_SCRIPT_FILE_NAME_KEY = + "net.link.script.file.name"; + public static final String NET_LINK_SAME_RACK_PENALTY_KEY = + "net.link.samerack.penalty"; + /** Default value for NET_LINK_SAME_RACK_PENALTY_KEY */ + public static final long NET_LINK_SAME_RACK_PENALTY_DEFAULT = 5; + /** * @see * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index d743e80ccef26..630767e8c5d24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -76,7 +76,7 @@ public class DatanodeManager { static final int DEFAULT_ARG_COUNT = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT; - private final String scriptName; + private final String netLinkScriptName; // Assume a default link cost of 1 private final int defaultLinkCost = 1; private final int maxArgs; //max hostnames per call of the script @@ -293,7 +293,7 @@ public class DatanodeManager { this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong( DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY, DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT); - this.scriptName = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY); + this.netLinkScriptName = conf.get(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY); this.maxArgs = conf.getInt( DFSConfigKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY, DEFAULT_ARG_COUNT); } @@ -1130,13 +1130,12 @@ protected int runLinkCostCommand(List cmdList) { } protected int getLinkCost(String rack1, String rack2) { - if (scriptName == null) { + if (netLinkScriptName == null) { LOG.warn("Could not find link cost script"); return defaultLinkCost; } - List m = new ArrayList(4); - m.add(scriptName); - m.add("--cost"); + List m = new ArrayList(3); + m.add(netLinkScriptName); m.add(rack1); m.add(rack2); return runLinkCostCommand(m); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index c8ad5dd9e3e95..a51a66bbf5a55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -380,7 +380,7 @@ public void HelperFunction(String scriptFileName) */ @Test public void testGoodLinkCostScript() throws IOException, URISyntaxException { - LinkCostHelperFunction("/" + Shell.appendScriptExtension("topology-script"), + LinkCostHelperFunction("/" + Shell.appendScriptExtension("link-script"), false); } @@ -394,7 +394,7 @@ public void testGoodLinkCostScript() throws IOException, URISyntaxException { */ @Test public void testBadLinkCostScript() throws IOException, URISyntaxException { - LinkCostHelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"), + LinkCostHelperFunction("/"+ Shell.appendScriptExtension("link-broken-script"), true); } /** @@ -406,18 +406,32 @@ public void testBadLinkCostScript() throws IOException, URISyntaxException { * @throws URISyntaxException * @throws IOException */ - public void LinkCostHelperFunction(String scriptFileName, Boolean brokenScript) + public void LinkCostHelperFunction(String linkScriptFileName, + Boolean brokenScript) throws URISyntaxException, IOException { // create the DatanodeManager which will be tested Configuration conf = new Configuration(); FSNamesystem fsn = Mockito.mock(FSNamesystem.class); Mockito.when(fsn.hasWriteLock()).thenReturn(true); - assertThat(scriptFileName != null && !scriptFileName.isEmpty(), is(true)); - URL shellScript = getClass().getResource(scriptFileName); + + // Setup topology script so that DM can distinguish between racks + String topoScriptFileName = "/"+ Shell.appendScriptExtension("topology-script"); + assertEquals(true, topoScriptFileName != null && !topoScriptFileName.isEmpty()); + URL shellScript = getClass().getResource(topoScriptFileName); Path resourcePath = Paths.get(shellScript.toURI()); FileUtil.setExecutable(resourcePath.toFile(), true); conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, resourcePath.toString()); + + // Setup link cost script so that DM can set link costs between racks + assertEquals(true, linkScriptFileName != null && !linkScriptFileName.isEmpty()); + shellScript = getClass().getResource(linkScriptFileName); + resourcePath = Paths.get(shellScript.toURI()); + FileUtil.setExecutable(resourcePath.toFile(), true); + conf.set(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY, + resourcePath.toString()); + + // Start DatanodeManager DatanodeManager dm = mockDatanodeManager(fsn, conf); // Register 5 datanodes each in a different rack @@ -436,8 +450,7 @@ public void LinkCostHelperFunction(String scriptFileName, Boolean brokenScript) // Verify that we store the correct number of rack names Set racks = dm.getNetworkTopology().getRackNames(); - int expectedRacks = brokenScript ? 
1 : 5; - assertThat(racks.size(), is(expectedRacks)); + Assert.assertEquals(5, racks.size()); // Verify that rack costs have been updated as expected // We have written the working script such that it returns a value of 10 @@ -446,10 +459,11 @@ public void LinkCostHelperFunction(String scriptFileName, Boolean brokenScript) for (String rack : racks) { for (String otherRack : racks) { if (!brokenScript && rack.equals(otherRack)) { - assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), is(0)); + // Cost between a rack and itself must be zero. + assertEquals(0, dm.getNetworkTopology().getRackCost(rack, otherRack)); } else { - assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), - is(expectedCost)); + assertEquals(expectedCost, + dm.getNetworkTopology().getRackCost(rack, otherRack)); } } } @@ -467,8 +481,7 @@ public void LinkCostHelperFunction(String scriptFileName, Boolean brokenScript) // Verify that the rack lists are updated racks = dm.getNetworkTopology().getRackNames(); - expectedRacks = brokenScript ? 1 : 4; - assertThat(racks.size(), is(expectedRacks)); + Assert.assertEquals(4, racks.size()); // Verify that rack costs have also been updated expectedCost = brokenScript ? 1 : 10; @@ -476,10 +489,10 @@ public void LinkCostHelperFunction(String scriptFileName, Boolean brokenScript) for (String otherRack : racks) { if (!brokenScript && rack.equals(otherRack)) { // Cost between a rack and itself must be zero. - assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), is(0)); + assertEquals(0, dm.getNetworkTopology().getRackCost(rack, otherRack)); } else { - assertThat(dm.getNetworkTopology().getRackCost(rack, otherRack), - is(expectedCost)); + assertEquals(expectedCost, + dm.getNetworkTopology().getRackCost(rack, otherRack)); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-broken-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-broken-script.sh new file mode 100644 index 0000000000000..8e5cf00587d8c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-broken-script.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## yes, this is a broken script, please don't fix this. +## This is used in a test case to verify that we can handle broken +## topology scripts. + +exit 1 + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-script.sh new file mode 100644 index 0000000000000..ab0319ed0f15c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/link-script.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +if [ "$1" == "$2" ] ; then + echo "0" +else + echo "10" +fi diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh index f4e5e6091d5e9..582ac794d6f63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/topology-script.sh @@ -17,17 +17,4 @@ # limitations under the License. -case "$1" in - (-c|--cost) - if [ "$2" == "$3" ] ; then - echo "0" - else - echo "10" - fi - ;; - (*) - echo $1 | awk -F'-' '{printf("/rackID-%s",$2)}' - ;; -esac - - +echo $1 | awk -F'-' '{printf("/rackID-%s",$2)}' From 2adbf63e622a4b29b8b4ef31114be70bf8a13566 Mon Sep 17 00:00:00 2001 From: Karan Bavishi Date: Tue, 22 Nov 2016 00:08:14 -0600 Subject: [PATCH 3/3] Added support for WAN-aware EC writes - Greedy algo based on link cost --- .../fs/CommonConfigurationKeysPublic.java | 8 +- .../apache/hadoop/net/NetworkTopology.java | 22 +- ...ckPlacementPolicyRackFaultTolerantGDA.java | 242 ++++++++++++++ .../blockmanagement/DatanodeManager.java | 9 +- ...ckPlacementPolicyRackFaultTolerantGDA.java | 301 ++++++++++++++++++ .../src/test/resources/custom-link-script.sh | 20 ++ 6 files changed, 582 insertions(+), 20 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerantGDA.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerantGDA.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/resources/custom-link-script.sh diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 2f1fcbfef5450..bd15cccc6582e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -121,10 +121,16 @@ public class CommonConfigurationKeysPublic { */ public static final String NET_LINK_SCRIPT_FILE_NAME_KEY = "net.link.script.file.name"; + + public static final String NET_LINK_DEFAULT_COST_KEY = + "net.link.default.cost"; + /** Default value for NET_LINK_DEFAULT_COST_KEY */ + public static final int NET_LINK_DEFAULT_COST_DEFAULT = 1; + public static final String NET_LINK_SAME_RACK_PENALTY_KEY = "net.link.samerack.penalty"; /** Default value for NET_LINK_SAME_RACK_PENALTY_KEY */ - public static final long NET_LINK_SAME_RACK_PENALTY_DEFAULT = 5; + public static final int NET_LINK_SAME_RACK_PENALTY_DEFAULT = 5; /** * @see diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index 05119ba10014a..adb7f16d0b502 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -1122,19 +1122,20 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen) { * @param racks Available racks containing replicas with the requested data * @param activeLen Number of active nodes at the front of the array */ - public void sortRacksByDistance(Node reader, String[] racks, int activeLen) { + public TreeMap> sortRacksByDistance(Node reader, + List racks) { /** Sort weights for the nodes array */ - int[] weights = new int[activeLen]; + int[] weights = new int[racks.size()]; String readerRack = reader.getNetworkLocation(); - for (int i=0; i> tree = new TreeMap>(); - for (int i=0; i list = tree.get(weight); if (list == null) { list = Lists.newArrayListWithExpectedSize(1); @@ -1142,18 +1143,11 @@ public void sortRacksByDistance(Node reader, String[] racks, int activeLen) { } list.add(rack); } - - int idx = 0; for (List list: tree.values()) { if (list != null) { Collections.shuffle(list, r); - for (String rack: list) { - racks[idx] = rack; - idx++; - } } } - Preconditions.checkState(idx == activeLen, - "Sorted the wrong number of nodes!"); + return tree; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerantGDA.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerantGDA.java new file mode 100644 index 0000000000000..c6c201e711c7f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerantGDA.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import java.util.*; + +/** + * The class is responsible for choosing the desired number of targets + * for placing block replicas in a GDA-aware fashion. 
+ * The strategy is that it tries its best to place the replicas to most racks + * while keeping in mind the link costs associated with geo-distributed nodes. + */ +@InterfaceAudience.Private +public class BlockPlacementPolicyRackFaultTolerantGDA extends BlockPlacementPolicyDefault { + + private int sameRackPenalty; + + @Override + public void initialize(Configuration conf, FSClusterStats stats, + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { + this.sameRackPenalty = conf.getInt( + DFSConfigKeys.NET_LINK_SAME_RACK_PENALTY_KEY, + DFSConfigKeys.NET_LINK_SAME_RACK_PENALTY_DEFAULT); + super.initialize(conf, stats, clusterMap, host2datanodeMap); + } + + @Override + protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) { + int clusterSize = clusterMap.getNumOfLeaves(); + int totalNumOfReplicas = numOfChosen + numOfReplicas; + if (totalNumOfReplicas > clusterSize) { + numOfReplicas -= (totalNumOfReplicas-clusterSize); + totalNumOfReplicas = clusterSize; + } + // No calculation needed when there is only one rack or picking one node. + int numOfRacks = clusterMap.getNumOfRacks(); + if (numOfRacks == 1 || totalNumOfReplicas <= 1) { + return new int[] {numOfReplicas, totalNumOfReplicas}; + } + // Don't set any restrictions. Let GDA decide the limit. + return new int[] {numOfReplicas, totalNumOfReplicas}; + } + + /** + * Choose numOfReplicas in order: + * TODO kbavishi - Fill this up + * @return local node of writer + */ + @Override + protected Node chooseTargetInOrder(int numOfReplicas, + Node writer, + final Set excludedNodes, + final long blocksize, + final int maxNodesPerRack, + final List results, + final boolean avoidStaleNodes, + final boolean newBlock, + EnumMap storageTypes) + throws NotEnoughReplicasException { + final int numOfResults = results.size(); + LOG.warn("Writer: " + writer + ", rack: " + writer.getNetworkLocation()); + if (numOfResults == 0) { + writer = chooseLocalStorage(writer, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes, storageTypes, true) + .getDatanodeDescriptor(); + if (--numOfReplicas == 0) { + return writer; + } + } + chooseRandomGDA(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes, storageTypes); + return writer; + } + + /** + * TODO kbavishi - Fill this up + */ + protected DatanodeStorageInfo chooseRandomGDA(int numOfReplicas, + String scope, + Set excludedNodes, + long blocksize, + int maxNodesPerRack, + List results, + boolean avoidStaleNodes, + EnumMap storageTypes) + throws NotEnoughReplicasException { + StringBuilder builder = null; + if (LOG.isDebugEnabled()) { + } + boolean badTarget = false; + DatanodeStorageInfo firstChosen = null; + + // Assume that one replica has already been placed at local storage + final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); + + // 1. Find all racks + ArrayList racks = new ArrayList(clusterMap.getRackNames()); + + // 2. Sort all racks by distance from rack hosting DN0 + TreeMap> tree; + tree = clusterMap.sortRacksByDistance(dn0, racks); + + // 3. Greedily pick the closest racks and pawn off one replica to a randomly + // selected node on it. + while (numOfReplicas > 0) { + // Pick the rack with the least cost. 
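+      // (Sketch of the greedy step below: pollFirstEntry() removes the lowest-cost
+      // bucket from the tree; one rack is taken from that bucket and a random node
+      // on it is requested via chooseDataNode. If a node is found, the rack is
+      // re-queued at cost + sameRackPenalty so the next replica prefers a different
+      // rack at the same original cost; if no node is found, the rack is dropped.)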
+ Map.Entry> treeEntry = tree.pollFirstEntry(); + int cost = treeEntry.getKey(); + List rackNames = treeEntry.getValue(); + String chosenRack = rackNames.remove(0); + LOG.warn("GDA: Picked rack " + chosenRack + " with cost " + cost); + + DatanodeDescriptor chosenNode = chooseDataNode(chosenRack, + excludedNodes); + LOG.warn("GDA: Yielded datanode " + chosenNode); + if (chosenNode == null) { + // Chosen rack is no good because it did not yield any data nodes. + // Do not consider it again for selection. + if (tree.size() == 0 && rackNames.size() == 0) { + // No more racks left. Quit + break; + } else if (rackNames.size() == 0) { + // No racks exist at the same cost. Don't add back cost entry + } else { + // Other racks exist with the same cost. Add them back to the tree. + tree.put(cost, rackNames); + } + // Try again + continue; + + } else { + // Chosen rack generated a datanode. Reconsider it for selection after + // updating with the same rack penalty. + int updatedCost = cost + this.sameRackPenalty; + List list = tree.get(updatedCost); + if (list == null) { + list = Lists.newArrayListWithExpectedSize(1); + tree.put(updatedCost, list); + } + list.add(chosenRack); + + if (rackNames.size() == 0) { + // No racks exist at the previous cost. Don't add back cost entry + } else { + // Other racks exist with the same cost. Add them back to the tree. + tree.put(cost, rackNames); + } + } + + Preconditions.checkState(excludedNodes.add(chosenNode), "chosenNode " + + chosenNode + " is already in excludedNodes " + excludedNodes); + + DatanodeStorageInfo storage = null; + if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad, + results, avoidStaleNodes)) { + for (Iterator> iter = storageTypes + .entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = iter.next(); + storage = chooseStorage4Block( + chosenNode, blocksize, results, entry.getKey()); + if (storage != null) { + numOfReplicas--; + LOG.warn("GDA: Finally chosen node " + chosenNode); + if (firstChosen == null) { + firstChosen = storage; + } + + // add node (subclasses may also add related nodes) to excludedNode + addToExcludedNodes(chosenNode, excludedNodes); + int num = entry.getValue(); + if (num == 1) { + iter.remove(); + } else { + entry.setValue(num - 1); + } + break; + } + } + // If no candidate storage was found on this DN then set badTarget. + badTarget = (storage == null); + } + } + if (numOfReplicas > 0) { + throw new NotEnoughReplicasException("Could not find enough GDA replicas"); + } + return firstChosen; + } + + @Override + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, + int numberOfReplicas) { + if (locs == null) + locs = DatanodeDescriptor.EMPTY_ARRAY; + if (!clusterMap.hasClusterEverBeenMultiRack()) { + // only one rack + return new BlockPlacementStatusDefault(1, 1, 1); + } + // 1. Check that all locations are different. + // 2. Count locations on different racks. + Set racks = new TreeSet<>(); + for (DatanodeInfo dn : locs) { + racks.add(dn.getNetworkLocation()); + } + return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas, + clusterMap.getNumOfRacks()); + } + + @Override + protected Collection pickupReplicaSet( + Collection moreThanOne, + Collection exactlyOne, + Map> rackMap) { + return moreThanOne.isEmpty() ? 
exactlyOne : moreThanOne; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 630767e8c5d24..7c9ae82c3fe55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -77,9 +77,7 @@ public class DatanodeManager { CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT; private final String netLinkScriptName; - // Assume a default link cost of 1 - private final int defaultLinkCost = 1; - private final int maxArgs; //max hostnames per call of the script + private final int defaultLinkCost; static final Log LOG = LogFactory.getLog(DatanodeManager.class); @@ -294,8 +292,9 @@ public class DatanodeManager { DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY, DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT); this.netLinkScriptName = conf.get(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY); - this.maxArgs = conf.getInt( - DFSConfigKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY, DEFAULT_ARG_COUNT); + this.defaultLinkCost = conf.getInt( + DFSConfigKeys.NET_LINK_DEFAULT_COST_KEY, + DFSConfigKeys.NET_LINK_DEFAULT_COST_DEFAULT); } private static long getStaleIntervalFromConf(Configuration conf, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerantGDA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerantGDA.java new file mode 100644 index 0000000000000..e4d4f8d64854f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerantGDA.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerantGDA; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.net.StaticMapping; +import org.apache.hadoop.util.Shell; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.*; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestBlockPlacementPolicyRackFaultTolerantGDA { + + private static final int DEFAULT_BLOCK_SIZE = 1024; + private MiniDFSCluster cluster = null; + private NamenodeProtocols nameNodeRpc = null; + private FSNamesystem namesystem = null; + private PermissionStatus perm = null; + + public Map> defineLinkCosts(int[][] costs) { + Map> linkCosts = + new HashMap>(); + + for (int i = 0; i < costs.length; i++) { + String rack_i = "/rack" + i; + linkCosts.put(rack_i, new HashMap()); + Map costs_i = linkCosts.get(rack_i); + for (int j = 0; j < costs.length; j++) { + String rack_j = "/rack" + j; + costs_i.put(rack_j, costs[i][j]); + } + } + return linkCosts; + } + + public void createLinkCostScript(File src, int[][] costs) throws IOException { + Map> linkCosts = defineLinkCosts(costs); + PrintWriter writer = new PrintWriter(src); + writer.println("#!/bin/bash"); + try { + for (String rack1 : linkCosts.keySet()) { + Map otherRacks = linkCosts.get(rack1); + for (String rack2 : otherRacks.keySet()) { + int cost = otherRacks.get(rack2); + writer.format( + "if [[ \"$1\" == \"%s\" && \"$2\" == \"%s\" ]]; then echo \"%d\"; fi\n", + rack1, rack2, cost); + } + } + } finally { + writer.close(); + } + } + + private MiniDFSCluster testGDASetup(int numRacks, + int numHostsPerRack, int[][] costs) throws URISyntaxException, IOException { + StaticMapping.resetMap(); + Configuration conf = new HdfsConfiguration(); + final ArrayList rackList = new ArrayList(); + final ArrayList hostList = new ArrayList(); + for (int i = 0; i < numRacks; i++) { + for (int j = 0; j < numHostsPerRack; j++) { + rackList.add("/rack" + i); + hostList.add("/host" + i + j); + } + } + + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyRackFaultTolerantGDA.class, + BlockPlacementPolicy.class); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); + + // Setup link cost script. 
+ String scriptFileName = "/" + + Shell.appendScriptExtension("custom-link-script"); + assertEquals(true, scriptFileName != null && !scriptFileName.isEmpty()); + URL shellScript = getClass().getResource(scriptFileName); + Path resourcePath = Paths.get(shellScript.toURI()); + FileUtil.setExecutable(resourcePath.toFile(), true); + FileUtil.setWritable(resourcePath.toFile(), true); + + // Manually update the script with link cost logic + createLinkCostScript(resourcePath.toFile(), costs); + conf.set(DFSConfigKeys.NET_LINK_SCRIPT_FILE_NAME_KEY, + resourcePath.toString()); + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(hostList.size()) + .racks(rackList.toArray(new String[rackList.size()])) + .hosts(hostList.toArray(new String[hostList.size()])) + .build(); + cluster.waitActive(); + nameNodeRpc = cluster.getNameNodeRpc(); + namesystem = cluster.getNamesystem(); + perm = new PermissionStatus("TestBlockPlacementPolicyEC", null, + FsPermission.getDefault()); + + return cluster; + } + + private void doTestGDA(String filename, int numRacks, short[][] testSuite, + int[][] expectedReplication, int[][] expectedAdditionalReplication, + String clientMachine) throws Exception { + // Test 5 files + int fileCount = 0; + for (int i = 0; i < 5; i++) { + int idx = 0; + for (short[] testCase : testSuite) { + short replication = testCase[0]; + short additionalReplication = testCase[1]; + String src = "/" + filename + (fileCount++); + // Create the file with client machine + HdfsFileStatus fileStatus = namesystem.startFile(src, perm, + clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, + replication, DEFAULT_BLOCK_SIZE, null, false); + + //test chooseTarget for new file + LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, + null, null, fileStatus.getFileId(), null, null); + testBlockLocations(replication, locatedBlock, + expectedReplication, idx, numRacks); + + //test chooseTarget for existing file. + LocatedBlock additionalLocatedBlock = + nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(), + locatedBlock.getBlock(), locatedBlock.getLocations(), + locatedBlock.getStorageIDs(), new DatanodeInfo[0], + additionalReplication, clientMachine); + testBlockLocations(replication + additionalReplication, + additionalLocatedBlock, + expectedAdditionalReplication, idx, numRacks); + idx++; + } + } + } + + @Test + public void doTestGDASimple1() throws Exception { + // Simple GDA testcase. 3 racks, 2 hosts per rack + // Rack0, Rack1 are in the same DC. Rack2 is in another DC + int numRacks = 3; + int numHostsPerRack = 2; + int[][] costs = { + {0, 1, 10}, + {1, 0, 10}, + {10, 10, 0}, + }; + // Setup + MiniDFSCluster cluster = testGDASetup(numRacks, numHostsPerRack, costs); + + String clientMachine = "/host00"; + short[][] testSuite = { + {3,2}, {3,1}, {4,1}, {4,2}, + }; + int[][] expectedReplication = { + {2, 1, 0}, + {2, 1, 0}, + {2, 2, 0}, + {2, 2, 0}, + }; + int[][] expectedAdditionalReplication = { + {2, 2, 1}, + {2, 2, 0}, + {2, 2, 1}, + {2, 2, 2}, + }; + doTestGDA("testfile", numRacks, testSuite, expectedReplication, + expectedAdditionalReplication, clientMachine); + + // Cleanup + cluster.shutdown(); + } + + @Test + public void doTestGDASimple2() throws Exception { + // Simple GDA testcase. 4 racks, 2 hosts per rack + // Rack0, Rack1 are in one DC. 
Rack2, Rack3 are in another DC + int numRacks = 4; + int numHostsPerRack = 2; + int[][] costs = { + {0, 1, 10, 10}, + {1, 0, 10, 10}, + {10, 10, 0, 1}, + {10, 10, 1, 0}, + }; + // Setup + MiniDFSCluster cluster = testGDASetup(numRacks, numHostsPerRack, costs); + + // Test when client is on a host on Rack0. It should try to stick to Rack0 + // and Rack1 as much as possible + String clientMachine1 = "/host00"; + // XXX-kbavishi Can't pick testcases where there is a chance of + // randomization kicking in. How do we fix this? + short[][] testSuite = { + {2,2}, {3,1}, {4,4}, + }; + int[][] expectedReplication1 = { + {2, 0, 0, 0}, + {2, 1, 0, 0}, + {2, 2, 0, 0}, + }; + int[][] expectedAdditionalReplication1 = { + {2, 2, 0, 0}, + {2, 2, 0, 0}, + {2, 2, 2, 2}, + }; + doTestGDA("testfile1", numRacks, testSuite, expectedReplication1, + expectedAdditionalReplication1, clientMachine1); + + // Test when client is on a host on Rack2. It should try to stick to Rack2 + // and Rack3 as much as possible + String clientMachine2 = "/host20"; + int[][] expectedReplication2 = { + {0, 0, 2, 0}, + {0, 0, 2, 1}, + {0, 0, 2, 2}, + }; + int[][] expectedAdditionalReplication2 = { + {0, 0, 2, 2}, + {0, 0, 2, 2}, + {2, 2, 2, 2}, + }; + doTestGDA("testfile2", numRacks, testSuite, expectedReplication2, + expectedAdditionalReplication2, clientMachine2); + + // Cleanup + cluster.shutdown(); + } + + private void testBlockLocations(int replication, LocatedBlock locatedBlock, + int[][] expected, int idx, int numRacks) { + assertEquals(replication, locatedBlock.getLocations().length); + + HashMap racksCount = new HashMap(); + for (DatanodeInfo node : locatedBlock.getLocations()) { + addToRacksCount(node.getNetworkLocation(), racksCount); + } + + for (int i = 0; i < numRacks; i++) { + assertEquals(expected[idx][i], getRacksCount(racksCount, "/rack" + i)); + } + } + + private void addToRacksCount(String rack, HashMap racksCount) { + Integer count = racksCount.get(rack); + if (count == null) { + racksCount.put(rack, 1); + } else { + racksCount.put(rack, count + 1); + } + } + private int getRacksCount(HashMap racksCount, String rack) { + if (racksCount.containsKey(rack)) { + return racksCount.get(rack); + } else { + return 0; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/custom-link-script.sh b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/custom-link-script.sh new file mode 100644 index 0000000000000..c6cd28a39e64c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/custom-link-script.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Empty link script. We expect the test to overwrite this and then use it +exit 1
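For reference, the test overwrites custom-link-script.sh at runtime through createLinkCostScript. For the 3-rack cost matrix used in doTestGDASimple1, the generated script would look roughly like this (one line per rack pair; line order depends on HashMap iteration and may vary):

    #!/bin/bash
    if [[ "$1" == "/rack0" && "$2" == "/rack0" ]]; then echo "0"; fi
    if [[ "$1" == "/rack0" && "$2" == "/rack1" ]]; then echo "1"; fi
    if [[ "$1" == "/rack0" && "$2" == "/rack2" ]]; then echo "10"; fi
    if [[ "$1" == "/rack1" && "$2" == "/rack0" ]]; then echo "1"; fi
    if [[ "$1" == "/rack1" && "$2" == "/rack1" ]]; then echo "0"; fi
    if [[ "$1" == "/rack1" && "$2" == "/rack2" ]]; then echo "10"; fi
    if [[ "$1" == "/rack2" && "$2" == "/rack0" ]]; then echo "10"; fi
    if [[ "$1" == "/rack2" && "$2" == "/rack1" ]]; then echo "10"; fi
    if [[ "$1" == "/rack2" && "$2" == "/rack2" ]]; then echo "0"; fi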