diff --git a/client/pom.xml b/client/pom.xml
index 07ba7d1e483..4f6f1959346 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -27,6 +27,11 @@
rocketmq-client
rocketmq-client ${project.version}
+
+ 1.6
+ 1.6
+
+
${project.groupId}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 89c998b7f4e..8c9af52e36c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -397,7 +397,8 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,
}
- public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+ public AclConfig getBrokerClusterConfig(final String addr,
+ final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);
@@ -407,7 +408,7 @@ public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMil
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
- GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
+ GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
@@ -505,7 +506,7 @@ private SendResult sendMessageSync(
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
- return this.processSendResponse(brokerName, msg, response,addr);
+ return this.processSendResponse(brokerName, msg, response, addr);
}
private void sendMessageAsync(
@@ -612,7 +613,10 @@ private void onExceptionImpl(final String brokerName,
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
- } catch (InterruptedException | RemotingTooMuchRequestException e1) {
+ } catch (InterruptedException e1) {
+ onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+ context, false, producer);
+ } catch (RemotingTooMuchRequestException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingException e1) {
@@ -664,7 +668,7 @@ private SendResult processSendResponse(
}
SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
@@ -683,8 +687,8 @@ private SendResult processSendResponse(
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
- uniqMsgId,
- responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+ uniqMsgId,
+ responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -1425,8 +1429,8 @@ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
}
public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, final long timeoutMillis)
- throws RemotingCommandException,
- RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+ throws RemotingCommandException,
+ RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);
@@ -1437,7 +1441,7 @@ public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, fin
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
AddWritePermOfBrokerResponseHeader responseHeader =
- (AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
+ (AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
return responseHeader.getAddTopicCount();
}
default:
@@ -1485,7 +1489,8 @@ public void deleteTopicInNameServer(final String addr, final String topic, final
throw new MQClientException(response.getCode(), response.getRemark());
}
- public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis)
+ public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset,
+ final long timeoutMillis)
throws RemotingException, InterruptedException, MQClientException {
DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
requestHeader.setGroupName(groupName);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index fecf07645ed..c0d6e97e921 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -147,7 +147,7 @@ private enum SubscriptionType {
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
- private final ArrayList consumeMessageHookList = new ArrayList<>();
+ private final ArrayList consumeMessageHookList = new ArrayList();
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 6d35ed63028..eea264b9eff 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -897,8 +897,12 @@ private void unregisterClient(final String producerGroup, final String consumerG
try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
- } catch (RemotingException | InterruptedException | MQBrokerException e) {
- log.error("unregister client exception from broker: " + addr, e);
+ } catch (RemotingException e) {
+ log.warn("unregister client RemotingException from broker: {}, {}", addr, e.getMessage());
+ } catch (InterruptedException e) {
+ log.warn("unregister client InterruptedException from broker: {}, {}", addr, e.getMessage());
+ } catch (MQBrokerException e) {
+ log.warn("unregister client MQBrokerException from broker: {}, {}", addr, e.getMessage());
}
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index cedbbdb6ca3..f980d9d5d66 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -490,7 +490,7 @@ public void send(Message msg,
*
* @param msg
* @param sendCallback
- * @param timeout the sendCallback
will be invoked at most time
+ * @param timeout the sendCallback
will be invoked at most time
* @throws RejectedExecutionException
*/
@Deprecated
@@ -597,7 +597,14 @@ private SendResult sendDefaultImpl(
default:
break;
}
- } catch (RemotingException | MQClientException e) {
+ } catch (RemotingException e) {
+ endTimestamp = System.currentTimeMillis();
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+ log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+ log.warn(msg.toString());
+ exception = e;
+ continue;
+ } catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
@@ -856,7 +863,19 @@ private SendResult sendKernelImpl(final Message msg,
}
return sendResult;
- } catch (RemotingException | MQBrokerException | InterruptedException e) {
+ } catch (RemotingException e) {
+ if (this.hasSendMessageHook()) {
+ context.setException(e);
+ this.executeSendMessageHookAfter(context);
+ }
+ throw e;
+ } catch (MQBrokerException e) {
+ if (this.hasSendMessageHook()) {
+ context.setException(e);
+ this.executeSendMessageHookAfter(context);
+ }
+ throw e;
+ } catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
@@ -969,6 +988,7 @@ public void doExecuteEndTransactionHook(Message msg, String msgId, String broker
executeEndTransactionHook(context);
}
}
+
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
@@ -1021,7 +1041,7 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
* @param msg
* @param mq
* @param sendCallback
- * @param timeout the sendCallback
will be invoked at most time
+ * @param timeout the sendCallback
will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
@@ -1151,7 +1171,7 @@ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCal
* @param selector
* @param arg
* @param sendCallback
- * @param timeout the sendCallback
will be invoked at most time
+ * @param timeout the sendCallback
will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
@@ -1491,7 +1511,8 @@ public void onException(Throwable e) {
}
}
- private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
+ private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture,
+ long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
index 8fe9abcc697..df0706f1f8f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
@@ -39,7 +39,7 @@ public class RequestFutureHolder {
private static InternalLogger log = ClientLogger.getLog();
private static final RequestFutureHolder INSTANCE = new RequestFutureHolder();
private ConcurrentHashMap requestFutureTable = new ConcurrentHashMap();
- private final Set producerSet = new HashSet<>();
+ private final Set producerSet = new HashSet();
private ScheduledExecutorService scheduledExecutorService = null;
public ConcurrentHashMap getRequestFutureTable() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
index 28fccae06f8..fe97c773949 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
@@ -51,7 +51,7 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
- List spanList = new ArrayList<>();
+ List spanList = new ArrayList();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 04b760eec22..227ea441719 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
@@ -95,7 +96,9 @@ public class DefaultLitePullConsumerTest {
@Before
public void init() throws Exception {
ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
- factoryTable.forEach((s, instance) -> instance.shutdown());
+ for (Map.Entry entry : factoryTable.entrySet()) {
+ entry.getValue().shutdown();
+ }
factoryTable.clear();
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
@@ -481,7 +484,7 @@ public void testCheckConfig_Exception() {
}
@Test
- public void testComputePullFromWhereReturnedNotFound() throws Exception{
+ public void testComputePullFromWhereReturnedNotFound() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
@@ -491,7 +494,7 @@ public void testComputePullFromWhereReturnedNotFound() throws Exception{
}
@Test
- public void testComputePullFromWhereReturned() throws Exception{
+ public void testComputePullFromWhereReturned() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
@@ -500,9 +503,8 @@ public void testComputePullFromWhereReturned() throws Exception{
assertThat(offset).isEqualTo(100);
}
-
@Test
- public void testComputePullFromLast() throws Exception{
+ public void testComputePullFromLast() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
@@ -513,13 +515,13 @@ public void testComputePullFromLast() throws Exception{
}
@Test
- public void testComputePullByTimeStamp() throws Exception{
+ public void testComputePullByTimeStamp() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
MessageQueue messageQueue = createMessageQueue();
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
- when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L);
+ when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class), anyLong())).thenReturn(100L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(100);
}
@@ -660,7 +662,8 @@ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, P
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
- private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
+ private static void suppressUpdateTopicRouteInfoFromNameServer(
+ DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index df87932b2ed..bfc87f13eaa 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -94,7 +95,9 @@ public class DefaultMQPushConsumerTest {
@Before
public void init() throws Exception {
ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
- factoryTable.forEach((s, instance) -> instance.shutdown());
+ for (Map.Entry entry : factoryTable.entrySet()) {
+ entry.getValue().shutdown();
+ }
factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
@@ -117,7 +120,6 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
}
});
-
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -169,7 +171,7 @@ public void testStart_OffsetShouldNotNUllAfterStart() {
@Test
public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final AtomicReference messageAtomic = new AtomicReference<>();
+ final AtomicReference messageAtomic = new AtomicReference();
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
@@ -192,7 +194,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs,
@Test
public void testPullMessage_SuccessWithOrderlyService() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final AtomicReference messageAtomic = new AtomicReference<>();
+ final AtomicReference messageAtomic = new AtomicReference();
MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
@Override
@@ -329,11 +331,13 @@ public void testPullMessage_ExceptionOccursWhenComputePullFromWhere() throws MQC
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(
- new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(),
- (msgs, context) -> {
- messageExts[0] = msgs.get(0);
- return null;
- }));
+ new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
+ @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs,
+ ConsumeConcurrentlyContext context) {
+ messageExts[0] = msgs.get(0);
+ return null;
+ }
+ }));
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true);
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 6c331db0593..8798c2d9ea0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -149,7 +149,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
@Test
public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final AtomicReference messageAtomic = new AtomicReference<>();
+ final AtomicReference messageAtomic = new AtomicReference();
ConsumeMessageConcurrentlyService normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index a890e721dcd..5ce761c30f5 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -257,7 +257,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) {
}
};
- List msgs = new ArrayList<>();
+ List msgs = new ArrayList();
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setTopic("test");
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
index ecf72ae44cf..4864522c66c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -25,7 +25,9 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -97,12 +99,14 @@ public class DefaultMQConsumerWithOpenTracingTest {
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
- private MockTracer tracer = new MockTracer();
+ private final MockTracer tracer = new MockTracer();
@Before
public void init() throws Exception {
ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
- factoryTable.forEach((s, instance) -> instance.shutdown());
+ for (Map.Entry entry : factoryTable.entrySet()) {
+ entry.getValue().shutdown();
+ }
factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
@@ -115,7 +119,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
- messageClientExt.setBody(new byte[]{'a'});
+ messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
@@ -125,11 +129,10 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
}
});
-
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
- new ConsumeMessageOpenTracingHookImpl(tracer));
+ new ConsumeMessageOpenTracingHookImpl(tracer));
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
@@ -140,7 +143,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
return null;
}
});
@@ -173,11 +176,11 @@ public void terminate() {
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final AtomicReference messageAtomic = new AtomicReference<>();
+ final AtomicReference messageAtomic = new AtomicReference();
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
messageAtomic.set(msgs.get(0));
countDownLatch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
@@ -190,10 +193,15 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs,
MessageExt msg = messageAtomic.get();
assertThat(msg).isNotNull();
assertThat(msg.getTopic()).isEqualTo(topic);
- assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
+ assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
// wait until consumeMessageAfter hook of tracer is done surely.
- waitAtMost(1, TimeUnit.SECONDS).until(() -> tracer.finishedSpans().size() == 1);
+ waitAtMost(1, TimeUnit.SECONDS).until(new Callable() {
+ @Override public Object call() throws Exception {
+ return tracer.finishedSpans().size() == 1;
+ }
+ });
+
MockSpan span = tracer.finishedSpans().get(0);
assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER);
@@ -219,7 +227,7 @@ private PullRequest createPullRequest() {
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
- List messageExtList) throws Exception {
+ List messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index d142e94d76a..50e4456b868 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -116,7 +117,9 @@ public class DefaultMQConsumerWithTraceTest {
@Before
public void init() throws Exception {
ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
- factoryTable.forEach((s, instance) -> instance.shutdown());
+ for (Map.Entry entry : factoryTable.entrySet()) {
+ entry.getValue().shutdown();
+ }
factoryTable.clear();
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
@@ -217,7 +220,7 @@ public void testPullMessage_WithTrace_Success() throws InterruptedException, Rem
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final AtomicReference messageAtomic = new AtomicReference<>();
+ final AtomicReference messageAtomic = new AtomicReference();
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
index 02289505d9a..3454bf078cc 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -17,6 +17,12 @@
package org.apache.rocketmq.client.trace;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,17 +58,16 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
-
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -127,7 +132,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
fieldHooks.setAccessible(true);
- Listhooks = new ArrayList<>();
+ List hooks = new ArrayList();
hooks.add(endTransactionHook);
fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
@@ -143,11 +148,14 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
- AtomicReference context = new AtomicReference<>();
- doAnswer(mock -> {
- context.set(mock.getArgument(0));
- return null;
- }).when(endTransactionHook).endTransaction(any());
+ final AtomicReference context = new AtomicReference();
+ doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock mock) throws Throwable {
+ context.set((EndTransactionContext) mock.getArgument(0));
+ return null;
+ }
+
+ }).when(endTransactionHook).endTransaction(any(EndTransactionContext.class));
producer.sendMessageInTransaction(message, null);
EndTransactionContext ctx = context.get();