Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `PacketsSlowWriteToMirror` | Total number of packets whose write to other Datanodes in the pipeline takes more than a certain time (300ms by default) |
| `PacketsSlowWriteToDisk` | Total number of packets whose write to disk takes more than a certain time (300ms by default) |
| `PacketsSlowWriteToOsCache` | Total number of packets whose write to os cache takes more than a certain time (300ms by default) |
| `SlowFlushOrSyncCount` | Total number of packets whose sync/flush takes more than a certain time (300ms by default) |
| `SlowAckToUpstreamCount` | Total number of packets whose upstream ack takes more than a certain time (300ms by default) |

FsVolume
--------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ boolean packetSentInTime() {
void flushOrSync(boolean isSync, long seqno) throws IOException {
long flushTotalNanos = 0;
long begin = Time.monotonicNow();
DataNodeFaultInjector.get().delay();
if (checksumOut != null) {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
Expand Down Expand Up @@ -445,6 +446,7 @@ void flushOrSync(boolean isSync, long seqno) throws IOException {
}
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
datanode.metrics.incrSlowFlushOrSyncCount();
LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
+ datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
+ flushTotalNanos + "ns, volume=" + getVolumeBaseUri()
Expand Down Expand Up @@ -1656,6 +1658,7 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
}
// send my ack back to upstream datanode
long begin = Time.monotonicNow();
DataNodeFaultInjector.get().delay();
/* for test only, no-op in production system */
DataNodeFaultInjector.get().delaySendingAckToUpstream(inAddr);
replyAck.write(upstreamOut);
Expand All @@ -1665,6 +1668,7 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
inAddr,
duration);
if (duration > datanodeSlowLogThresholdMs) {
datanode.metrics.incrSlowAckToUpstreamCount();
LOG.warn("Slow PacketResponder send ack to upstream took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
+ ", replyAck=" + replyAck
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ public class DataNodeMetrics {
// Counters for packets whose write to a pipeline mirror, to disk, or to the
// OS cache was slow (see the matching PacketsSlowWriteTo* rows in the
// metrics documentation table).
@Metric MutableCounterLong packetsSlowWriteToMirror;
@Metric MutableCounterLong packetsSlowWriteToDisk;
@Metric MutableCounterLong packetsSlowWriteToOsCache;
// Bumped through incrSlowFlushOrSyncCount() each time a flush/sync is
// reported as slow; read by tests under the name "SlowFlushOrSyncCount".
@Metric private MutableCounterLong slowFlushOrSyncCount;
// Bumped through incrSlowAckToUpstreamCount() each time sending an ack to the
// upstream datanode is reported as slow; read by tests under the name
// "SlowAckToUpstreamCount".
@Metric private MutableCounterLong slowAckToUpstreamCount;

@Metric("Number of replaceBlock ops between" +
" storage types on same host with local copy")
Expand Down Expand Up @@ -440,6 +442,14 @@ public void incrVolumeFailures(int size) {
volumeFailures.incr(size);
}

/**
 * Adds one to the counter of slow flush/sync operations.
 */
public void incrSlowFlushOrSyncCount() {
  this.slowFlushOrSyncCount.incr();
}

/**
 * Adds one to the counter of slow acks sent to the upstream datanode.
 */
public void incrSlowAckToUpstreamCount() {
  this.slowAckToUpstreamCount.incr();
}

/**
 * Adds one to the datanode network error counter.
 */
public void incrDatanodeNetworkErrors() {
  this.datanodeNetworkErrors.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,60 @@ public void testNNRpcMetricsWithNonHA() throws IOException {
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("HeartbeatsNumOps", 1L, rb);
}
/**
 * Verifies that the SlowFlushOrSyncCount and SlowAckToUpstreamCount counters
 * of a DataNode grow when an injected delay makes the instrumented code paths
 * exceed the slow threshold.
 *
 * <p>A fault injector that sleeps in {@code delay()} is installed for the
 * lifetime of a three-DataNode mini cluster. Files are then written
 * repeatedly until both counters on the first DataNode are larger than the
 * values sampled before the writes, or the wait times out.
 *
 * @throws Exception if the cluster cannot be started or the counters do not
 *     increase within the wait period
 */
@Test(timeout = 60000)
public void testSlowMetrics() throws Exception {
  DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
    @Override public void delay() {
      try {
        // Slightly longer than the 300ms default slow threshold so that the
        // delayed operation is counted as slow.
        Thread.sleep(310);
      } catch (InterruptedException e) {
        // Do not swallow the interruption: restore the flag so the
        // interrupted thread can still observe it.
        Thread.currentThread().interrupt();
      }
    }
  };
  DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();

  MiniDFSCluster cluster = null;
  try {
    // Install the injector inside the try block so that the finally clause
    // restores the previous one even if the setup below fails.
    DataNodeFaultInjector.set(dnFaultInjector);

    Configuration conf = new HdfsConfiguration();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    final FileSystem fs = cluster.getFileSystem();
    List<DataNode> datanodes = cluster.getDataNodes();
    // JUnit convention: expected value first, actual value second.
    assertEquals(3, datanodes.size());
    final DataNode datanode = datanodes.get(0);
    MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
    final long longFileLen = 10;
    final long startFlushOrSyncValue =
        getLongCounter("SlowFlushOrSyncCount", rb);
    final long startAckToUpstreamValue =
        getLongCounter("SlowAckToUpstreamCount", rb);
    // Gives every attempt its own file name.
    final AtomicInteger x = new AtomicInteger(0);

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override public Boolean get() {
        final int attempt = x.incrementAndGet();
        try {
          DFSTestUtil
              .createFile(fs, new Path("/time.txt." + attempt), longFileLen,
                  (short) 3, Time.monotonicNow());
        } catch (IOException ioe) {
          LOG.error("Caught IOException while ingesting DN metrics", ioe);
          return false;
        }
        MetricsRecordBuilder rbNew =
            getMetrics(datanode.getMetrics().name());
        final long endFlushOrSyncValue =
            getLongCounter("SlowFlushOrSyncCount", rbNew);
        final long endAckToUpstreamValue =
            getLongCounter("SlowAckToUpstreamCount", rbNew);
        // Both counters must have advanced past their starting values.
        return endFlushOrSyncValue > startFlushOrSyncValue
            && endAckToUpstreamValue > startAckToUpstreamValue;
      }
    }, 30, 30000);
  } finally {
    DataNodeFaultInjector.set(oldDnInjector);
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}

@Test
public void testNNRpcMetricsWithHA() throws IOException {
Expand Down