Skip to content

Commit 5a31fcd

Browse files
authored
QFJ-375 - configurable timeout multiplier (#288)
1 parent c73c4c7 commit 5a31fcd

File tree

8 files changed

+94
-48
lines changed

8 files changed

+94
-48
lines changed

quickfixj-core/src/main/doc/usermanual/usage/configuration.html

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,11 +1285,21 @@ <H3>QuickFIX Settings</H3>
12851285
<TR ALIGN="left" VALIGN="middle">
12861286
<TD><I>TestRequestDelayMultiplier</I></TD>
12871287
<TD>Fraction of the heartbeat interval which defines the additional time to wait
1288-
if a TestRequest sent after a missing heartbeat times out.
1288+
if a TestRequest sent after a missing heartbeat times out (final coefficient value is equal to
1289+
TestRequestDelayMultiplier + 1.0).
12891290
</TD>
1290-
<TD>0..1</TD>
1291+
<TD>any non-negative value</TD>
12911292
<TD>0.5</TD>
12921293
</TR>
1294+
<TR ALIGN="left" VALIGN="middle">
1295+
<TD><I>HeartBeatTimeoutMultiplier</I></TD>
1296+
<TD>Fraction of the heartbeat interval which defines the additional time to wait
1297+
since the last message was received before disconnecting (final coefficient value is equal to
1298+
HeartBeatTimeoutMultiplier + 1.0).
1299+
</TD>
1300+
<TD>any non-negative value</TD>
1301+
<TD>1.4</TD>
1302+
</TR>
12931303
<TR ALIGN="left" VALIGN="middle">
12941304
<TD><I>DisableHeartBeatCheck</I></TD>
12951305
<TD> Heartbeat detection is disabled. A disconnect due to a missing heartbeat will never occur.</TD>

quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
170170
final double testRequestDelayMultiplier = getSetting(settings, sessionID,
171171
Session.SETTING_TEST_REQUEST_DELAY_MULTIPLIER,
172172
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER);
173+
final double heartBeatTimeoutMultiplier = getSetting(settings, sessionID,
174+
Session.SETTING_HEARTBEAT_TIMEOUT_MULTIPLIER,
175+
Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
173176

174177
final UtcTimestampPrecision timestampPrecision = getTimestampPrecision(settings, sessionID,
175178
UtcTimestampPrecision.MILLIS);
@@ -228,7 +231,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
228231
rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime,
229232
forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage,
230233
resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed,
231-
validateChecksum, logonTags);
234+
validateChecksum, logonTags, heartBeatTimeoutMultiplier);
232235

233236
session.setLogonTimeout(logonTimeout);
234237
session.setLogoutTimeout(logoutTimeout);

