diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 956f5bbe519d4..756b273c8ac1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1101,7 +1101,15 @@ public PipelineAck.ECN getECN() {
     }
     double load = ManagementFactory.getOperatingSystemMXBean()
         .getSystemLoadAverage();
-    return load > NUM_CORES * congestionRatio ? PipelineAck.ECN.CONGESTED :
-        PipelineAck.ECN.SUPPORTED;
+    double threshold = NUM_CORES * congestionRatio;
+
+    // The fault injector lets tests force the congested path (the default
+    // injector returns false). Count and report congestion in one branch so
+    // the metric always agrees with the ECN value handed back to the caller.
+    if (load > threshold || DataNodeFaultInjector.get().mockCongestedForTest()) {
+      metrics.incrCongestedCount();
+      return PipelineAck.ECN.CONGESTED;
+    }
+    return PipelineAck.ECN.SUPPORTED;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 7b116d9e566f3..4c022535118bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -162,4 +162,14 @@ public void markSlow(String dnAddr, int[] replies) {}
    * Just delay delete replica a while.
    */
   public void delayDeleteReplica() {}
+
+  /**
+   * Test hook for simulating a congested DataNode.
+   *
+   * @return true to make {@link DataNode#getECN()} treat the node as
+   *         congested regardless of system load; this default returns false.
+   */
+  public boolean mockCongestedForTest() {
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 77e6dab067b9d..398af6df51186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -210,6 +210,8 @@ public class DataNodeMetrics {
   private MutableCounterLong replaceBlockOpOnSameMount;
   @Metric("Number of replaceBlock ops to another node")
   private MutableCounterLong replaceBlockOpToOtherHost;
+  @Metric("Number of times the DataNode was detected as congested")
+  private MutableCounterLong congestedCount;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   @Metric("Milliseconds spent on calling NN rpc")
@@ -807,4 +809,8 @@ public void incrReplaceBlockOpToOtherHost() {
     replaceBlockOpToOtherHost.incr();
   }
 
+  /** Increments the counter of congestion detections by one. */
+  public void incrCongestedCount() {
+    congestedCount.incr();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 3a0b5238360a2..9806cb7f3164d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -42,9 +42,11 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.util.Lists;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -816,4 +818,31 @@ public Boolean get() {
       }, 100, 10000);
     }
   }
+
+  @Test
+  public void testCongestedCount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED, true);
+    // Capture the injector before anything can fail so it is always restored.
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public boolean mockCongestedForTest() {
+          return true;
+        }
+      });
+      DataNode dn = cluster.getDataNodes().get(0);
+      Assert.assertEquals(PipelineAck.ECN.CONGESTED, dn.getECN());
+      MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+      Assert.assertEquals(1L, getLongCounter("CongestedCount", dnMetrics));
+    } finally {
+      DataNodeFaultInjector.set(old);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }