Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ public NetworkTopology() {
this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
}

/**
 * Move a leaf node to a new network location.
 *
 * <p>The node is removed from the topology, given the new location and added
 * back, all while holding the topology write lock. If adding the node at the
 * new location fails, the node's previous location is restored and the node
 * is re-inserted there before the failure is rethrown, so a failed update
 * does not leave the node dropped from the topology.
 *
 * @param node leaf node to be updated; a null node is ignored
 * @param newNetworkLocations new network location for the node
 * @throws IllegalArgumentException if {@code node} is an inner node
 */
public void updateNodeNetworkLocation(Node node, String newNetworkLocations) {
  if (node == null) {
    return;
  }
  if (node instanceof InnerNode) {
    throw new IllegalArgumentException(
        "Not allow to update an inner node: " + NodeBase.getPath(node));
  }
  netlock.writeLock().lock();
  try {
    // Remember where the node lived so the move can be undone.
    final String previousLocation = node.getNetworkLocation();
    remove(node);
    node.setNetworkLocation(newNetworkLocations);
    try {
      add(node); // may throw InvalidTopologyException
    } catch (RuntimeException e) {
      // Roll back: put the node back where it was instead of losing it.
      node.setNetworkLocation(previousLocation);
      try {
        add(node);
      } catch (RuntimeException rollbackFailure) {
        // Keep the original failure as the primary error.
        e.addSuppressed(rollbackFailure);
      }
      throw e;
    }
  } finally {
    netlock.writeLock().unlock();
  }
}

/** Add a leaf node
* Update node counter & rack counter if necessary
* @param node node to be added; can be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,13 @@ public void refreshNodes() throws IOException {
}
}

/**
 * Ask the NameNode to refresh its network topology.
 *
 * <p>The call runs inside a "refreshTopology" trace scope, after
 * {@code checkOpen()} has been invoked.
 *
 * @throws IOException propagated from {@code checkOpen()} or from the
 *         NameNode call
 */
public void refreshTopology() throws IOException {
  checkOpen();
  try (TraceScope scope = tracer.newScope("refreshTopology")) {
    namenode.refreshTopology();
  }
}

/**
* Dumps DFS data structures into specified file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,10 @@ public void refreshNodes() throws IOException {
dfs.refreshNodes();
}

/**
 * Refresh the network topology by delegating to the underlying
 * {@code dfs} client.
 *
 * @throws IOException propagated from the client call
 */
public void refreshTopology() throws IOException {
  dfs.refreshTopology();
}

/**
* Finalize previously upgraded files system state.
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,14 @@ boolean setSafeMode(HdfsConstants.SafeModeAction action, boolean isChecked)
@Idempotent
void refreshNodes() throws IOException;

/**
 * Tells the namenode to refresh its network topology information.
 *
 * @throws IOException if the request fails
 */
@Idempotent
void refreshTopology() throws IOException;

/**
* Finalize previous upgrade.
* Remove file system state saved during the upgrade.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshTopologyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
Expand Down Expand Up @@ -294,6 +295,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
private final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST =
RefreshNodesRequestProto.newBuilder().build();

/** Shared refreshTopology request, built once with no fields set. */
private final static RefreshTopologyRequestProto VOID_REFRESH_TOPOLOGY_REQUEST =
RefreshTopologyRequestProto.newBuilder().build();

private final static FinalizeUpgradeRequestProto
VOID_FINALIZE_UPGRADE_REQUEST =
FinalizeUpgradeRequestProto.newBuilder().build();
Expand Down Expand Up @@ -769,6 +773,11 @@ public void refreshNodes() throws IOException {
ipc(() -> rpcProxy.refreshNodes(null, VOID_REFRESH_NODES_REQUEST));
}

// Sends the shared empty refreshTopology request through the RPC proxy,
// passing null as the controller; the call is routed through ipc(...).
@Override
public void refreshTopology() throws IOException {
ipc(() -> rpcProxy.refreshTopology(null, VOID_REFRESH_TOPOLOGY_REQUEST));
}

