Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
.settings
target
build
*.swp

# External tool builders
*/.externalToolBuilders
Expand All @@ -31,3 +32,6 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml
hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml
patchprocess/
/bin/
ID
tags
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,23 @@ public class CommonConfigurationKeysPublic {
public static final String NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY =
"net.topology.dependency.script.file.name";

// Keys configuring rack-to-rack link costs for the link-aware topology.
/**
 * Name of the user-supplied script that reports link costs between racks.
 */
// NOTE(review): original javadoc read "@see TODO: Add html link" — link this
// to core-default.xml once the property is documented there.
public static final String NET_LINK_SCRIPT_FILE_NAME_KEY =
"net.link.script.file.name";

/** Link cost assumed between two racks when no explicit cost is configured. */
public static final String NET_LINK_DEFAULT_COST_KEY =
"net.link.default.cost";
/** Default value for NET_LINK_DEFAULT_COST_KEY */
public static final int NET_LINK_DEFAULT_COST_DEFAULT = 1;

// presumably an extra cost applied to same-rack links — confirm with the
// code that reads this property before relying on the description.
public static final String NET_LINK_SAME_RACK_PENALTY_KEY =
"net.link.samerack.penalty";
/** Default value for NET_LINK_SAME_RACK_PENALTY_KEY */
public static final int NET_LINK_SAME_RACK_PENALTY_DEFAULT = 5;

/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -389,6 +389,10 @@ public boolean equals(Object to) {
private int depthOfAllLeaves = -1;
/** rack counter */
protected int numOfRacks = 0;
/** Known rack names */
protected Map<String, Boolean> rackNames;
/** Link costs between racks */
protected Map<String, Map<String, Integer>> rackCosts;

/**
* Whether or not this cluster has ever consisted of more than 1 rack,
Expand All @@ -400,6 +404,8 @@ public boolean equals(Object to) {
protected ReadWriteLock netlock = new ReentrantReadWriteLock();

public NetworkTopology() {
  // Root of the cluster tree.
  clusterMap = new InnerNode(InnerNode.ROOT);
  // Bookkeeping for link-cost-aware placement: the set of known rack
  // names (as a map to Boolean) and the pairwise rack link costs.
  rackNames = new HashMap<String, Boolean>();
  rackCosts = new HashMap<String, Map<String, Integer>>();
}

Expand Down Expand Up @@ -435,6 +441,7 @@ public void add(Node node) {
LOG.info("Adding a new node: "+NodeBase.getPath(node));
if (rack == null) {
incrementRacks();
addRackName(node.getNetworkLocation());
}
if (!(node instanceof InnerNode)) {
if (depthOfAllLeaves == -1) {
Expand All @@ -448,6 +455,63 @@ public void add(Node node) {
}
}

/**
 * Record the link cost from one rack to another. Creates the per-rack
 * cost table on first use. Thread-safe: taken under the topology write
 * lock.
 *
 * @param rack one rack location
 * @param otherRack the other rack location
 * @param cost the link cost between the two racks
 */
public void setRackCost(String rack, String otherRack, int cost) {
  netlock.writeLock().lock();
  try {
    // Single lookup instead of containsKey + get.
    Map<String, Integer> costs = rackCosts.get(rack);
    if (costs == null) {
      costs = new HashMap<String, Integer>();
      rackCosts.put(rack, costs);
    }
    costs.put(otherRack, cost);
  } finally {
    netlock.writeLock().unlock();
  }
}
/**
 * Remove the recorded link cost from one rack to another, dropping the
 * per-rack table entirely once it becomes empty. Thread-safe: taken
 * under the topology write lock.
 *
 * @param rack one rack location
 * @param otherRack the other rack location
 */
public void deleteRackCost(String rack, String otherRack) {
  netlock.writeLock().lock();
  try {
    // Single lookup instead of containsKey + get.
    Map<String, Integer> costs = rackCosts.get(rack);
    if (costs != null) {
      costs.remove(otherRack);
      if (costs.isEmpty()) {
        // No costs left for this rack; drop the empty inner map.
        rackCosts.remove(rack);
      }
    }
  } finally {
    netlock.writeLock().unlock();
  }
}
/**
 * Look up the configured link cost between two racks.
 * Thread-safe: read under the topology read lock.
 *
 * @param rack one rack location
 * @param otherRack the other rack location
 * @return the configured link cost, or -1 if no cost is recorded for
 *         this rack pair
 */
public int getRackCost(String rack, String otherRack) {
  int cost = -1;
  netlock.readLock().lock();
  try {
    Map<String, Integer> costs = rackCosts.get(rack);
    if (costs != null) {
      // BUGFIX: the original assigned costs.get(otherRack) straight into
      // an int, which throws NullPointerException on auto-unboxing when
      // `rack` is known but `otherRack` has no recorded cost.
      Integer c = costs.get(otherRack);
      if (c != null) {
        cost = c.intValue();
      }
    }
  } finally {
    netlock.readLock().unlock();
  }
  return cost;
}

/**
 * Register a newly seen rack name in the known-rack set.
 *
 * @param rackName network location of the rack
 */
protected void addRackName(String rackName) {
  LOG.info("Adding a new rack: " + rackName);
  rackNames.put(rackName, Boolean.TRUE);
}
/**
 * Forget a rack name once its last node has been removed.
 *
 * @param rackName network location of the rack
 */
protected void removeRackName(String rackName) {
  // BUGFIX: log message said "Removing a new rack" — a copy-paste from
  // addRackName; this method removes an existing rack.
  LOG.info("Removing a rack: " + rackName);
  rackNames.remove(rackName);
}
protected void incrementRacks() {
numOfRacks++;
if (!clusterEverBeenMultiRack && numOfRacks > 1) {
Expand Down Expand Up @@ -510,9 +574,11 @@ public void remove(Node node) {
netlock.writeLock().lock();
try {
if (clusterMap.remove(node)) {
InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
String rackName = node.getNetworkLocation();
InnerNode rack = (InnerNode)getNode(rackName);
if (rack == null) {
numOfRacks--;
removeRackName(rackName);
}
}
LOG.debug("NetworkTopology became:\n{}", this.toString());
Expand Down Expand Up @@ -583,6 +649,16 @@ public String getRack(String loc) {
return loc;
}

/** @return a snapshot of the known rack names */
public Set<String> getRackNames() {
  netlock.readLock().lock();
  try {
    // DEFECT FIX: the original returned rackNames.keySet(), a live view
    // of the internal map that escapes the read lock — callers iterating
    // it race with add/remove and can see ConcurrentModificationException.
    // Return a detached snapshot instead (keySet of a copied map, so no
    // extra imports are needed).
    return new HashMap<String, Boolean>(rackNames).keySet();
  } finally {
    netlock.readLock().unlock();
  }
}

/** @return the total number of racks */
public int getNumOfRacks() {
netlock.readLock().lock();
Expand Down Expand Up @@ -1030,4 +1106,48 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
Preconditions.checkState(idx == activeLen,
"Sorted the wrong number of nodes!");
}

/**
 * Group racks by their configured link cost from the rack containing
 * <i>reader</i>, ordered by ascending cost.
 * <p/>
 * Racks sharing the same cost are shuffled to help with load balancing
 * when there is data skew.
 *
 * @param reader node from whose rack the link costs are measured
 * @param racks racks containing replicas with the requested data
 * @return map from link cost to the (shuffled) racks at that cost;
 *         racks with no configured cost appear under key -1
 *         (see {@link #getRackCost})
 */
public TreeMap<Integer, List<String>> sortRacksByDistance(Node reader,
    List<String> racks) {
  String readerRack = reader.getNetworkLocation();
  // TreeMap keys iterate in ascending natural order, so the cheapest
  // cost bucket comes first. (DOC FIX: the original javadoc was
  // copy-pasted from sortByDistance and documented a nonexistent
  // activeLen parameter; the intermediate weights[] array and the null
  // check on tree.values() — which never holds nulls — are removed.)
  TreeMap<Integer, List<String>> tree = new TreeMap<Integer, List<String>>();
  for (String rack : racks) {
    int cost = getRackCost(readerRack, rack);
    List<String> bucket = tree.get(cost);
    if (bucket == null) {
      bucket = Lists.newArrayListWithExpectedSize(1);
      tree.put(cost, bucket);
    }
    bucket.add(rack);
  }
  // Randomize order within each cost bucket for load balancing.
  for (List<String> bucket : tree.values()) {
    Collections.shuffle(bucket, r);
  }
  return tree;
}
}
Loading