Skip to content

Commit 25103c9

Browse files
AndreyNudkochrjohn
andcommitted
Calculating orderedFieldsArray eagerly to avoid concurrency problems (QFJ-971) (#241)
* #240 Calculating orderedFieldsArray eagerly to avoid races when dictionary is shared between multiple sessions * Added unit test to verify race condition is fixed. * Added missing constructor to PausableThreadPoolExecutor. Co-authored-by: Christoph John <[email protected]>
1 parent 4b779b5 commit 25103c9

File tree

3 files changed

+108
-9
lines changed

3 files changed

+108
-9
lines changed

quickfixj-core/src/main/java/quickfix/DataDictionary.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public class DataDictionary {
8787
private final IntegerStringMap<String> valueNames = new IntegerStringMap<>();
8888
private final StringIntegerMap<GroupInfo> groups = new StringIntegerMap<>();
8989
private final Map<String, Node> components = new HashMap<>();
90+
private int[] orderedFieldsArray;
9091

9192
private DataDictionary() {
9293
}
@@ -529,6 +530,8 @@ private void copyFrom(DataDictionary rhs) {
529530
setCheckUserDefinedFields(rhs.checkUserDefinedFields);
530531
setCheckUnorderedGroupFields(rhs.checkUnorderedGroupFields);
531532
setAllowUnknownMessageFields(rhs.allowUnknownMessageFields);
533+
534+
calculateOrderedFields();
532535
}
533536

534537
@SuppressWarnings("unchecked")
@@ -1017,6 +1020,8 @@ private void load(InputStream inputStream) throws ConfigError {
10171020
load(document, msgtype, messageNode);
10181021
}
10191022
}
1023+
1024+
calculateOrderedFields();
10201025
}
10211026

10221027
public int getNumMessageCategories() {
@@ -1069,18 +1074,22 @@ private void load(Document document, String msgtype, Node node) throws ConfigErr
10691074
}
10701075
}
10711076

1072-
private int[] orderedFieldsArray;
1073-
10741077
public int[] getOrderedFields() {
1075-
if (orderedFieldsArray == null) {
1076-
orderedFieldsArray = new int[fields.size()];
1077-
int i = 0;
1078-
for (Integer field : fields) {
1079-
orderedFieldsArray[i++] = field;
1080-
}
1078+
return orderedFieldsArray;
1079+
}
1080+
1081+
private void calculateOrderedFields() {
1082+
orderedFieldsArray = new int[fields.size()];
1083+
int i = 0;
1084+
for (Integer field : fields) {
1085+
orderedFieldsArray[i++] = field;
10811086
}
10821087

1083-
return orderedFieldsArray;
1088+
for (Map<Integer, GroupInfo> gm : groups.values()) {
1089+
for (GroupInfo gi : gm.values()) {
1090+
gi.dataDictionary.calculateOrderedFields();
1091+
}
1092+
}
10841093
}
10851094

10861095
private int lookupXMLFieldNumber(Document document, Node node) throws ConfigError {

quickfixj-core/src/test/java/quickfix/DataDictionaryTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,15 @@
3434
import quickfix.field.MsgType;
3535
import quickfix.field.NoHops;
3636
import quickfix.field.NoPartyIDs;
37+
import quickfix.field.NoPartySubIDs;
3738
import quickfix.field.NoRelatedSym;
3839
import quickfix.field.OrdType;
3940
import quickfix.field.OrderQty;
41+
import quickfix.field.PartyID;
42+
import quickfix.field.PartyIDSource;
43+
import quickfix.field.PartyRole;
44+
import quickfix.field.PartySubID;
45+
import quickfix.field.PartySubIDType;
4046
import quickfix.field.Price;
4147
import quickfix.field.QuoteReqID;
4248
import quickfix.field.SenderCompID;
@@ -55,9 +61,15 @@
5561
import java.math.BigDecimal;
5662
import java.net.URL;
5763
import java.net.URLClassLoader;
64+
import java.util.ArrayList;
65+
import java.util.List;
66+
import java.util.concurrent.Callable;
67+
import java.util.concurrent.Future;
68+
import java.util.concurrent.TimeUnit;
5869

5970
import static org.hamcrest.CoreMatchers.is;
6071
import static org.hamcrest.Matchers.hasProperty;
72+
import static org.junit.Assert.assertArrayEquals;
6173
import static org.junit.Assert.assertEquals;
6274
import static org.junit.Assert.assertFalse;
6375
import static org.junit.Assert.assertNotNull;
@@ -973,6 +985,8 @@ public void testCopy() throws Exception {
973985
assertEquals(ddCopy.isCheckFieldsOutOfOrder(),dataDictionary.isCheckFieldsOutOfOrder());
974986
assertEquals(ddCopy.isCheckUnorderedGroupFields(),dataDictionary.isCheckUnorderedGroupFields());
975987
assertEquals(ddCopy.isCheckUserDefinedFields(),dataDictionary.isCheckUserDefinedFields());
988+
assertArrayEquals(getDictionary().getOrderedFields(),ddCopy.getOrderedFields());
989+
assertArrayEquals(getDictionary().getOrderedFields(),dataDictionary.getOrderedFields());
976990

977991
DataDictionary.GroupInfo groupFromDDCopy = ddCopy.getGroup(NewOrderSingle.MSGTYPE, NoPartyIDs.FIELD);
978992
assertTrue(groupFromDDCopy.getDataDictionary().isAllowUnknownMessageFields());
@@ -984,6 +998,31 @@ public void testCopy() throws Exception {
984998
assertTrue(ddCopy.isAllowUnknownMessageFields());
985999
groupFromDDCopy = ddCopy.getGroup(NewOrderSingle.MSGTYPE, NoPartyIDs.FIELD);
9861000
assertTrue(groupFromDDCopy.getDataDictionary().isAllowUnknownMessageFields());
1001+
1002+
DataDictionary originalGroupDictionary = getDictionary().getGroup(NewOrderSingle.MSGTYPE, NoPartyIDs.FIELD).getDataDictionary();
1003+
DataDictionary groupDictionary = dataDictionary.getGroup(NewOrderSingle.MSGTYPE, NoPartyIDs.FIELD).getDataDictionary();
1004+
DataDictionary copyGroupDictionary = ddCopy.getGroup(NewOrderSingle.MSGTYPE, NoPartyIDs.FIELD).getDataDictionary();
1005+
assertArrayEquals(originalGroupDictionary.getOrderedFields(), groupDictionary.getOrderedFields());
1006+
assertArrayEquals(originalGroupDictionary.getOrderedFields(), copyGroupDictionary.getOrderedFields());
1007+
1008+
DataDictionary originalNestedGroupDictionary = originalGroupDictionary.getGroup(NewOrderSingle.MSGTYPE, NoPartySubIDs.FIELD).getDataDictionary();
1009+
DataDictionary nestedGroupDictionary = groupDictionary.getGroup(NewOrderSingle.MSGTYPE, NoPartySubIDs.FIELD).getDataDictionary();
1010+
DataDictionary copyNestedGroupDictionary = copyGroupDictionary.getGroup(NewOrderSingle.MSGTYPE, NoPartySubIDs.FIELD).getDataDictionary();
1011+
assertArrayEquals(originalNestedGroupDictionary.getOrderedFields(), nestedGroupDictionary.getOrderedFields());
1012+
assertArrayEquals(originalNestedGroupDictionary.getOrderedFields(), copyNestedGroupDictionary.getOrderedFields());
1013+
}
1014+
1015+
@Test
1016+
public void testOrderedFields() throws Exception {
1017+
final DataDictionary dataDictionary = getDictionary();
1018+
1019+
final DataDictionary partyIDsDictionary = dataDictionary.getGroup(NewOrderSingle.MSGTYPE, NoPartyIDs.FIELD).getDataDictionary();
1020+
int[] expectedPartyIDsFieldOrder = new int[] {PartyID.FIELD, PartyIDSource.FIELD, PartyRole.FIELD, NoPartySubIDs.FIELD};
1021+
assertArrayEquals(expectedPartyIDsFieldOrder, partyIDsDictionary.getOrderedFields());
1022+
1023+
final DataDictionary partySubIDsDictionary = partyIDsDictionary.getGroup(NewOrderSingle.MSGTYPE, NoPartySubIDs.FIELD).getDataDictionary();
1024+
int[] expectedPartySubIDsFieldOrder = new int[] {PartySubID.FIELD, PartySubIDType.FIELD};
1025+
assertArrayEquals(expectedPartySubIDsFieldOrder, partySubIDsDictionary.getOrderedFields());
9871026
}
9881027

9891028
/**
@@ -1299,6 +1338,52 @@ public void testAllowingBlankValuesDisablesFieldValidation() throws Exception {
12991338
dictionary.validate(newSingle, true);
13001339
}
13011340

1341+
1342+
// QFJ-971
1343+
@Test
1344+
public void testConcurrentValidationFailure() throws Exception {
1345+
final String data = "8=FIX.4.4|9=284|35=F|49=TEST_49|56=TEST_56|34=420|52=20190302-07:31:57.079|"
1346+
+ "115=TEST3|116=TEST_116|11=TEST_11|41=TEST_41|55=TEST_55|48=TEST_48|22=4|54=2|"
1347+
+ "60=20190302-07:31:56.933|38=100|207=TEST_207|453=1|448=TEST_448|447=D|452=3|10=204|";
1348+
final String msgString = data.replace('|', (char) 1);
1349+
1350+
// use some more threads to make it more likely that the problem will occur
1351+
final int noOfThreads = 8;
1352+
final int noOfIterations = 500;
1353+
1354+
for (int i = 0; i < noOfIterations; i++) {
1355+
final DataDictionary dd = new DataDictionary("FIX44.xml");
1356+
final MessageFactory messageFactory = new quickfix.fix44.MessageFactory();
1357+
PausableThreadPoolExecutor ptpe = new PausableThreadPoolExecutor(noOfThreads);
1358+
// submit threads to pausable executor and try to let them start at the same time
1359+
ptpe.pause();
1360+
List<Future> resultList = new ArrayList<>();
1361+
for (int j = 0; j < noOfThreads; j++) {
1362+
final Callable messageParser = (Callable) () -> {
1363+
Message msg = MessageUtils.parse(messageFactory, dd, msgString);
1364+
Group partyGroup = msg.getGroups(quickfix.field.NoPartyIDs.FIELD).get(0);
1365+
char partyIdSource = partyGroup.getChar(PartyIDSource.FIELD);
1366+
assertEquals(PartyIDSource.PROPRIETARY_CUSTOM_CODE, partyIdSource);
1367+
return msg;
1368+
};
1369+
resultList.add(ptpe.submit(messageParser));
1370+
}
1371+
1372+
// start all threads
1373+
ptpe.resume();
1374+
ptpe.shutdown();
1375+
ptpe.awaitTermination(10, TimeUnit.MILLISECONDS);
1376+
1377+
// validate results
1378+
for (Future future : resultList) {
1379+
// if unsuccessful, this will throw an ExecutionException
1380+
future.get();
1381+
}
1382+
}
1383+
}
1384+
1385+
1386+
13021387
//
13031388
// Group Validation Tests in RepeatingGroupTest
13041389
//

quickfixj-core/src/test/java/quickfix/PausableThreadPoolExecutor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ public PausableThreadPoolExecutor() {
1515
super(2, 2, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
1616
}
1717

18+
public PausableThreadPoolExecutor(int noOfThreads) {
19+
super(noOfThreads, noOfThreads, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
20+
}
21+
22+
@Override
1823
protected void beforeExecute(Thread t, Runnable r) {
1924
super.beforeExecute(t, r);
2025
pauseLock.lock();

0 commit comments

Comments
 (0)