quickfixj-core/src/main/java/quickfix/Session.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,15 @@ public class Session implements Closeable {
113113
public static final String SETTING_MAX_LATENCY = "MaxLatency";
114114

115115
/**
116-
* Session setting for the test delay multiplier (0-1, as fraction of Heartbeat interval)
116+
* Session setting for the test delay multiplier (as fraction of heartbeat interval).
117117
*/
118118
public static final String SETTING_TEST_REQUEST_DELAY_MULTIPLIER = "TestRequestDelayMultiplier";
119119

120+
/**
121+
* Session setting for the heartbeat timeout multiplier (as fraction of heartbeat interval).
122+
*/
123+
public static final String SETTING_HEARTBEAT_TIMEOUT_MULTIPLIER = "HeartBeatTimeoutMultiplier";
124+
120125
/**
121126
* Session scheduling setting to specify that session never reset
122127
*/
@@ -426,14 +431,15 @@ public class Session implements Closeable {
426431

427432
private final AtomicReference<ApplVerID> targetDefaultApplVerID = new AtomicReference<>();
428433
private final DefaultApplVerID senderDefaultApplVerID;
429-
private boolean validateSequenceNumbers = true;
430-
private boolean validateIncomingMessage = true;
434+
private final boolean validateSequenceNumbers;
435+
private final boolean validateIncomingMessage;
431436
private final int[] logonIntervals;
432437
private final Set<InetAddress> allowedRemoteAddresses;
433438

434439
public static final int DEFAULT_MAX_LATENCY = 120;
435440
public static final int DEFAULT_RESEND_RANGE_CHUNK_SIZE = 0; // no resend range
436441
public static final double DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER = 0.5;
442+
public static final double DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER = 1.4;
437443
private static final String ENCOUNTERED_END_OF_STREAM = "Encountered END_OF_STREAM";
438444

439445

@@ -447,15 +453,14 @@ public class Session implements Closeable {
447453

448454
protected static final Logger LOG = LoggerFactory.getLogger(Session.class);
449455

450-
451456
Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID,
452-
DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule,
453-
LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval) {
454-
this(application, messageStoreFactory, sessionID, dataDictionaryProvider, sessionSchedule,
455-
logFactory, messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS,
456-
false, false, false, false, true, false, true, false,
457-
DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[]{5}, false, false,
458-
false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, false, false, new ArrayList<StringField>());
457+
DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, LogFactory logFactory,
458+
MessageFactory messageFactory, int heartbeatInterval) {
459+
this(application, messageStoreFactory, sessionID, dataDictionaryProvider, sessionSchedule, logFactory,
460+
messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false,
461+
false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5},
462+
false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false,
463+
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
459464
}
460465

461466
Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID,
@@ -473,7 +478,7 @@ public class Session implements Closeable {
473478
boolean forceResendWhenCorruptedStore, Set<InetAddress> allowedRemoteAddresses,
474479
boolean validateIncomingMessage, int resendRequestChunkSize,
475480
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
476-
boolean validateChecksum, List<StringField> logonTags) {
481+
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier) {
477482
this.application = application;
478483
this.sessionID = sessionID;
479484
this.sessionSchedule = sessionSchedule;
@@ -520,7 +525,7 @@ public class Session implements Closeable {
520525
}
521526

522527
state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0,
523-
messageStore, testRequestDelayMultiplier);
528+
messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier);
524529

525530
registerSession(this);
526531

quickfixj-core/src/main/java/quickfix/SessionState.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Date;
2525
import java.util.LinkedHashMap;
2626
import java.util.Map;
27+
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicInteger;
2829
import java.util.concurrent.locks.Lock;
2930
import java.util.concurrent.locks.ReentrantLock;
@@ -55,6 +56,7 @@ public final class SessionState {
5556
private long lastSentTime;
5657
private long lastReceivedTime;
5758
private final double testRequestDelayMultiplier;
59+
private final double heartBeatTimeoutMultiplier;
5860
private long heartBeatMillis = Long.MAX_VALUE;
5961
private int heartBeatInterval;
6062

@@ -75,13 +77,14 @@ public final class SessionState {
7577
private final Map<Integer, Message> messageQueue = new LinkedHashMap<>();
7678

7779
public SessionState(Object lock, Log log, int heartBeatInterval, boolean initiator, MessageStore messageStore,
78-
double testRequestDelayMultiplier) {
80+
double testRequestDelayMultiplier, double heartBeatTimeoutMultiplier) {
7981
this.lock = lock;
8082
this.initiator = initiator;
8183
this.messageStore = messageStore;
8284
setHeartBeatInterval(heartBeatInterval);
8385
this.log = log == null ? new NullLog() : log;
8486
this.testRequestDelayMultiplier = testRequestDelayMultiplier;
87+
this.heartBeatTimeoutMultiplier = heartBeatTimeoutMultiplier;
8588
}
8689

8790
public int getHeartBeatInterval() {
@@ -93,13 +96,7 @@ public int getHeartBeatInterval() {
9396
public void setHeartBeatInterval(int heartBeatInterval) {
9497
synchronized (lock) {
9598
this.heartBeatInterval = heartBeatInterval;
96-
}
97-
setHeartBeatMillis(heartBeatInterval * 1000L);
98-
}
99-
100-
private void setHeartBeatMillis(long heartBeatMillis) {
101-
synchronized (lock) {
102-
this.heartBeatMillis = heartBeatMillis;
99+
this.heartBeatMillis = TimeUnit.SECONDS.toMillis(heartBeatInterval);
103100
}
104101
}
105102

@@ -291,7 +288,7 @@ private long timeSinceLastReceivedMessage() {
291288

292289
public boolean isTimedOut() {
293290
long millisSinceLastReceivedTime = timeSinceLastReceivedMessage();
294-
return millisSinceLastReceivedTime >= 2.4 * getHeartBeatMillis();
291+
return millisSinceLastReceivedTime >= (1 + heartBeatTimeoutMultiplier) * getHeartBeatMillis();
295292
}
296293

297294
public boolean set(int sequence, String message) throws IOException {

quickfixj-core/src/test/java/quickfix/MockSystemTimeSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public void setSystemTimes(long[] times) {
4040
systemTimes = times;
4141
}
4242

43-
private void setSystemTimes(long time) {
43+
void setSystemTimes(long time) {
4444
systemTimes = new long[] { time };
4545
}
4646

quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public static final class Builder {
9494
private boolean persistMessages = false;
9595
private final boolean useClosedRangeForResend = false;
9696
private final double testRequestDelayMultiplier = 1.5;
97+
private final double heartBeatTimeoutMultiplier = 2.5;
9798
private DefaultApplVerID senderDefaultApplVerID = null;
9899
private boolean validateSequenceNumbers = true;
99100
private final int[] logonIntervals = new int[]{5};
@@ -122,7 +123,7 @@ public Session build() {
122123
resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage,
123124
rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore,
124125
allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum,
125-
enableLastMsgSeqNumProcessed, validateChecksum, logonTags);
126+
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier);
126127
}
127128

128129
public Builder setBeginString(final String beginString) {

quickfixj-core/src/test/java/quickfix/SessionStateTest.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,32 @@
1919

2020
package quickfix;
2121

22-
import junit.framework.TestCase;
22+
import org.junit.After;
23+
import org.junit.Before;
24+
import org.junit.Test;
2325

24-
public class SessionStateTest extends TestCase {
25-
protected void setUp() throws Exception {
26-
super.setUp();
27-
MockSystemTimeSource mockTimeSource = new MockSystemTimeSource(1000);
28-
SystemTime.setTimeSource(mockTimeSource);
26+
import static org.junit.Assert.assertFalse;
27+
import static org.junit.Assert.assertTrue;
28+
29+
public class SessionStateTest {
30+
31+
private MockSystemTimeSource timeSource;
32+
33+
@Before
34+
public void setUp() {
35+
timeSource = new MockSystemTimeSource(1000);
36+
SystemTime.setTimeSource(timeSource);
2937
}
3038

31-
protected void tearDown() throws Exception {
39+
@After
40+
public void tearDown() {
3241
SystemTime.setTimeSource(null);
33-
super.tearDown();
3442
}
3543

36-
public void testTimeoutDefaultsAreNonzero() throws Exception {
37-
SessionState state = new SessionState(new Object(), null, 0, false, null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER);
44+
@Test
45+
public void testTimeoutDefaultsAreNonzero() {
46+
SessionState state = new SessionState(new Object(), null, 0, false, null,
47+
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
3848
state.setLastReceivedTime(900);
3949
assertFalse("logon timeout not init'ed", state.isLogonTimedOut());
4050

@@ -43,8 +53,10 @@ public void testTimeoutDefaultsAreNonzero() throws Exception {
4353
assertFalse("logout timeout not init'ed", state.isLogoutTimedOut());
4454
}
4555

46-
public void testTestRequestTiming() throws Exception {
47-
SessionState state = new SessionState(new Object(), null, 0, false, null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER);
56+
@Test
57+
public void testTestRequestTiming() {
58+
SessionState state = new SessionState(new Object(), null, 0, false, null,
59+
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
4860
state.setLastReceivedTime(950);
4961
state.setHeartBeatInterval(50);
5062
assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded());
@@ -57,4 +69,22 @@ public void testTestRequestTiming() throws Exception {
5769
state.setHeartBeatInterval(3);
5870
assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded());
5971
}
72+
73+
@Test
74+
public void testSessionTimeout() {
75+
SessionState state = new SessionState(new Object(), null, 30, false, null,
76+
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
77+
78+
// session should timeout after 2.4 * 30 = 72 seconds
79+
state.setLastReceivedTime(950_000);
80+
81+
timeSource.setSystemTimes(1_000_000L);
82+
assertFalse("session is still valid", state.isTimedOut());
83+
84+
timeSource.setSystemTimes(1_021_999L);
85+
assertFalse("session is still valid", state.isTimedOut());
86+
87+
timeSource.setSystemTimes(1_022_000L);
88+
assertTrue("session timed out", state.isTimedOut());
89+
}
6090
}

quickfixj-core/src/test/java/quickfix/SessionTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testDisposalOfFileResources() throws Exception {
101101
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
102102
false, false, false, false, true, false, 1.5, null, true,
103103
new int[] { 5 }, false, false, false, false, true, false, true, false,
104-
null, true, 0, false, false, true, new ArrayList<>())) {
104+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
105105
// Simulate socket disconnect
106106
session.setResponder(null);
107107
}
@@ -142,7 +142,7 @@ public void testNondisposableFileResources() throws Exception {
142142
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
143143
false, false, false, false, true, false, 1.5, null, true,
144144
new int[] { 5 }, false, false, false, false, true, false, true, false,
145-
null, true, 0, false, false, true, new ArrayList<>())) {
145+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
146146
// Simulate socket disconnect
147147
session.setResponder(null);
148148

@@ -2074,7 +2074,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize)
20742074
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
20752075
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
20762076
false, false, false, false, true, false, true, false, null, true,
2077-
chunkSize, false, false, true, new ArrayList<>())) {
2077+
chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
20782078

20792079
UnitTestResponder responder = new UnitTestResponder();
20802080
session.setResponder(responder);
@@ -2136,7 +2136,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
21362136
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
21372137
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
21382138
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2139-
false, false, true, new ArrayList<>());
2139+
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
21402140

21412141
Responder mockResponder = mock(Responder.class);
21422142
when(mockResponder.send(anyString())).thenReturn(true);
@@ -2184,7 +2184,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
21842184
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
21852185
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
21862186
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2187-
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>());
2187+
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
21882188

21892189
Responder mockResponder = mock(Responder.class);
21902190
when(mockResponder.send(anyString())).thenReturn(true);
@@ -2233,7 +2233,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception {
22332233
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
22342234
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
22352235
false, disconnectOnError, false, false, true, false, true, false,
2236-
null, true, 0, false, false, true, new ArrayList<>())) {
2236+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
22372237

22382238
UnitTestResponder responder = new UnitTestResponder();
22392239
session.setResponder(responder);
@@ -2269,7 +2269,7 @@ public void testTimestampPrecision() throws Exception {
22692269
UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true,
22702270
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
22712271
false, disconnectOnError, false, false, true, false, true, false,
2272-
null, true, 0, false, false, true, new ArrayList<>())) {
2272+
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) {
22732273

22742274
UnitTestResponder responder = new UnitTestResponder();
22752275
session.setResponder(responder);
@@ -2321,7 +2321,7 @@ private void testLargeQueue(int N) throws Exception {
23212321
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
23222322
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
23232323
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2324-
false, false, true, new ArrayList<>());
2324+
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
23252325

23262326
UnitTestResponder responder = new UnitTestResponder();
23272327
session.setResponder(responder);
@@ -2437,7 +2437,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound
24372437
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
24382438
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
24392439
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
2440-
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>());
2440+
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
24412441
UnitTestResponder responder = new UnitTestResponder();
24422442
session.setResponder(responder);
24432443

0 commit comments

Comments
 (0)