
Commit 2dfa928

HDFS-16521. DFS API to retrieve slow datanodes (#4107)
Signed-off-by: stack <[email protected]>
Signed-off-by: Wei-Chiu Chuang <[email protected]>
1 parent d4a91bd commit 2dfa928

18 files changed: +362 −46 lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 8 additions & 0 deletions
@@ -3491,4 +3491,12 @@ public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
   private boolean isLocatedBlocksRefresherEnabled() {
     return clientContext.isLocatedBlocksRefresherEnabled();
   }
+
+  public DatanodeInfo[] slowDatanodeReport() throws IOException {
+    checkOpen();
+    try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) {
+      return namenode.getSlowDatanodeReport();
+    }
+  }
+
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 11 additions & 0 deletions
@@ -3887,4 +3887,15 @@ public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
       throws IOException {
     return new FileSystemMultipartUploaderBuilder(this, basePath);
   }
+
+  /**
+   * Retrieve stats for slow running datanodes.
+   *
+   * @return An array of slow datanode info.
+   * @throws IOException If an I/O error occurs.
+   */
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    return dfs.slowDatanodeReport();
+  }
+
 }
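
For context, a minimal client-side sketch of how the new DistributedFileSystem#getSlowDatanodeStats API might be used. The cluster URI is a placeholder and the surrounding class is not part of this commit:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class SlowDatanodeReportExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode URI; replace with your cluster's address.
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://nn1:8020"), conf)) {
      if (fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        // Empty unless slow peer tracking is enabled on the cluster.
        for (DatanodeInfo dn : dfs.getSlowDatanodeStats()) {
          System.out.println(dn.getXferAddr() + " at " + dn.getNetworkLocation());
        }
      }
    }
  }
}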

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java

Lines changed: 10 additions & 0 deletions
@@ -2318,4 +2318,14 @@ public long getUsed() throws IOException {
     }
     return this.vfs.getUsed();
   }
+
+  @Override
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    if (this.vfs == null) {
+      return super.getSlowDatanodeStats();
+    }
+    checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
+    return defaultDFS.getSlowDatanodeStats();
+  }
+
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

Lines changed: 12 additions & 0 deletions
@@ -1868,4 +1868,16 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
    */
   @AtMostOnce
   void satisfyStoragePolicy(String path) throws IOException;
+
+  /**
+   * Get report on all of the slow Datanodes. Slow running datanodes are identified based on
+   * the Outlier detection algorithm, if slow peer tracking is enabled for the DFS cluster.
+   *
+   * @return Datanode report for slow running datanodes.
+   * @throws IOException If an I/O error occurs.
+   */
+  @Idempotent
+  @ReadOnly
+  DatanodeInfo[] getSlowDatanodeReport() throws IOException;
+
 }
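
As the javadoc notes, the report is only populated when slow peer tracking is enabled on the cluster. A rough test-style sketch of exercising the API end to end, assuming the standard dfs.datanode.peer.stats.enabled key and the MiniDFSCluster test harness (neither is introduced by this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class SlowPeerTrackingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Slow peer tracking; on a real cluster this goes in hdfs-site.xml
    // and requires a datanode restart.
    conf.setBoolean("dfs.datanode.peer.stats.enabled", true);
    try (MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
      cluster.waitActive();
      DistributedFileSystem dfs = cluster.getFileSystem();
      // With tracking enabled but no slow peers observed yet,
      // the report is typically empty.
      System.out.println(dfs.getSlowDatanodeStats().length);
    }
  }
}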

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

Lines changed: 13 additions & 0 deletions
@@ -143,6 +143,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2065,6 +2066,18 @@ public void satisfyStoragePolicy(String src) throws IOException {
     }
   }

+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    GetSlowDatanodeReportRequestProto req =
+        GetSlowDatanodeReportRequestProto.newBuilder().build();
+    try {
+      return PBHelperClient.convert(
+          rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState()
       throws IOException {

hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto

Lines changed: 9 additions & 0 deletions
@@ -424,6 +424,13 @@ message GetPreferredBlockSizeResponseProto {
   required uint64 bsize = 1;
 }

+message GetSlowDatanodeReportRequestProto {
+}
+
+message GetSlowDatanodeReportResponseProto {
+  repeated DatanodeInfoProto datanodeInfoProto = 1;
+}
+
 enum SafeModeActionProto {
   SAFEMODE_LEAVE = 1;
   SAFEMODE_ENTER = 2;
@@ -1070,4 +1077,6 @@ service ClientNamenodeProtocol {
       returns(SatisfyStoragePolicyResponseProto);
   rpc getHAServiceState(HAServiceStateRequestProto)
       returns(HAServiceStateResponseProto);
+  rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
+      returns(GetSlowDatanodeReportResponseProto);
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java

Lines changed: 2 additions & 1 deletion
@@ -76,7 +76,8 @@ public class TestReadOnly {
           "getQuotaUsage",
           "msync",
           "getHAServiceState",
-          "getECTopologyResultForPolicies"
+          "getECTopologyResultForPolicies",
+          "getSlowDatanodeReport"
       )
   );

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 6 additions & 0 deletions
@@ -1815,6 +1815,12 @@ public void satisfyStoragePolicy(String path) throws IOException {
     storagePolicy.satisfyStoragePolicy(path);
   }

+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    return rpcServer.getSlowDatanodeReport(true, 0);
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState() {
     if (rpcServer.isSafeMode()) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 53 additions & 18 deletions
@@ -1095,24 +1095,7 @@ public DatanodeInfo[] getDatanodeReport(
     Map<FederationNamespaceInfo, DatanodeInfo[]> results =
         rpcClient.invokeConcurrent(nss, method, requireResponse, false,
             timeOutMs, DatanodeInfo[].class);
-    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
-        results.entrySet()) {
-      FederationNamespaceInfo ns = entry.getKey();
-      DatanodeInfo[] result = entry.getValue();
-      for (DatanodeInfo node : result) {
-        String nodeId = node.getXferAddr();
-        DatanodeInfo dn = datanodesMap.get(nodeId);
-        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
-          // Add the subcluster as a suffix to the network location
-          node.setNetworkLocation(
-              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
-                  node.getNetworkLocation());
-          datanodesMap.put(nodeId, node);
-        } else {
-          LOG.debug("{} is in multiple subclusters", nodeId);
-        }
-      }
-    }
+    updateDnMap(results, datanodesMap);
     // Map -> Array
     Collection<DatanodeInfo> datanodes = datanodesMap.values();
     return toArray(datanodes, DatanodeInfo.class);
@@ -1578,6 +1561,11 @@ public void satisfyStoragePolicy(String path) throws IOException {
     clientProto.satisfyStoragePolicy(path);
   }

+  @Override // ClientProtocol
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    return clientProto.getSlowDatanodeReport();
+  }
+
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
       long minBlockSize, long hotBlockTimeInterval) throws IOException {
@@ -1994,6 +1982,53 @@ public String refreshFairnessPolicyController() {
     return rpcClient.refreshFairnessPolicyController(new Configuration());
   }

+  /**
+   * Get the slow running datanodes report with a timeout.
+   *
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, DatanodeInfo[]> results =
+        rpcClient.invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
+    updateDnMap(results, datanodesMap);
+    // Map -> Array
+    Collection<DatanodeInfo> datanodes = datanodesMap.values();
+    return toArray(datanodes, DatanodeInfo.class);
+  }
+
+  private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
+      Map<String, DatanodeInfo> datanodesMap) {
+    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
+        results.entrySet()) {
+      FederationNamespaceInfo ns = entry.getKey();
+      DatanodeInfo[] result = entry.getValue();
+      for (DatanodeInfo node : result) {
+        String nodeId = node.getXferAddr();
+        DatanodeInfo dn = datanodesMap.get(nodeId);
+        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
+          // Add the subcluster as a suffix to the network location
+          node.setNetworkLocation(
+              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
+                  node.getNetworkLocation());
+          datanodesMap.put(nodeId, node);
+        } else {
+          LOG.debug("{} is in multiple subclusters", nodeId);
+        }
+      }
+    }
+  }
+
   /**
    * Deals with loading datanode report into the cache and refresh.
    */
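
The router-side merge extracted into updateDnMap keeps one entry per datanode transfer address, prefers the report with the most recent lastUpdate, and prefixes the network location with the owning subcluster. A self-contained toy illustration of that rule, using a hypothetical Node record rather than Hadoop's DatanodeInfo:

import java.util.LinkedHashMap;
import java.util.Map;

public class MergeRuleToy {
  // Hypothetical stand-in for DatanodeInfo: transfer address, location, last heartbeat.
  record Node(String xferAddr, String location, long lastUpdate) {}

  public static void main(String[] args) {
    Map<String, Node> merged = new LinkedHashMap<>();
    // The same datanode reported by two namespaces; ns1's report is newer.
    Node[][] perNamespace = {
        {new Node("10.0.0.1:9866", "/rack1", 100L)},
        {new Node("10.0.0.1:9866", "/rack1", 250L)}};
    String[] nsIds = {"ns0", "ns1"};
    for (int i = 0; i < perNamespace.length; i++) {
      for (Node n : perNamespace[i]) {
        Node prev = merged.get(n.xferAddr());
        if (prev == null || n.lastUpdate() > prev.lastUpdate()) {
          // Tag the location with the subcluster, as updateDnMap does.
          merged.put(n.xferAddr(),
              new Node(n.xferAddr(), "/" + nsIds[i] + n.location(), n.lastUpdate()));
        }
      }
    }
    // Prints the single surviving entry with location /ns1/rack1.
    merged.values().forEach(System.out::println);
  }
}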

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

Lines changed: 1 addition & 0 deletions
@@ -704,6 +704,7 @@ public void testProxyGetDatanodeReport() throws Exception {

     DatanodeInfo[] combinedData =
         routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    assertEquals(0, routerProtocol.getSlowDatanodeReport().length);
     final Map<Integer, String> routerDNMap = new TreeMap<>();
     for (DatanodeInfo dn : combinedData) {
       String subcluster = dn.getNetworkLocation().split("/")[1];
