From d0ca1183bef0b328f1a94298a551175f34c09395 Mon Sep 17 00:00:00 2001 From: "suresh.bahuguna" Date: Fri, 9 Dec 2016 16:47:03 +0530 Subject: [PATCH 1/3] HDFS-11227: Set read timeout for peer --- .../apache/hadoop/hdfs/BlockReaderFactory.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 8061f7b5a6cbf..5041ef3330ac2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -471,6 +471,10 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer == null) break; if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; + try { + peer.setReadTimeout(conf.socketTimeout); + } + catch(IOException e){} Slot slot = null; ShortCircuitCache cache = clientContext.getShortCircuitCache(); try { @@ -486,6 +490,10 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { curPeer = nextDomainPeer(); if (curPeer == null) break; peer = (DomainPeer)curPeer.peer; + try { + peer.setReadTimeout(conf.socketTimeout); + } + catch(IOException e){} } ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); clientContext.getPeerCache().put(datanode, peer); @@ -629,6 +637,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException { if (curPeer == null) break; if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; + peer.setReadTimeout(conf.socketTimeout); BlockReader blockReader = null; try { blockReader = getRemoteBlockReader(peer); @@ -694,6 +703,7 @@ private BlockReader getRemoteBlockReaderFromTcp() throws IOException { curPeer = nextTcpPeer(); if (curPeer.fromCache) remainingCacheTries--; peer = curPeer.peer; + peer.setReadTimeout(conf.socketTimeout); blockReader = getRemoteBlockReader(peer); return blockReader; } catch (IOException ioe) { @@ -743,6 +753,10 @@ private BlockReaderPeer nextDomainPeer() { if (remainingCacheTries > 0) { Peer peer = clientContext.getPeerCache().get(datanode, true); if (peer != null) { + try { + peer.setReadTimeout(conf.socketTimeout); + } + catch(IOException e){} if (LOG.isTraceEnabled()) { LOG.trace("nextDomainPeer: reusing existing peer " + peer); } @@ -767,6 +781,7 @@ private BlockReaderPeer nextTcpPeer() throws IOException { if (remainingCacheTries > 0) { Peer peer = clientContext.getPeerCache().get(datanode, false); if (peer != null) { + peer.setReadTimeout(conf.socketTimeout); if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: reusing existing peer " + peer); } @@ -776,6 +791,7 @@ private BlockReaderPeer nextTcpPeer() throws IOException { try { Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, datanode); + peer.setReadTimeout(conf.socketTimeout); if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: created newConnectedPeer " + peer); } From 6c29642440471a6da6da0f6996eabafad46acbf3 Mon Sep 17 00:00:00 2001 From: "suresh.bahuguna" Date: Mon, 12 Dec 2016 11:33:10 +0530 Subject: [PATCH 2/3] HDFS-11234: Made the socket buffer size configurable with the config node fs.hdfs.data.socket.size to be set in core-site.xml. If the node is not found, the default value is -1, so socket buffer size would not be set. --- .../java/org/apache/hadoop/hdfs/DFSOutputStream.java | 5 ++++- .../apache/hadoop/hdfs/protocol/HdfsConstants.java | 7 +++++-- .../apache/hadoop/hdfs/server/datanode/DataNode.java | 11 ++++++++--- .../hadoop/hdfs/server/datanode/DataXceiver.java | 7 +++++-- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index f105530835f10..5445b65b4f545 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -1507,7 +1507,10 @@ static Socket createSocketForPipeline(final DatanodeInfo first, final int timeout = client.getDatanodeReadTimeout(length); NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout); sock.setSoTimeout(timeout); - sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + if (HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0) { + DFSClient.LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + } if(DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 4e9532963772c..f2301911cdae4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -70,9 +70,12 @@ protected HdfsConstants() { // to 1k. public static final int MAX_PATH_LENGTH = 8000; public static final int MAX_PATH_DEPTH = 1000; + + // specify socket buffer size in bytes, useful to increase for high latency/high bandwidth systems. + // As a special case, values <=0 cause the setReceiveBufferSize/setSendBufferSize function call to be skipped; + // letting the kernel do the right thing. -1 is the default, old default was 128*1024 bytes + public static final int DEFAULT_DATA_SOCKET_SIZE = new HdfsConfiguration().getInt("fs.hdfs.data.socket.size", -1); - // TODO should be conf injected? - public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024; public static final int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt( DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c39540eff7781..24c74eb16a9ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -893,7 +893,10 @@ private void initDataXceiver(Configuration conf) throws IOException { tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout, DataNode.getStreamingAddr(conf)); } - tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + if(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0) { + LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + } streamingAddr = tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); @@ -941,8 +944,10 @@ static DomainPeerServer getDomainPeerServer(Configuration conf, } DomainPeerServer domainPeerServer = new DomainPeerServer(domainSocketPath, port); - domainPeerServer.setReceiveBufferSize( - HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + if( HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0){ + LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + domainPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + } return domainPeerServer; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 01ff32daffa84..06a28f9b56da5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -706,8 +706,11 @@ public void writeBlock(final ExtendedBlock block, (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length); NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); - mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - + if (HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0) { + LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + } + OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); From ebd66bc018ee7ff432447ea3ebe569f36b6559c7 Mon Sep 17 00:00:00 2001 From: "suresh.bahuguna" Date: Mon, 12 Dec 2016 12:16:55 +0530 Subject: [PATCH 3/3] Revert "HDFS-11234: Made the socket buffer size configurable with the config node fs.hdfs.data.socket.size to be set in core-site.xml. If the node is not found, the default value is -1, so socket buffer size would not be set. Will add this commit in separate PR" This reverts commit 6c29642440471a6da6da0f6996eabafad46acbf3. --- .../java/org/apache/hadoop/hdfs/DFSOutputStream.java | 5 +---- .../apache/hadoop/hdfs/protocol/HdfsConstants.java | 7 ++----- .../apache/hadoop/hdfs/server/datanode/DataNode.java | 11 +++-------- .../hadoop/hdfs/server/datanode/DataXceiver.java | 7 ++----- 4 files changed, 8 insertions(+), 22 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 5445b65b4f545..f105530835f10 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -1507,10 +1507,7 @@ static Socket createSocketForPipeline(final DatanodeInfo first, final int timeout = client.getDatanodeReadTimeout(length); NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout); sock.setSoTimeout(timeout); - if (HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0) { - DFSClient.LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - } + sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); if(DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index f2301911cdae4..4e9532963772c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -70,12 +70,9 @@ protected HdfsConstants() { // to 1k. public static final int MAX_PATH_LENGTH = 8000; public static final int MAX_PATH_DEPTH = 1000; - - // specify socket buffer size in bytes, useful to increase for high latency/high bandwidth systems. - // As a special case, values <=0 cause the setReceiveBufferSize/setSendBufferSize function call to be skipped; - // letting the kernel do the right thing. -1 is the default, old default was 128*1024 bytes - public static final int DEFAULT_DATA_SOCKET_SIZE = new HdfsConfiguration().getInt("fs.hdfs.data.socket.size", -1); + // TODO should be conf injected? + public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024; public static final int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt( DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 24c74eb16a9ce..c39540eff7781 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -893,10 +893,7 @@ private void initDataXceiver(Configuration conf) throws IOException { tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout, DataNode.getStreamingAddr(conf)); } - if(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0) { - LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - } + tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); streamingAddr = tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); @@ -944,10 +941,8 @@ static DomainPeerServer getDomainPeerServer(Configuration conf, } DomainPeerServer domainPeerServer = new DomainPeerServer(domainSocketPath, port); - if( HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0){ - LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - domainPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - } + domainPeerServer.setReceiveBufferSize( + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); return domainPeerServer; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 06a28f9b56da5..01ff32daffa84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -706,11 +706,8 @@ public void writeBlock(final ExtendedBlock block, (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length); NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); - if (HdfsConstants.DEFAULT_DATA_SOCKET_SIZE > 0) { - LOG.info("Setting buf size to " + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - } - + mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);