diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index a541b2d379c..b90a5b7908e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.MQProducerInner; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.client.producer.RequestFutureTable; +import org.apache.rocketmq.client.producer.RequestFutureHolder; import org.apache.rocketmq.client.producer.RequestResponseFuture; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; @@ -274,11 +274,11 @@ private RemotingCommand receiveReplyMessage(ChannelHandlerContext ctx, private void processReplyMessage(MessageExt replyMsg) { final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID); - final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId); + final RequestResponseFuture requestResponseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().get(correlationId); if (requestResponseFuture != null) { requestResponseFuture.putResponseMessage(replyMsg); - RequestFutureTable.getRequestFutureTable().remove(correlationId); + RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); if (requestResponseFuture.getRequestCallback() != null) { requestResponseFuture.getRequestCallback().onSuccess(replyMsg); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index bdc103f1328..11e24f13005 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -28,10 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -58,7 +56,7 @@ import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.RequestCallback; -import org.apache.rocketmq.client.producer.RequestFutureTable; +import org.apache.rocketmq.client.producer.RequestFutureHolder; import org.apache.rocketmq.client.producer.RequestResponseFuture; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; @@ -106,12 +104,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final RPCHook rpcHook; private final BlockingQueue asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "RequestHouseKeepingService"); - } - }); protected BlockingQueue checkRequestQueue; protected ExecutorService checkExecutor; private ServiceState serviceState = ServiceState.CREATE_JUST; @@ -236,12 +228,12 @@ public void start(final boolean startFactory) throws MQClientException { } private void startScheduledTask() { - if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + if (RequestFutureHolder.getInstance().getProducerNum().incrementAndGet() == 1) { + RequestFutureHolder.getInstance().getScheduledExecutorService().scheduleAtFixedRate(new Runnable() { @Override public void run() { try { - RequestFutureTable.scanExpiredRequest(); + RequestFutureHolder.getInstance().scanExpiredRequest(); } catch (Throwable e) { log.error("scan RequestFutureTable exception", e); } @@ -277,8 +269,8 @@ public void shutdown(final boolean shutdownFactory) { if (shutdownFactory) { this.mQClientFactory.shutdown(); } - if (RequestFutureTable.getProducerNum().decrementAndGet() == 0) { - scheduledExecutorService.shutdown(); + if (RequestFutureHolder.getInstance().getProducerNum().decrementAndGet() == 0) { + RequestFutureHolder.getInstance().getScheduledExecutorService().shutdown(); } log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; @@ -1399,7 +1391,7 @@ public Message request(Message msg, try { final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null); - RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); + RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @@ -1418,7 +1410,7 @@ public void onException(Throwable e) { return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { - RequestFutureTable.getRequestFutureTable().remove(correlationId); + RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); } } @@ -1429,7 +1421,7 @@ public void request(Message msg, final RequestCallback requestCallback, long tim final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); + RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @@ -1455,7 +1447,7 @@ public Message request(final Message msg, final MessageQueueSelector selector, f try { final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null); - RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); + RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @@ -1474,7 +1466,7 @@ public void onException(Throwable e) { return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { - RequestFutureTable.getRequestFutureTable().remove(correlationId); + RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); } } @@ -1486,7 +1478,7 @@ public void request(final Message msg, final MessageQueueSelector selector, fina final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); + RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @@ -1512,7 +1504,7 @@ public Message request(final Message msg, final MessageQueue mq, final long time try { final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null); - RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); + RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @@ -1531,7 +1523,7 @@ public void onException(Throwable e) { return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { - RequestFutureTable.getRequestFutureTable().remove(correlationId); + RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); } } @@ -1555,7 +1547,7 @@ public void request(final Message msg, final MessageQueue mq, final RequestCallb final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); + RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @@ -1573,7 +1565,7 @@ public void onException(Throwable e) { } private void requestFail(final String correlationId) { - RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId); + RequestResponseFuture responseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); if (responseFuture != null) { responseFuture.setSendRequestOk(false); responseFuture.putResponseMessage(null); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java similarity index 66% rename from client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java rename to client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java index 9bdc3da45ef..24b3a906a56 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java @@ -22,6 +22,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -29,16 +32,24 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.logging.InternalLogger; -public class RequestFutureTable { +public class RequestFutureHolder { private static InternalLogger log = ClientLogger.getLog(); - private static ConcurrentHashMap requestFutureTable = new ConcurrentHashMap(); - private static final AtomicInteger PRODUCER_NUM = new AtomicInteger(0); + private static final RequestFutureHolder INSTANCE = new RequestFutureHolder(); + private ConcurrentHashMap requestFutureTable = new ConcurrentHashMap(); + private final AtomicInteger producerNum = new AtomicInteger(0); + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RequestHouseKeepingService"); + } + }); - public static ConcurrentHashMap getRequestFutureTable() { + public ConcurrentHashMap getRequestFutureTable() { return requestFutureTable; } - public static void scanExpiredRequest() { + public void scanExpiredRequest() { final List rfList = new LinkedList(); Iterator> it = requestFutureTable.entrySet().iterator(); while (it.hasNext()) { @@ -63,7 +74,18 @@ public static void scanExpiredRequest() { } } - public static AtomicInteger getProducerNum() { - return PRODUCER_NUM; + private RequestFutureHolder() { + } + + public AtomicInteger getProducerNum() { + return producerNum; + } + + public ScheduledExecutorService getScheduledExecutorService() { + return scheduledExecutorService; + } + + public static RequestFutureHolder getInstance() { + return INSTANCE; } } diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 5f29fe113c5..7347c2ab24e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -48,7 +48,6 @@ import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.junit.After; import org.junit.Before; @@ -56,9 +55,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Spy; -import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -372,7 +369,7 @@ public void testRequestMessage() throws RemotingException, RequestTimeoutExcepti final AtomicBoolean finish = new AtomicBoolean(false); new Thread(new Runnable() { @Override public void run() { - ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable(); + ConcurrentHashMap responseMap = RequestFutureHolder.getInstance().getRequestFutureTable(); assertThat(responseMap).isNotNull(); while (!finish.get()) { try { @@ -414,7 +411,7 @@ public void testAsyncRequest_OnSuccess() throws Exception { } }; producer.request(message, requestCallback, 3 * 1000L); - ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable(); + ConcurrentHashMap responseMap = RequestFutureHolder.getInstance().getRequestFutureTable(); assertThat(responseMap).isNotNull(); for (Map.Entry entry : responseMap.entrySet()) { RequestResponseFuture future = entry.getValue(); @@ -450,7 +447,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { producer.request(message, requestCallback, 3 * 1000L); failBecauseExceptionWasNotThrown(Exception.class); } catch (Exception e) { - ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable(); + ConcurrentHashMap responseMap = RequestFutureHolder.getInstance().getRequestFutureTable(); assertThat(responseMap).isNotNull(); for (Map.Entry entry : responseMap.entrySet()) { RequestResponseFuture future = entry.getValue();