Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (curPeer == null) break;
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
try {
peer.setReadTimeout(conf.socketTimeout);
}
catch(IOException e){}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the javadoc, it should return a null if it is unable to establish a connection. I think it should also return a null in this case as well.

Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache();
try {
Expand All @@ -486,6 +490,10 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
curPeer = nextDomainPeer();
if (curPeer == null) break;
peer = (DomainPeer)curPeer.peer;
try {
peer.setReadTimeout(conf.socketTimeout);
}
catch(IOException e){}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return null upon exception

}
ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
clientContext.getPeerCache().put(datanode, peer);
Expand Down Expand Up @@ -629,6 +637,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
if (curPeer == null) break;
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
peer.setReadTimeout(conf.socketTimeout);
BlockReader blockReader = null;
try {
blockReader = getRemoteBlockReader(peer);
Expand Down Expand Up @@ -694,6 +703,7 @@ private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
curPeer = nextTcpPeer();
if (curPeer.fromCache) remainingCacheTries--;
peer = curPeer.peer;
peer.setReadTimeout(conf.socketTimeout);
blockReader = getRemoteBlockReader(peer);
return blockReader;
} catch (IOException ioe) {
Expand Down Expand Up @@ -743,6 +753,10 @@ private BlockReaderPeer nextDomainPeer() {
if (remainingCacheTries > 0) {
Peer peer = clientContext.getPeerCache().get(datanode, true);
if (peer != null) {
try {
peer.setReadTimeout(conf.socketTimeout);
}
catch(IOException e){}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null upon exception

if (LOG.isTraceEnabled()) {
LOG.trace("nextDomainPeer: reusing existing peer " + peer);
}
Expand All @@ -767,6 +781,7 @@ private BlockReaderPeer nextTcpPeer() throws IOException {
if (remainingCacheTries > 0) {
Peer peer = clientContext.getPeerCache().get(datanode, false);
if (peer != null) {
peer.setReadTimeout(conf.socketTimeout);
if (LOG.isTraceEnabled()) {
LOG.trace("nextTcpPeer: reusing existing peer " + peer);
}
Expand All @@ -776,6 +791,7 @@ private BlockReaderPeer nextTcpPeer() throws IOException {
try {
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
datanode);
peer.setReadTimeout(conf.socketTimeout);
if (LOG.isTraceEnabled()) {
LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
}
Expand Down