Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestFutureHolder;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand Down Expand Up @@ -274,11 +274,11 @@ private RemotingCommand receiveReplyMessage(ChannelHandlerContext ctx,

private void processReplyMessage(MessageExt replyMsg) {
final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
final RequestResponseFuture requestResponseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().get(correlationId);
if (requestResponseFuture != null) {
requestResponseFuture.putResponseMessage(replyMsg);

RequestFutureTable.getRequestFutureTable().remove(correlationId);
RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);

if (requestResponseFuture.getRequestCallback() != null) {
requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -58,7 +56,7 @@
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestFutureHolder;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
Expand Down Expand Up @@ -106,12 +104,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RequestHouseKeepingService");
}
});
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
Expand Down Expand Up @@ -236,12 +228,12 @@ public void start(final boolean startFactory) throws MQClientException {
}

private void startScheduledTask() {
if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
if (RequestFutureHolder.getInstance().getProducerNum().incrementAndGet() == 1) {
RequestFutureHolder.getInstance().getScheduledExecutorService().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
RequestFutureHolder.getInstance().scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
Expand Down Expand Up @@ -277,8 +269,8 @@ public void shutdown(final boolean shutdownFactory) {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
if (RequestFutureTable.getProducerNum().decrementAndGet() == 0) {
scheduledExecutorService.shutdown();
if (RequestFutureHolder.getInstance().getProducerNum().decrementAndGet() == 0) {
RequestFutureHolder.getInstance().getScheduledExecutorService().shutdown();
}
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
Expand Down Expand Up @@ -1399,7 +1391,7 @@ public Message request(Message msg,

try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
Expand All @@ -1418,7 +1410,7 @@ public void onException(Throwable e) {

return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
}
}

Expand All @@ -1429,7 +1421,7 @@ public void request(Message msg, final RequestCallback requestCallback, long tim
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
Expand All @@ -1455,7 +1447,7 @@ public Message request(final Message msg, final MessageQueueSelector selector, f

try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
Expand All @@ -1474,7 +1466,7 @@ public void onException(Throwable e) {

return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
}
}

Expand All @@ -1486,7 +1478,7 @@ public void request(final Message msg, final MessageQueueSelector selector, fina
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
Expand All @@ -1512,7 +1504,7 @@ public Message request(final Message msg, final MessageQueue mq, final long time

try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
Expand All @@ -1531,7 +1523,7 @@ public void onException(Throwable e) {

return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
}
}

Expand All @@ -1555,7 +1547,7 @@ public void request(final Message msg, final MessageQueue mq, final RequestCallb
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
Expand All @@ -1573,7 +1565,7 @@ public void onException(Throwable e) {
}

private void requestFail(final String correlationId) {
RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId);
RequestResponseFuture responseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
if (responseFuture != null) {
responseFuture.setSendRequestOk(false);
responseFuture.putResponseMessage(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,34 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;

public class RequestFutureTable {
public class RequestFutureHolder {
private static InternalLogger log = ClientLogger.getLog();
private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
private static final AtomicInteger PRODUCER_NUM = new AtomicInteger(0);
private static final RequestFutureHolder INSTANCE = new RequestFutureHolder();
private ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
private final AtomicInteger producerNum = new AtomicInteger(0);
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RequestHouseKeepingService");
}
});

public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
public ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
return requestFutureTable;
}

public static void scanExpiredRequest() {
public void scanExpiredRequest() {
final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
while (it.hasNext()) {
Expand All @@ -63,7 +74,18 @@ public static void scanExpiredRequest() {
}
}

public static AtomicInteger getProducerNum() {
return PRODUCER_NUM;
private RequestFutureHolder() {
}

public AtomicInteger getProducerNum() {
return producerNum;
}

public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}

public static RequestFutureHolder getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,14 @@
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
Expand Down Expand Up @@ -372,7 +369,7 @@ public void testRequestMessage() throws RemotingException, RequestTimeoutExcepti
final AtomicBoolean finish = new AtomicBoolean(false);
new Thread(new Runnable() {
@Override public void run() {
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureHolder.getInstance().getRequestFutureTable();
assertThat(responseMap).isNotNull();
while (!finish.get()) {
try {
Expand Down Expand Up @@ -414,7 +411,7 @@ public void testAsyncRequest_OnSuccess() throws Exception {
}
};
producer.request(message, requestCallback, 3 * 1000L);
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureHolder.getInstance().getRequestFutureTable();
assertThat(responseMap).isNotNull();
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
Expand Down Expand Up @@ -450,7 +447,7 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
producer.request(message, requestCallback, 3 * 1000L);
failBecauseExceptionWasNotThrown(Exception.class);
} catch (Exception e) {
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureHolder.getInstance().getRequestFutureTable();
assertThat(responseMap).isNotNull();
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
Expand Down