@Override
public void finalizeUpgrade() throws IOException {
ipc(() -> rpcProxy.finalizeUpgrade(null, VOID_FINALIZE_UPGRADE_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ message RefreshNodesRequestProto { // no parameters
message RefreshNodesResponseProto { // void response
}

// Request message of the refreshTopology rpc; it carries no fields.
message RefreshTopologyRequestProto { // no parameters
}

// Response message of the refreshTopology rpc; it carries no fields.
message RefreshTopologyResponseProto { // void response
}

message FinalizeUpgradeRequestProto { // no parameters
}

Expand Down Expand Up @@ -947,6 +953,7 @@ service ClientNamenodeProtocol {
rpc restoreFailedStorage(RestoreFailedStorageRequestProto)
returns(RestoreFailedStorageResponseProto);
rpc refreshNodes(RefreshNodesRequestProto) returns(RefreshNodesResponseProto);
// Asks the namenode to refresh its network topology info (empty request, empty response).
rpc refreshTopology(RefreshTopologyRequestProto) returns(RefreshTopologyResponseProto);
rpc finalizeUpgrade(FinalizeUpgradeRequestProto)
returns(FinalizeUpgradeResponseProto);
rpc upgradeStatus(UpgradeStatusRequestProto)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,15 @@ public void refreshNodes() throws IOException {
rpcClient.invokeConcurrent(nss, method, true, true);
}

/**
 * Forward refreshTopology to every namespace reported by the namenode
 * resolver, using a concurrent invocation on the RPC client.
 *
 * @throws IOException propagated from the operation check, the namespace
 *         lookup or the remote invocation
 */
@Override
public void refreshTopology() throws IOException {
  rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);

  // No-argument remote call, addressed by method name.
  final RemoteMethod remoteCall =
      new RemoteMethod("refreshTopology", new Class<?>[] {});
  final Set<FederationNamespaceInfo> namespaces =
      namenodeResolver.getNamespaces();
  rpcClient.invokeConcurrent(namespaces, remoteCall, true, true);
}

@Override
public void finalizeUpgrade() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,11 @@ public void refreshNodes() throws IOException {
clientProto.refreshNodes();
}

// Delegates the topology refresh to the client protocol module (clientProto).
@Override // ClientProtocol
public void refreshTopology() throws IOException {
clientProto.refreshTopology();
}

@Override // ClientProtocol
public void finalizeUpgrade() throws IOException {
clientProto.finalizeUpgrade();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshTopologyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshTopologyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
Expand Down Expand Up @@ -378,6 +380,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
private static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
RefreshNodesResponseProto.newBuilder().build();

/** Shared refreshTopology response, built once with no fields set. */
private static final RefreshTopologyResponseProto VOID_REFRESHTOPOLOGY_RESPONSE =
RefreshTopologyResponseProto.newBuilder().build();

private static final FinalizeUpgradeResponseProto VOID_FINALIZEUPGRADE_RESPONSE =
FinalizeUpgradeResponseProto.newBuilder().build();

Expand Down Expand Up @@ -976,6 +981,16 @@ public RefreshNodesResponseProto refreshNodes(RpcController controller,

}

// Runs refreshTopology on the wrapped server and answers with the shared
// empty response; an IOException is rethrown wrapped in a ServiceException.
@Override
public RefreshTopologyResponseProto refreshTopology(RpcController controller,
    RefreshTopologyRequestProto request) throws ServiceException {
  try {
    server.refreshTopology();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
  return VOID_REFRESHTOPOLOGY_RESPONSE;
}

@Override
public FinalizeUpgradeResponseProto finalizeUpgrade(RpcController controller,
FinalizeUpgradeRequestProto req) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -69,6 +70,7 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1327,6 +1329,55 @@ nodes with its data cleared (or user can just remove the StorageID
}
}

/**
 * Re-resolve the network location of every datanode in {@code datanodeMap}
 * and move the nodes whose location changed within the network topology.
 *
 * <p>The cached mappings of {@code dnsToSwitchMapping} are reloaded for all
 * datanode IP addresses before they are resolved again. Datanodes whose
 * resolved location equals their current one are left untouched.
 *
 * <p>Processing stops at the first datanode that cannot be moved: its
 * network location field is reset to the previous value and an
 * {@link IOException} carrying the original failure is thrown. Datanodes
 * moved before that point keep their new locations.
 *
 * @throws IOException if the mapping does not return one location per
 *         datanode, or if updating a datanode in the topology fails
 */
public void refreshTopology() throws IOException {
  final long start = Time.monotonicNow();

  // Work on a snapshot of the descriptors so the map itself is not iterated
  // while nodes are being moved.
  final List<DatanodeDescriptor> datanodeDescriptors =
      new ArrayList<>(datanodeMap.values());
  final List<String> datanodeIpAddrs =
      new ArrayList<>(datanodeDescriptors.size());
  for (DatanodeDescriptor dnDescriptor : datanodeDescriptors) {
    datanodeIpAddrs.add(dnDescriptor.getIpAddr());
  }

  dnsToSwitchMapping.reloadCachedMappings(datanodeIpAddrs);
  final List<String> rNameList = dnsToSwitchMapping.resolve(datanodeIpAddrs);

  // The loop below pairs descriptors and locations by index, so a missing
  // or short result must be rejected up front.
  final int resolvedCount = rNameList == null ? 0 : rNameList.size();
  if (resolvedCount != datanodeDescriptors.size()) {
    throw new IOException(getClass().getSimpleName()
        + ".refreshTopology: resolved " + resolvedCount
        + " network locations for " + datanodeDescriptors.size()
        + " datanodes.");
  }

  for (int i = 0; i < datanodeDescriptors.size(); i++) {
    final DatanodeDescriptor dnDescriptor = datanodeDescriptors.get(i);
    final String originNetwork = dnDescriptor.getNetworkLocation();
    final String resolvedNetwork = rNameList.get(i);

    if (originNetwork.equals(resolvedNetwork)) {
      continue;
    }
    try {
      networktopology.updateNodeNetworkLocation(dnDescriptor, resolvedNetwork);
    } catch (RuntimeException e) {
      // Trailing exception argument makes SLF4J log the stack trace.
      LOG.error("{}.refreshTopology: update datanode: {} failed. reset from Rack: {} to Rack: {}.",
          getClass().getSimpleName(), dnDescriptor, resolvedNetwork, originNetwork, e);
      dnDescriptor.setNetworkLocation(originNetwork);
      // Keep the root cause attached for the caller.
      throw new IOException(getClass().getSimpleName() + ".refreshTopology: update datanode " + dnDescriptor +
          " failed. reset from Rack: " + resolvedNetwork + " to Rack: " + originNetwork, e);
    }
    LOG.info("{}.refreshTopology: update datanode: {} from Rack: {} to Rack: {}.",
        getClass().getSimpleName(), dnDescriptor, originNetwork, resolvedNetwork);

    checkIfClusterIsNowMultiRack(dnDescriptor);
  }
  final long elapsedMs = Time.monotonicNow() - start;
  LOG.info("{}.refreshTopology: costs {} ms.", getClass().getSimpleName(), elapsedMs);
}

/**
* Rereads conf to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5117,6 +5117,20 @@ void refreshNodes() throws IOException {
logAuditEvent(true, operationName, null);
}

/**
 * Refresh the network topology through the block manager's datanode manager.
 *
 * <p>Checks the operation category and superuser privilege first, then runs
 * the refresh while holding the namesystem write lock. The audit event is
 * logged only when the refresh returns normally, after the lock is released.
 *
 * @throws IOException propagated from the checks or from the refresh itself
 */
public void refreshTopology() throws IOException {
  final String opName = "refreshTopology";
  checkOperation(OperationCategory.UNCHECKED);
  checkSuperuserPrivilege(opName);

  writeLock();
  try {
    // Checked again now that the write lock is held.
    checkOperation(OperationCategory.UNCHECKED);
    getBlockManager().getDatanodeManager().refreshTopology();
  } finally {
    writeUnlock(opName);
  }

  logAuditEvent(true, opName, null);
}

void setBalancerBandwidth(long bandwidth) throws IOException {
String operationName = "setBalancerBandwidth";
checkOperation(OperationCategory.WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,12 @@ public void refreshNodes() throws IOException {
namesystem.refreshNodes();
}

// Calls checkNNStartup() first, then hands the topology refresh to the namesystem.
@Override // ClientProtocol
public void refreshTopology() throws IOException {
checkNNStartup();
namesystem.refreshTopology();
}

@Override // NamenodeProtocol
public long getTransactionID() throws IOException {
String operationName = "getTransactionID";
Expand Down
Loading