Skip to content

Commit b502210

Browse files
committed
Fix STOMP connect failure related memory leak
Normally heartbeats keep connections from hanging. However in some cases a connection may hang before a CONNECTED frame is received and heartbeats are put in place. This commit adds a change to enforce a 60 limit on receiving the CONNECTED frame. Issue: SPR-14266
1 parent 6eba220 commit b502210

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

+17
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
8888

8989
private static final Message<byte[]> HEARTBEAT_MESSAGE;
9090

91+
/**
92+
* A heartbeat is setup once a CONNECTED frame is received which contains
93+
* the heartbeat settings we need. If we don't receive CONNECTED within
94+
* a minute, the connection is closed proactively.
95+
*/
96+
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
97+
98+
9199

92100
static {
93101
EMPTY_TASK.run();
@@ -567,6 +575,15 @@ public void afterConnected(TcpConnection<byte[]> connection) {
567575
logger.debug("TCP connection opened in session=" + getSessionId());
568576
}
569577
this.tcpConnection = connection;
578+
this.tcpConnection.onReadInactivity(new Runnable() {
579+
@Override
580+
public void run() {
581+
if (tcpConnection != null && !isStompConnected) {
582+
handleTcpConnectionFailure("No CONNECTED frame received in " +
583+
MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
584+
}
585+
}
586+
}, MAX_TIME_TO_CONNECTED_FRAME);
570587
connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
571588
}
572589

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHa
168168
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
169169

170170
final TcpClient<Message<P>, Message<P>> tcpClient;
171-
Runnable cleanupTask;
171+
final Runnable cleanupTask;
172172
synchronized (this.tcpClients) {
173173
if (this.stopping) {
174174
IllegalStateException ex = new IllegalStateException("Shutting down.");
@@ -194,6 +194,7 @@ public void run() {
194194
promise.onError(new Consumer<Throwable>() {
195195
@Override
196196
public void accept(Throwable ex) {
197+
cleanupTask.run();
197198
connectionHandler.afterConnectFailure(ex);
198199
}
199200
})

0 commit comments

Comments
 (0)