From 0e0554615185b199d1223dfaca6da2c6cf1ae208 Mon Sep 17 00:00:00 2001 From: Christoph John Date: Tue, 7 Jan 2020 15:30:39 +0100 Subject: [PATCH 1/6] Introduce separate thread pool for establishing Initiator connections * Fixes #254 * introduced separate thread pool with 3 threads for connection establishment * changed `enabled` flag in `Session` to `volatile` and removed synchronization from `setEnabled`/`isEnabled` since I could not find any good reason why it was synchronized ** `volatile` should ensure that all threads should see the current state and prevents possible deadlocks now that flag is checked from distinct threads * removed `synchronized` from `IoSessionInitiator.ConnectTask.run()` since I could not find any good reason why it was synchronized --- .../src/main/java/quickfix/Session.java | 10 ++++++--- .../java/quickfix/mina/SessionConnector.java | 8 ++++--- .../initiator/AbstractSocketInitiator.java | 21 ++++++++++++++++++- .../mina/initiator/IoSessionInitiator.java | 3 ++- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 897cc9bdf..738a541be 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -372,7 +372,11 @@ public class Session implements Closeable { // @GuardedBy(this) private final SessionState state; - private boolean enabled; + /* + * Controls whether it is possible to log on to this Session (if Acceptor) + * or if Logon is sent out respectively (if Initiator). + */ + private volatile boolean enabled; private final Object responderLock = new Object(); // unique instance // @GuardedBy(responderLock) @@ -729,7 +733,7 @@ public void logon() { setEnabled(true); } - private synchronized void setEnabled(boolean enabled) { + private void setEnabled(boolean enabled) { this.enabled = enabled; } @@ -786,7 +790,7 @@ public void logout(String reason) { * * @return true if session is enabled, false otherwise. */ - public synchronized boolean isEnabled() { + public boolean isEnabled() { return enabled; } diff --git a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java index abd9c082c..897599660 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java @@ -71,7 +71,7 @@ public abstract class SessionConnector implements Connector { private final Map sessions = new ConcurrentHashMap<>(); private final SessionSettings settings; private final SessionFactory sessionFactory; - private final static ScheduledExecutorService scheduledExecutorService = Executors + private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors .newSingleThreadScheduledExecutor(new QFTimerThreadFactory()); private ScheduledFuture sessionTimerFuture; private IoFilterChainBuilder ioFilterChainBuilder; @@ -318,7 +318,7 @@ protected void startSessionTimer() { if (shortLivedExecutor != null) { timerTask = new DelegatingTask(timerTask, shortLivedExecutor); } - sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L, + sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L, TimeUnit.MILLISECONDS); log.info("SessionTimer started"); } @@ -339,10 +339,11 @@ boolean checkSessionTimerRunning() { } protected ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorService; + return SCHEDULED_EXECUTOR; } private class SessionTimerTask implements Runnable { + @Override public void run() { try { for (Session session : sessions.values()) { @@ -411,6 +412,7 @@ void await() throws InterruptedException { private static class QFTimerThreadFactory implements ThreadFactory { + @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "QFJ Timer"); thread.setDaemon(true); diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index fd3ea0aa2..77254f0a7 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -51,6 +51,10 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; /** * Abstract base class for socket initiators. @@ -59,6 +63,8 @@ public abstract class AbstractSocketInitiator extends SessionConnector implement protected final Logger log = LoggerFactory.getLogger(getClass()); private final Set initiators = new HashSet<>(); + private static final ScheduledExecutorService SCHEDULED_RECONNECT_EXECUTOR = Executors + .newScheduledThreadPool(3, new QFScheduledReconnectThreadFactory()); protected AbstractSocketInitiator(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, @@ -74,6 +80,19 @@ protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessi IoBuffer.setUseDirectBuffer(false); } + + private static class QFScheduledReconnectThreadFactory implements ThreadFactory { + + private static final AtomicInteger COUNTER = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "QFJ ReconnectTask-" + COUNTER.getAndIncrement()); + thread.setDaemon(true); + return thread; + } + } + protected void createSessionInitiators() throws ConfigError { try { @@ -147,7 +166,7 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session, socketAddresses, localAddress, reconnectingIntervals, - getScheduledExecutorService(), networkingOptions, + SCHEDULED_RECONNECT_EXECUTOR, networkingOptions, getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig, proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation); diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java index e70e6325d..e96f56a70 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java @@ -196,7 +196,8 @@ private SSLFilter installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBu return sslFilter; } - public synchronized void run() { + @Override + public void run() { resetIoConnector(); try { if (connectFuture == null) { From f5e050c907decfdb0bfc27c6be21e9834a7ff59a Mon Sep 17 00:00:00 2001 From: Christoph John Date: Fri, 10 Jan 2020 16:04:30 +0100 Subject: [PATCH 2/6] whitespace change to test LGTM check --- .../java/quickfix/mina/initiator/AbstractSocketInitiator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index 77254f0a7..af5c8df7a 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -80,7 +80,7 @@ protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessi IoBuffer.setUseDirectBuffer(false); } - + private static class QFScheduledReconnectThreadFactory implements ThreadFactory { private static final AtomicInteger COUNTER = new AtomicInteger(1); From f2514215391f64527cff5537a63b243bf88a0c26 Mon Sep 17 00:00:00 2001 From: Christoph John Date: Sun, 5 Apr 2020 23:57:59 +0200 Subject: [PATCH 3/6] fixed typo --- quickfixj-core/src/main/java/quickfix/SessionSettings.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/SessionSettings.java b/quickfixj-core/src/main/java/quickfix/SessionSettings.java index 8c83e8727..8814e4eba 100644 --- a/quickfixj-core/src/main/java/quickfix/SessionSettings.java +++ b/quickfixj-core/src/main/java/quickfix/SessionSettings.java @@ -192,7 +192,6 @@ public Properties getSessionProperties(SessionID sessionID) throws ConfigError { * Returns the defaults for the session-level settings. * * @return the default properties - * @throws ConfigError */ public Properties getDefaultProperties() { try { @@ -250,7 +249,7 @@ public int getInt(String key) throws ConfigError, FieldConvertError { * @param sessionID the session ID * @param key the settings key * @return the long integer value for the setting - * @throws ConfigError configurion error, probably a missing setting. + * @throws ConfigError configuration error, probably a missing setting. * @throws FieldConvertError error during field type conversion. */ public int getInt(SessionID sessionID, String key) throws ConfigError, FieldConvertError { From 8b635cd64fe869681d72821af6418b5cf41a917a Mon Sep 17 00:00:00 2001 From: Christoph John Date: Tue, 7 Apr 2020 15:16:02 +0200 Subject: [PATCH 4/6] - prevent NPE on Logon in Session when not specifying messageFactory in AbstractSessionConnectorBuilder --- .../main/java/quickfix/AbstractSessionConnectorBuilder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java index f5048cffb..2b7e5faa2 100644 --- a/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java +++ b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java @@ -66,7 +66,9 @@ public final Product build() throws ConfigError { if (logFactory == null) { logFactory = new ScreenLogFactory(settings); } - + if (messageFactory == null) { + messageFactory = new DefaultMessageFactory(); + } return doBuild(); } From d55caffc5c8561cf37572f61d6f8b8032feea960 Mon Sep 17 00:00:00 2001 From: Christoph John Date: Wed, 8 Apr 2020 23:02:06 +0200 Subject: [PATCH 5/6] - added possibility to specify number of reconnect threads via builder - by default 3 threads are used - changed executor to be non-static, i.e. each Initiator instance has its own reconnect thread pool - (unless feature is disabled which will use the former behaviour) - to use the former behaviour (i.e. use "QFJ Timer" thread for both reconnections and calls to next()) you should pass 0 as number of reconnect threads - when passing 1 you will end up with a separate thread for reconnections; next() will still be called by "QFJ Timer" thread --- .../main/java/quickfix/SocketInitiator.java | 10 ++++++- .../initiator/AbstractSocketInitiator.java | 30 +++++++++++++++---- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index 3502ae146..f7c7389cb 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -33,7 +33,7 @@ public class SocketInitiator extends AbstractSocketInitiator { private SocketInitiator(Builder builder) throws ConfigError { super(builder.application, builder.messageStoreFactory, builder.settings, - builder.logFactory, builder.messageFactory); + builder.logFactory, builder.messageFactory, builder.numReconnectThreads); if (builder.queueCapacity >= 0) { eventHandlingStrategy @@ -49,9 +49,17 @@ public static Builder newBuilder() { } public static final class Builder extends AbstractSessionConnectorBuilder { + + int numReconnectThreads = 3; + private Builder() { super(Builder.class); } + + public Builder withReconnectThreads(int numReconnectThreads) throws ConfigError { + this.numReconnectThreads = numReconnectThreads; + return this; + } @Override protected SocketInitiator doBuild() throws ConfigError { diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index af5c8df7a..65185fff7 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -54,6 +54,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; /** @@ -63,8 +64,8 @@ public abstract class AbstractSocketInitiator extends SessionConnector implement protected final Logger log = LoggerFactory.getLogger(getClass()); private final Set initiators = new HashSet<>(); - private static final ScheduledExecutorService SCHEDULED_RECONNECT_EXECUTOR = Executors - .newScheduledThreadPool(3, new QFScheduledReconnectThreadFactory()); + private final ScheduledExecutorService scheduledReconnectExecutor; + public static final String QFJ_RECONNECT_THREAD_PREFIX = "QFJ Reconnect Thread-"; protected AbstractSocketInitiator(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, @@ -75,19 +76,37 @@ protected AbstractSocketInitiator(Application application, protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError { + this(settings, sessionFactory, 0); + } + + protected AbstractSocketInitiator(Application application, + MessageStoreFactory messageStoreFactory, SessionSettings settings, + LogFactory logFactory, MessageFactory messageFactory, int numReconnectThreads) throws ConfigError { + this(settings, new DefaultSessionFactory(application, messageStoreFactory, logFactory, + messageFactory), numReconnectThreads); + } + + protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessionFactory, int numReconnectThreads) + throws ConfigError { super(settings, sessionFactory); IoBuffer.setAllocator(new SimpleBufferAllocator()); IoBuffer.setUseDirectBuffer(false); + if (numReconnectThreads > 0) { + scheduledReconnectExecutor = Executors.newScheduledThreadPool(numReconnectThreads, new QFScheduledReconnectThreadFactory()); + ((ThreadPoolExecutor)scheduledReconnectExecutor).setMaximumPoolSize(numReconnectThreads); + } else { + scheduledReconnectExecutor = null; + } } - + // TODO move to end of class private static class QFScheduledReconnectThreadFactory implements ThreadFactory { private static final AtomicInteger COUNTER = new AtomicInteger(1); @Override public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "QFJ ReconnectTask-" + COUNTER.getAndIncrement()); + Thread thread = new Thread(runnable, QFJ_RECONNECT_THREAD_PREFIX + COUNTER.getAndIncrement()); thread.setDaemon(true); return thread; } @@ -164,9 +183,10 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT); } + ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService()); final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session, socketAddresses, localAddress, reconnectingIntervals, - SCHEDULED_RECONNECT_EXECUTOR, networkingOptions, + scheduledExecutorService, networkingOptions, getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig, proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation); From 7e1e5348607661464fd7b2a33ae5723a115b4e1d Mon Sep 17 00:00:00 2001 From: Christoph John Date: Wed, 8 Apr 2020 23:05:36 +0200 Subject: [PATCH 6/6] minor formatting --- .../initiator/AbstractSocketInitiator.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index 65185fff7..0704c9f86 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -93,25 +93,12 @@ protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessi IoBuffer.setUseDirectBuffer(false); if (numReconnectThreads > 0) { scheduledReconnectExecutor = Executors.newScheduledThreadPool(numReconnectThreads, new QFScheduledReconnectThreadFactory()); - ((ThreadPoolExecutor)scheduledReconnectExecutor).setMaximumPoolSize(numReconnectThreads); + ((ThreadPoolExecutor) scheduledReconnectExecutor).setMaximumPoolSize(numReconnectThreads); } else { scheduledReconnectExecutor = null; } } - // TODO move to end of class - private static class QFScheduledReconnectThreadFactory implements ThreadFactory { - - private static final AtomicInteger COUNTER = new AtomicInteger(1); - - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, QFJ_RECONNECT_THREAD_PREFIX + COUNTER.getAndIncrement()); - thread.setDaemon(true); - return thread; - } - } - protected void createSessionInitiators() throws ConfigError { try { @@ -348,4 +335,18 @@ public int getQueueSize() { } protected abstract EventHandlingStrategy getEventHandlingStrategy(); + + + private static class QFScheduledReconnectThreadFactory implements ThreadFactory { + + private static final AtomicInteger COUNTER = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, QFJ_RECONNECT_THREAD_PREFIX + COUNTER.getAndIncrement()); + thread.setDaemon(true); + return thread; + } + } + }