From 1be3cc363fea0404a3b9c25ec74161d79e0ac42a Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Fri, 31 Dec 2021 10:47:14 +0800 Subject: [PATCH] chore(client) change client jdk version to 1.6 --- client/pom.xml | 5 +++ .../rocketmq/client/impl/MQClientAPIImpl.java | 27 ++++++++------ .../consumer/DefaultLitePullConsumerImpl.java | 2 +- .../client/impl/factory/MQClientInstance.java | 8 +++-- .../impl/producer/DefaultMQProducerImpl.java | 33 +++++++++++++---- .../client/producer/RequestFutureHolder.java | 2 +- .../ConsumeMessageOpenTracingHookImpl.java | 2 +- .../consumer/DefaultLitePullConsumerTest.java | 19 +++++----- .../consumer/DefaultMQPushConsumerTest.java | 22 +++++++----- ...ConsumeMessageConcurrentlyServiceTest.java | 2 +- .../producer/DefaultMQProducerTest.java | 2 +- .../DefaultMQConsumerWithOpenTracingTest.java | 30 ++++++++++------ .../trace/DefaultMQConsumerWithTraceTest.java | 7 ++-- .../TransactionMQProducerWithTraceTest.java | 36 +++++++++++-------- 14 files changed, 129 insertions(+), 68 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index 07ba7d1e483..4f6f1959346 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -27,6 +27,11 @@ rocketmq-client rocketmq-client ${project.version} + + 1.6 + 1.6 + + ${project.groupId} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 89c998b7f4e..8c9af52e36c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -397,7 +397,8 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr, } - public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, + public AclConfig getBrokerClusterConfig(final String addr, + final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null); @@ -407,7 +408,7 @@ public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMil case ResponseCode.SUCCESS: { if (response.getBody() != null) { GetBrokerClusterAclConfigResponseBody body = - GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class); + GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class); AclConfig aclConfig = new AclConfig(); aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs()); aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs()); @@ -505,7 +506,7 @@ private SendResult sendMessageSync( ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; - return this.processSendResponse(brokerName, msg, response,addr); + return this.processSendResponse(brokerName, msg, response, addr); } private void sendMessageAsync( @@ -612,7 +613,10 @@ private void onExceptionImpl(final String brokerName, request.setOpaque(RemotingCommand.createNewRequestId()); sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); - } catch (InterruptedException | RemotingTooMuchRequestException e1) { + } catch (InterruptedException e1) { + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + context, false, producer); + } catch (RemotingTooMuchRequestException e1) { onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { @@ -664,7 +668,7 @@ private SendResult processSendResponse( } SendMessageResponseHeader responseHeader = - (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); + (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); //If namespace not null , reset Topic without namespace. String topic = msg.getTopic(); @@ -683,8 +687,8 @@ private SendResult processSendResponse( uniqMsgId = sb.toString(); } SendResult sendResult = new SendResult(sendStatus, - uniqMsgId, - responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); + uniqMsgId, + responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); @@ -1425,8 +1429,8 @@ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, } public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, final long timeoutMillis) - throws RemotingCommandException, - RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader(); requestHeader.setBrokerName(brokerName); @@ -1437,7 +1441,7 @@ public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, fin switch (response.getCode()) { case ResponseCode.SUCCESS: { AddWritePermOfBrokerResponseHeader responseHeader = - (AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class); + (AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class); return responseHeader.getAddTopicCount(); } default: @@ -1485,7 +1489,8 @@ public void deleteTopicInNameServer(final String addr, final String topic, final throw new MQClientException(response.getCode(), response.getRemark()); } - public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis) + public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, + final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader(); requestHeader.setGroupName(groupName); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index fecf07645ed..c0d6e97e921 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -147,7 +147,7 @@ private enum SubscriptionType { private final MessageQueueLock messageQueueLock = new MessageQueueLock(); - private final ArrayList consumeMessageHookList = new ArrayList<>(); + private final ArrayList consumeMessageHookList = new ArrayList(); public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { this.defaultLitePullConsumer = defaultLitePullConsumer; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 6d35ed63028..eea264b9eff 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -897,8 +897,12 @@ private void unregisterClient(final String producerGroup, final String consumerG try { this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout()); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); - } catch (RemotingException | InterruptedException | MQBrokerException e) { - log.error("unregister client exception from broker: " + addr, e); + } catch (RemotingException e) { + log.warn("unregister client RemotingException from broker: {}, {}", addr, e.getMessage()); + } catch (InterruptedException e) { + log.warn("unregister client InterruptedException from broker: {}, {}", addr, e.getMessage()); + } catch (MQBrokerException e) { + log.warn("unregister client MQBrokerException from broker: {}, {}", addr, e.getMessage()); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index cedbbdb6ca3..f980d9d5d66 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -490,7 +490,7 @@ public void send(Message msg, * * @param msg * @param sendCallback - * @param timeout the sendCallback will be invoked at most time + * @param timeout the sendCallback will be invoked at most time * @throws RejectedExecutionException */ @Deprecated @@ -597,7 +597,14 @@ private SendResult sendDefaultImpl( default: break; } - } catch (RemotingException | MQClientException e) { + } catch (RemotingException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + exception = e; + continue; + } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); @@ -856,7 +863,19 @@ private SendResult sendKernelImpl(final Message msg, } return sendResult; - } catch (RemotingException | MQBrokerException | InterruptedException e) { + } catch (RemotingException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } catch (MQBrokerException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); @@ -969,6 +988,7 @@ public void doExecuteEndTransactionHook(Message msg, String msgId, String broker executeEndTransactionHook(context); } } + /** * DEFAULT ONEWAY ------------------------------------------------------- */ @@ -1021,7 +1041,7 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) * @param msg * @param mq * @param sendCallback - * @param timeout the sendCallback will be invoked at most time + * @param timeout the sendCallback will be invoked at most time * @throws MQClientException * @throws RemotingException * @throws InterruptedException @@ -1151,7 +1171,7 @@ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCal * @param selector * @param arg * @param sendCallback - * @param timeout the sendCallback will be invoked at most time + * @param timeout the sendCallback will be invoked at most time * @throws MQClientException * @throws RemotingException * @throws InterruptedException @@ -1491,7 +1511,8 @@ public void onException(Throwable e) { } } - private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException { + private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, + long cost) throws InterruptedException, RequestTimeoutException, MQClientException { Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); if (responseMessage == null) { if (requestResponseFuture.isSendRequestOk()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java index 8fe9abcc697..df0706f1f8f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java @@ -39,7 +39,7 @@ public class RequestFutureHolder { private static InternalLogger log = ClientLogger.getLog(); private static final RequestFutureHolder INSTANCE = new RequestFutureHolder(); private ConcurrentHashMap requestFutureTable = new ConcurrentHashMap(); - private final Set producerSet = new HashSet<>(); + private final Set producerSet = new HashSet(); private ScheduledExecutorService scheduledExecutorService = null; public ConcurrentHashMap getRequestFutureTable() { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java index 28fccae06f8..fe97c773949 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java @@ -51,7 +51,7 @@ public void consumeMessageBefore(ConsumeMessageContext context) { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { return; } - List spanList = new ArrayList<>(); + List spanList = new ArrayList(); for (MessageExt msg : context.getMsgList()) { if (msg == null) { continue; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 04b760eec22..227ea441719 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.reflect.FieldUtils; @@ -95,7 +96,9 @@ public class DefaultLitePullConsumerTest { @Before public void init() throws Exception { ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); - factoryTable.forEach((s, instance) -> instance.shutdown()); + for (Map.Entry entry : factoryTable.entrySet()) { + entry.getValue().shutdown(); + } factoryTable.clear(); Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); @@ -481,7 +484,7 @@ public void testCheckConfig_Exception() { } @Test - public void testComputePullFromWhereReturnedNotFound() throws Exception{ + public void testComputePullFromWhereReturnedNotFound() throws Exception { DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); MessageQueue messageQueue = createMessageQueue(); @@ -491,7 +494,7 @@ public void testComputePullFromWhereReturnedNotFound() throws Exception{ } @Test - public void testComputePullFromWhereReturned() throws Exception{ + public void testComputePullFromWhereReturned() throws Exception { DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); MessageQueue messageQueue = createMessageQueue(); @@ -500,9 +503,8 @@ public void testComputePullFromWhereReturned() throws Exception{ assertThat(offset).isEqualTo(100); } - @Test - public void testComputePullFromLast() throws Exception{ + public void testComputePullFromLast() throws Exception { DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); MessageQueue messageQueue = createMessageQueue(); @@ -513,13 +515,13 @@ public void testComputePullFromLast() throws Exception{ } @Test - public void testComputePullByTimeStamp() throws Exception{ + public void testComputePullByTimeStamp() throws Exception { DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); defaultLitePullConsumer.setConsumeTimestamp("20191024171201"); MessageQueue messageQueue = createMessageQueue(); when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); - when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L); + when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class), anyLong())).thenReturn(100L); long offset = rebalanceImpl.computePullFromWhere(messageQueue); assertThat(offset).isEqualTo(100); } @@ -660,7 +662,8 @@ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, P return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); } - private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException { + private static void suppressUpdateTopicRouteInfoFromNameServer( + DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException { DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true); if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { litePullConsumer.changeInstanceNameToPID(); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index df87932b2ed..bfc87f13eaa 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -94,7 +95,9 @@ public class DefaultMQPushConsumerTest { @Before public void init() throws Exception { ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); - factoryTable.forEach((s, instance) -> instance.shutdown()); + for (Map.Entry entry : factoryTable.entrySet()) { + entry.getValue().shutdown(); + } factoryTable.clear(); when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class), @@ -117,7 +120,6 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { } }); - consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); @@ -169,7 +171,7 @@ public void testStart_OffsetShouldNotNUllAfterStart() { @Test public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicReference messageAtomic = new AtomicReference<>(); + final AtomicReference messageAtomic = new AtomicReference(); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, @@ -192,7 +194,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, @Test public void testPullMessage_SuccessWithOrderlyService() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicReference messageAtomic = new AtomicReference<>(); + final AtomicReference messageAtomic = new AtomicReference(); MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() { @Override @@ -329,11 +331,13 @@ public void testPullMessage_ExceptionOccursWhenComputePullFromWhere() throws MQC final CountDownLatch countDownLatch = new CountDownLatch(1); final MessageExt[] messageExts = new MessageExt[1]; pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService( - new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), - (msgs, context) -> { - messageExts[0] = msgs.get(0); - return null; - })); + new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + messageExts[0] = msgs.get(0); + return null; + } + })); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true); PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java index 6c331db0593..8798c2d9ea0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java @@ -149,7 +149,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { @Test public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicReference messageAtomic = new AtomicReference<>(); + final AtomicReference messageAtomic = new AtomicReference(); ConsumeMessageConcurrentlyService normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index a890e721dcd..5ce761c30f5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -257,7 +257,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { } }; - List msgs = new ArrayList<>(); + List msgs = new ArrayList(); for (int i = 0; i < 5; i++) { Message message = new Message(); message.setTopic("test"); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java index ecf72ae44cf..4864522c66c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java @@ -25,7 +25,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -97,12 +99,14 @@ public class DefaultMQConsumerWithOpenTracingTest { private PullAPIWrapper pullAPIWrapper; private RebalancePushImpl rebalancePushImpl; private DefaultMQPushConsumer pushConsumer; - private MockTracer tracer = new MockTracer(); + private final MockTracer tracer = new MockTracer(); @Before public void init() throws Exception { ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); - factoryTable.forEach((s, instance) -> instance.shutdown()); + for (Map.Entry entry : factoryTable.entrySet()) { + entry.getValue().shutdown(); + } factoryTable.clear(); when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class), @@ -115,7 +119,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { messageClientExt.setTopic(topic); messageClientExt.setQueueId(0); messageClientExt.setMsgId("123"); - messageClientExt.setBody(new byte[]{'a'}); + messageClientExt.setBody(new byte[] {'a'}); messageClientExt.setOffsetMsgId("234"); messageClientExt.setBornHost(new InetSocketAddress(8080)); messageClientExt.setStoreHost(new InetSocketAddress(8080)); @@ -125,11 +129,10 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { } }); - consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( - new ConsumeMessageOpenTracingHookImpl(tracer)); + new ConsumeMessageOpenTracingHookImpl(tracer)); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); @@ -140,7 +143,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { return null; } }); @@ -173,11 +176,11 @@ public void terminate() { @Test public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicReference messageAtomic = new AtomicReference<>(); + final AtomicReference messageAtomic = new AtomicReference(); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { messageAtomic.set(msgs.get(0)); countDownLatch.countDown(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; @@ -190,10 +193,15 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, MessageExt msg = messageAtomic.get(); assertThat(msg).isNotNull(); assertThat(msg.getTopic()).isEqualTo(topic); - assertThat(msg.getBody()).isEqualTo(new byte[]{'a'}); + assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); // wait until consumeMessageAfter hook of tracer is done surely. - waitAtMost(1, TimeUnit.SECONDS).until(() -> tracer.finishedSpans().size() == 1); + waitAtMost(1, TimeUnit.SECONDS).until(new Callable() { + @Override public Object call() throws Exception { + return tracer.finishedSpans().size() == 1; + } + }); + MockSpan span = tracer.finishedSpans().get(0); assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic); assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER); @@ -219,7 +227,7 @@ private PullRequest createPullRequest() { } private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, - List messageExtList) throws Exception { + List messageExtList) throws Exception { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); for (MessageExt messageExt : messageExtList) { outputStream.write(MessageDecoder.encode(messageExt, false)); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index d142e94d76a..50e4456b868 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -116,7 +117,9 @@ public class DefaultMQConsumerWithTraceTest { @Before public void init() throws Exception { ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); - factoryTable.forEach((s, instance) -> instance.shutdown()); + for (Map.Entry entry : factoryTable.entrySet()) { + entry.getValue().shutdown(); + } factoryTable.clear(); consumerGroup = "FooBarGroup" + System.currentTimeMillis(); @@ -217,7 +220,7 @@ public void testPullMessage_WithTrace_Success() throws InterruptedException, Rem traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicReference messageAtomic = new AtomicReference<>(); + final AtomicReference messageAtomic = new AtomicReference(); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java index 02289505d9a..3454bf078cc 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java @@ -17,6 +17,12 @@ package org.apache.rocketmq.client.trace; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,17 +58,16 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; - -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; +import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -127,7 +132,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList"); fieldHooks.setAccessible(true); - Listhooks = new ArrayList<>(); + List hooks = new ArrayList(); hooks.add(endTransactionHook); fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks); @@ -143,11 +148,14 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - AtomicReference context = new AtomicReference<>(); - doAnswer(mock -> { - context.set(mock.getArgument(0)); - return null; - }).when(endTransactionHook).endTransaction(any()); + final AtomicReference context = new AtomicReference(); + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock mock) throws Throwable { + context.set((EndTransactionContext) mock.getArgument(0)); + return null; + } + + }).when(endTransactionHook).endTransaction(any(EndTransactionContext.class)); producer.sendMessageInTransaction(message, null); EndTransactionContext ctx = context.get();