DFSClient.java
@@ -3434,4 +3434,12 @@ public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
   private boolean isLocatedBlocksRefresherEnabled() {
     return clientContext.isLocatedBlocksRefresherEnabled();
   }
+
+  public DatanodeInfo[] slowDatanodeReport() throws IOException {
+    checkOpen();
+    try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) {
+      return namenode.getSlowDatanodeReport();
+    }
+  }
+
 }
DistributedFileSystem.java
@@ -3651,4 +3651,15 @@ public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
       throws IOException {
     return new FileSystemMultipartUploaderBuilder(this, basePath);
   }
+
+  /**
+   * Retrieve stats for slow running datanodes.
+   *
+   * @return An array of slow datanode info.
+   * @throws IOException If an I/O error occurs.
+   */
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    return dfs.slowDatanodeReport();
+  }
+
 }
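For reference, a minimal caller sketch (not part of this patch) showing how an application could consume the new public API. The class name and the instanceof guard are illustrative; the only assumption is that fs.defaultFS points at an HDFS cluster, so FileSystem.get() returns a DistributedFileSystem.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

// Hypothetical example class, not part of this change.
public class SlowDatanodeReportExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      DatanodeInfo[] slowNodes =
          ((DistributedFileSystem) fs).getSlowDatanodeStats();
      // Empty when no outliers are detected or slow peer tracking is off.
      for (DatanodeInfo dn : slowNodes) {
        System.out.println(dn.getXferAddr());
      }
    }
  }
}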
ViewDistributedFileSystem.java
@@ -2318,4 +2318,14 @@ public long getUsed() throws IOException {
     }
     return this.vfs.getUsed();
   }
+
+  @Override
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    if (this.vfs == null) {
+      return super.getSlowDatanodeStats();
+    }
+    checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
+    return defaultDFS.getSlowDatanodeStats();
+  }
+
 }
ClientProtocol.java
@@ -1856,4 +1856,16 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
    */
   @AtMostOnce
   void satisfyStoragePolicy(String path) throws IOException;
+
+  /**
+   * Get a report on all slow datanodes. Slow datanodes are identified by the
+   * outlier detection algorithm, if slow peer tracking is enabled for the DFS cluster.
+   *
+   * @return Datanode report for slow running datanodes.
+   * @throws IOException If an I/O error occurs.
+   */
+  @Idempotent
+  @ReadOnly
+  DatanodeInfo[] getSlowDatanodeReport() throws IOException;
+
 }
ClientNamenodeProtocolTranslatorPB.java
@@ -144,6 +144,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2044,6 +2045,18 @@ public void satisfyStoragePolicy(String src) throws IOException {
     }
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    GetSlowDatanodeReportRequestProto req =
+        GetSlowDatanodeReportRequestProto.newBuilder().build();
+    try {
+      return PBHelperClient.convert(
+          rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState()
       throws IOException {
ClientNamenodeProtocol.proto
@@ -416,6 +416,13 @@ message GetPreferredBlockSizeResponseProto {
   required uint64 bsize = 1;
 }
 
+message GetSlowDatanodeReportRequestProto {
+}
+
+message GetSlowDatanodeReportResponseProto {
+  repeated DatanodeInfoProto datanodeInfoProto = 1;
+}
+
 enum SafeModeActionProto {
   SAFEMODE_LEAVE = 1;
   SAFEMODE_ENTER = 2;
@@ -1060,4 +1067,6 @@ service ClientNamenodeProtocol {
       returns(SatisfyStoragePolicyResponseProto);
   rpc getHAServiceState(HAServiceStateRequestProto)
       returns(HAServiceStateResponseProto);
+  rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
+      returns(GetSlowDatanodeReportResponseProto);
 }
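Since GetSlowDatanodeReportRequestProto declares no fields, the request serializes to an empty byte string, and optional fields can be added later without breaking wire compatibility. A hypothetical round-trip sketch with the generated classes (the wrapper class and method are illustrative, and the shaded protobuf package name is an assumption based on current Hadoop builds):

import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;

// Hypothetical demo, not part of this change.
class ProtoRoundTrip {
  static void demo() throws InvalidProtocolBufferException {
    GetSlowDatanodeReportRequestProto req =
        GetSlowDatanodeReportRequestProto.newBuilder().build();
    byte[] wire = req.toByteArray();  // zero bytes: the message has no fields
    GetSlowDatanodeReportRequestProto parsed =
        GetSlowDatanodeReportRequestProto.parseFrom(wire);
    assert parsed.equals(req);
  }
}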
TestReadOnly.java
@@ -75,7 +75,8 @@ public class TestReadOnly {
         "getQuotaUsage",
         "msync",
         "getHAServiceState",
-        "getECTopologyResultForPolicies"
+        "getECTopologyResultForPolicies",
+        "getSlowDatanodeReport"
     )
   );
 
RouterClientProtocol.java
@@ -1791,6 +1791,12 @@ public void satisfyStoragePolicy(String path) throws IOException {
     storagePolicy.satisfyStoragePolicy(path);
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    return rpcServer.getSlowDatanodeReport(true, 0);
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState() {
     if (rpcServer.isSafeMode()) {
RouterRpcServer.java
@@ -898,24 +898,7 @@ public DatanodeInfo[] getDatanodeReport(
     Map<FederationNamespaceInfo, DatanodeInfo[]> results =
         rpcClient.invokeConcurrent(nss, method, requireResponse, false,
             timeOutMs, DatanodeInfo[].class);
-    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
-        results.entrySet()) {
-      FederationNamespaceInfo ns = entry.getKey();
-      DatanodeInfo[] result = entry.getValue();
-      for (DatanodeInfo node : result) {
-        String nodeId = node.getXferAddr();
-        DatanodeInfo dn = datanodesMap.get(nodeId);
-        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
-          // Add the subcluster as a suffix to the network location
-          node.setNetworkLocation(
-              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
-              node.getNetworkLocation());
-          datanodesMap.put(nodeId, node);
-        } else {
-          LOG.debug("{} is in multiple subclusters", nodeId);
-        }
-      }
-    }
+    updateDnMap(results, datanodesMap);
     // Map -> Array
     Collection<DatanodeInfo> datanodes = datanodesMap.values();
     return toArray(datanodes, DatanodeInfo.class);
@@ -1358,6 +1341,11 @@ public void satisfyStoragePolicy(String path) throws IOException {
     clientProto.satisfyStoragePolicy(path);
   }
 
+  @Override // ClientProtocol
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    return clientProto.getSlowDatanodeReport();
+  }
+
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
       long minBlockSize) throws IOException {
@@ -1757,4 +1745,52 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
   public String[] getGroupsForUser(String user) throws IOException {
     return routerProto.getGroupsForUser(user);
   }
+
+  /**
+   * Get the slow running datanodes report with a timeout.
+   *
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, DatanodeInfo[]> results =
+        rpcClient.invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
+    updateDnMap(results, datanodesMap);
+    // Map -> Array
+    Collection<DatanodeInfo> datanodes = datanodesMap.values();
+    return toArray(datanodes, DatanodeInfo.class);
+  }
+
+  private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
+      Map<String, DatanodeInfo> datanodesMap) {
+    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
+        results.entrySet()) {
+      FederationNamespaceInfo ns = entry.getKey();
+      DatanodeInfo[] result = entry.getValue();
+      for (DatanodeInfo node : result) {
+        String nodeId = node.getXferAddr();
+        DatanodeInfo dn = datanodesMap.get(nodeId);
+        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
+          // Add the subcluster as a suffix to the network location
+          node.setNetworkLocation(
+              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
+              node.getNetworkLocation());
+          datanodesMap.put(nodeId, node);
+        } else {
+          LOG.debug("{} is in multiple subclusters", nodeId);
+        }
+      }
+    }
+  }
+
 }
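The router merge keeps one entry per datanode transfer address and prefers the report with the newest lastUpdate; the winner's network location is prefixed with its nameservice ID. A simplified standalone model of that rule (not part of this patch; types and values are illustrative):

import java.util.HashMap;
import java.util.Map;

// Illustration only: a simplified model of the dedup rule in updateDnMap.
// A datanode seen from several subclusters is kept once, preferring the
// report with the newest lastUpdate timestamp.
class MergeRuleSketch {
  static final Map<String, Long> LAST_UPDATE = new HashMap<>();

  static boolean shouldReplace(String xferAddr, long lastUpdate) {
    Long prev = LAST_UPDATE.get(xferAddr);
    if (prev == null || lastUpdate > prev) {
      LAST_UPDATE.put(xferAddr, lastUpdate);
      return true;  // keep this report (and tag it with its nameservice)
    }
    return false;   // older duplicate from another subcluster; ignore
  }

  public static void main(String[] args) {
    System.out.println(shouldReplace("10.0.0.5:9866", 1000L)); // true
    System.out.println(shouldReplace("10.0.0.5:9866", 2000L)); // true, newer
    System.out.println(shouldReplace("10.0.0.5:9866", 1500L)); // false, older
  }
}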
TestRouterRpc.java
@@ -690,6 +690,7 @@ public void testProxyGetDatanodeReport() throws Exception {
 
     DatanodeInfo[] combinedData =
         routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    assertEquals(0, routerProtocol.getSlowDatanodeReport().length);
     final Map<Integer, String> routerDNMap = new TreeMap<>();
     for (DatanodeInfo dn : combinedData) {
       String subcluster = dn.getNetworkLocation().split("/")[1];
ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -155,6 +155,8 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2034,4 +2036,18 @@ public HAServiceStateResponseProto getHAServiceState(
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetSlowDatanodeReportResponseProto getSlowDatanodeReport(RpcController controller,
+      GetSlowDatanodeReportRequestProto request) throws ServiceException {
+    try {
+      List<? extends DatanodeInfoProto> result =
+          PBHelperClient.convert(server.getSlowDatanodeReport());
+      return GetSlowDatanodeReportResponseProto.newBuilder()
+          .addAllDatanodeInfoProto(result)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
DatanodeManager.java
@@ -20,6 +20,7 @@
 import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -1650,7 +1651,17 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
     }
     return nodes;
   }
 
+  public List<DatanodeDescriptor> getAllSlowDataNodes() {
+    if (slowPeerTracker == null) {
+      LOG.debug("{} is disabled. Try enabling it first to capture slow peer outliers.",
+          DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
+      return ImmutableList.of();
+    }
+    List<String> slowNodes = slowPeerTracker.getSlowNodes(getNumOfDataNodes());
+    return getDnDescriptorsFromIpAddr(slowNodes);
+  }
+
   /**
    * Checks if name resolution was successful for the given address. If IP
    * address and host name are the same, then it means name resolution has
@@ -2133,19 +2144,26 @@ public Set<String> getSlowPeersUuidSet() {
     List<String> slowNodes;
     Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
     slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
-    for (String slowNode : slowNodes) {
-      if (StringUtils.isBlank(slowNode)
-          || !slowNode.contains(IP_PORT_SEPARATOR)) {
+    List<DatanodeDescriptor> datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes);
+    datanodeDescriptors.forEach(
+        datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()));
+    return slowPeersUuidSet;
+  }
+
+  private List<DatanodeDescriptor> getDnDescriptorsFromIpAddr(List<String> nodes) {
+    List<DatanodeDescriptor> datanodeDescriptors = new ArrayList<>();
+    for (String node : nodes) {
+      if (StringUtils.isBlank(node) || !node.contains(IP_PORT_SEPARATOR)) {
         continue;
       }
-      String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
+      String ipAddr = node.split(IP_PORT_SEPARATOR)[0];
       DatanodeDescriptor datanodeByHost =
          host2DatanodeMap.getDatanodeByHost(ipAddr);
       if (datanodeByHost != null) {
-        slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
+        datanodeDescriptors.add(datanodeByHost);
      }
     }
-    return slowPeersUuidSet;
+    return datanodeDescriptors;
   }
 
   /**
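getAllSlowDataNodes() only returns entries when the NameNode has a SlowPeerTracker, which is gated on the config key logged above. A hypothetical setup sketch for a test cluster (the MiniDFSCluster usage is illustrative; the key is the one referenced in the new method):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

// Hypothetical setup snippet, not part of this change.
class SlowPeerTrackingSetup {
  static MiniDFSCluster startCluster() throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Without this flag, slowPeerTracker stays null and the slow datanode
    // report comes back empty.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
    return new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
  }
}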
DataNodePeerMetrics.java
@@ -54,6 +54,8 @@ public class DataNodePeerMetrics {
 
   private final String name;
 
+  // Strictly for use by test code; production code must not set this.
+  private Map<String, Double> testOutlier = null;
+
   private final OutlierDetector slowNodeDetector;
 
@@ -142,14 +144,28 @@ public void collectThreadLocalStates() {
    * than their peers.
    */
   public Map<String, Double> getOutliers() {
-    // This maps the metric name to the aggregate latency.
-    // The metric name is the datanode ID.
-    final Map<String, Double> stats =
-        sendPacketDownstreamRollingAverages.getStats(
-            minOutlierDetectionSamples);
-    LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
-
-    return slowNodeDetector.getOutliers(stats);
+    // The test-only outlier map stays null in production code.
+    if (testOutlier == null) {
+      // This maps the metric name to the aggregate latency.
+      // The metric name is the datanode ID.
+      final Map<String, Double> stats =
+          sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples);
+      LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
+      return slowNodeDetector.getOutliers(stats);
+    } else {
+      // This branch is taken only by test code.
+      return testOutlier;
+    }
   }
 
+  /**
+   * Strictly for use by test code; production code must not call this. This method
+   * directly sets the outlier mapping so tests need not compute aggregate latency metrics.
+   *
+   * @param outlier outlier mapping set directly by tests.
+   */
+  public void setTestOutliers(Map<String, Double> outlier) {
+    this.testOutlier = outlier;
+  }
+
   public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
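A hypothetical sketch of how a test could use the new hook (the helper class, the sample address, and the latency value are illustrative; obtaining the metrics object, e.g. via DataNode#getPeerMetrics(), is left to the test context):

import java.util.Map;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;

// Hypothetical test helper, not part of this change.
class OutlierInjection {
  static void injectOutliers(DataNodePeerMetrics peerMetrics) {
    // "127.0.0.1:9866" stands in for a datanode address; 15.5 is a
    // made-up aggregate latency.
    Map<String, Double> fakeOutliers = ImmutableMap.of("127.0.0.1:9866", 15.5);
    peerMetrics.setTestOutliers(fakeOutliers);
    // getOutliers() now returns the injected map directly, so tests don't
    // need to accumulate rolling-average samples first.
    assert fakeOutliers.equals(peerMetrics.getOutliers());
  }
}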