From 1c5669bd04dea5bff9a46ff9b7ecdffbb7725b23 Mon Sep 17 00:00:00 2001 From: CharliePu Date: Wed, 9 Dec 2020 18:37:44 +0800 Subject: [PATCH 1/7] Support the expansion of metrics data statistics in RocketMQ-Spring. --- .../springboot/AtomicLongMetricExtension.java | 45 +++++++++++ ...che.rocketmq.spring.metric.MetricExtension | 1 + .../springboot/AtomicLongMetricExtension.java | 45 +++++++++++ ...che.rocketmq.spring.metric.MetricExtension | 1 + .../DefaultLitePullConsumerWithTopic.java | 42 ++++++++++ .../spring/core/RocketMQTemplate.java | 19 ++++- .../rocketmq/spring/metric/EConsumerMode.java | 29 +++++++ .../spring/metric/MetricExtension.java | 38 +++++++++ .../metric/MetricExtensionProvider.java | 81 +++++++++++++++++++ .../DefaultRocketMQListenerContainer.java | 3 + .../rocketmq/spring/support/RocketMQUtil.java | 8 +- .../spring/metric/MetricExtensionTest.java | 40 +++++++++ ...che.rocketmq.spring.metric.MetricExtension | 1 + 13 files changed, 349 insertions(+), 4 deletions(-) create mode 100644 rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java create mode 100644 rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension create mode 100644 rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java create mode 100644 rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/DefaultLitePullConsumerWithTopic.java create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java create mode 100644 rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java create mode 100644 rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java new file mode 100644 index 00000000..24f13f88 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.samples.springboot; + +import org.apache.rocketmq.spring.metric.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtension; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class AtomicLongMetricExtension implements MetricExtension { + + private final Map producerMessageCountMap = new ConcurrentHashMap<>(); + private final Map consumerMessageCountMap = new ConcurrentHashMap<>(); + + @Override + public void addProducerMessageCount(String topic, int count) { + AtomicLong atomicLong = producerMessageCountMap.computeIfAbsent(topic, t -> new AtomicLong()); + System.out.printf("The count of producer messages for %s is %d.%n", topic, atomicLong.addAndGet(count)); + } + + @Override + public void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { + String key = topic + "_" + consumerMode.name(); + AtomicLong atomicLong = consumerMessageCountMap.computeIfAbsent(key, t -> new AtomicLong()); + System.out.printf("The count of %s consumer messages for %s is %d.%n" + , consumerMode.name(), topic, atomicLong.addAndGet(count)); + } +} diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension new file mode 100644 index 00000000..ba1a24d1 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension @@ -0,0 +1 @@ +org.apache.rocketmq.samples.springboot.AtomicLongMetricExtension \ No newline at end of file diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java new file mode 100644 index 00000000..24f13f88 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.samples.springboot; + +import org.apache.rocketmq.spring.metric.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtension; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class AtomicLongMetricExtension implements MetricExtension { + + private final Map producerMessageCountMap = new ConcurrentHashMap<>(); + private final Map consumerMessageCountMap = new ConcurrentHashMap<>(); + + @Override + public void addProducerMessageCount(String topic, int count) { + AtomicLong atomicLong = producerMessageCountMap.computeIfAbsent(topic, t -> new AtomicLong()); + System.out.printf("The count of producer messages for %s is %d.%n", topic, atomicLong.addAndGet(count)); + } + + @Override + public void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { + String key = topic + "_" + consumerMode.name(); + AtomicLong atomicLong = consumerMessageCountMap.computeIfAbsent(key, t -> new AtomicLong()); + System.out.printf("The count of %s consumer messages for %s is %d.%n" + , consumerMode.name(), topic, atomicLong.addAndGet(count)); + } +} diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension new file mode 100644 index 00000000..ba1a24d1 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension @@ -0,0 +1 @@ +org.apache.rocketmq.samples.springboot.AtomicLongMetricExtension \ No newline at end of file diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/DefaultLitePullConsumerWithTopic.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/DefaultLitePullConsumerWithTopic.java new file mode 100644 index 00000000..ddf92165 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/DefaultLitePullConsumerWithTopic.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.core; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.remoting.RPCHook; + +public class DefaultLitePullConsumerWithTopic extends DefaultLitePullConsumer { + + private String topic; + + public DefaultLitePullConsumerWithTopic(final String consumerGroup) { + super(consumerGroup); + } + + public DefaultLitePullConsumerWithTopic(final String consumerGroup, RPCHook rpcHook) { + super(consumerGroup, rpcHook); + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index dd6a4d8d..a7e77c6f 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -31,6 +31,8 @@ import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.metric.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtensionProvider; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; @@ -239,6 +241,7 @@ public T sendAndReceive(String destination, Message message, Type type, S if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); MessageExt replyMessage; if (Objects.isNull(hashKey) || hashKey.isEmpty()) { @@ -444,6 +447,7 @@ public void sendAndReceive(String destination, Message message, } }; } + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); if (Objects.isNull(hashKey) || hashKey.isEmpty()) { producer.request(rocketMsg, requestCallback, timeout); } else { @@ -514,7 +518,7 @@ public SendResult syncSend(String destination, Collection try { long now = System.currentTimeMillis(); - Collection rmqMsgs = new ArrayList<>(); + List rmqMsgs = new ArrayList<>(); for (Message msg : messages) { if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) { log.warn("Found a message empty in the batch, skip it"); @@ -523,6 +527,10 @@ public SendResult syncSend(String destination, Collection rmqMsgs.add(this.createRocketMqMessage(destination, msg)); } + if(!rmqMsgs.isEmpty()){ + MetricExtensionProvider.addProducerMessageCount(rmqMsgs.get(0).getTopic(), rmqMsgs.size()); + } + SendResult sendResult = producer.send(rmqMsgs, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -555,6 +563,7 @@ public SendResult syncSend(String destination, Message message, long timeout, if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); SendResult sendResult = producer.send(rocketMsg, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -620,6 +629,7 @@ public SendResult syncSendOrderly(String destination, Message message, String try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -726,6 +736,7 @@ public void asyncSend(String destination, Message message, SendCallback sendC if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); producer.send(rocketMsg, sendCallback, timeout); } catch (Exception e) { log.info("asyncSend failed. destination:{}, message:{} ", destination, message); @@ -805,6 +816,7 @@ public void asyncSendOrderly(String destination, Message message, String hash } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); } catch (Exception e) { log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); @@ -867,6 +879,7 @@ public void sendOneWay(String destination, Message message) { } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); producer.sendOneway(rocketMsg); } catch (Exception e) { log.error("sendOneWay failed. destination:{}, message:{} ", destination, message); @@ -899,6 +912,7 @@ public void sendOneWayOrderly(String destination, Message message, String has } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); producer.sendOneway(rocketMsg, messageQueueSelector, hashKey); } catch (Exception e) { log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); @@ -973,6 +987,7 @@ public TransactionSendResult sendMessageInTransaction(final String destination, throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener"); } org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); return producer.sendMessageInTransaction(rocketMsg, arg); } catch (MQClientException e) { throw RocketMQUtil.convert(e); @@ -1082,6 +1097,8 @@ public List receive(Class clazz, long timeout) { for (MessageExt messageExt : messageExts) { list.add(doConvertMessage(messageExt, clazz)); } + MetricExtensionProvider.addConsumerMessageCount( + ((DefaultLitePullConsumerWithTopic) consumer).getTopic(), list.size(), EConsumerMode.Pull); return list; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java new file mode 100644 index 00000000..ee2133e6 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.metric; + +public enum EConsumerMode { + /** + * pull mode + */ + Pull, + /** + * push mode + */ + Push, +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java new file mode 100644 index 00000000..e37108b6 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.metric; + +public interface MetricExtension { + + /** + * Add current count of message from the producer. + * + * @param topic topic name + * @param count count of message + */ + void addProducerMessageCount(String topic, int count); + + /** + * Add current count of message from the consumer. + * + * @param topic topic name + * @param count count of message + * @param consumerMode consumer mode + */ + void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode); +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java new file mode 100644 index 00000000..d5a53a17 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.metric; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; + +public class MetricExtensionProvider { + + private final static Logger log = LoggerFactory.getLogger(MetricExtensionProvider.class); + + private static List metricExtensions = new ArrayList<>(); + + static { + resolveInstance(); + } + + private static void resolveInstance() { + try { + ServiceLoader serviceLoader = ServiceLoader.load(MetricExtension.class); + for (MetricExtension spi : serviceLoader) { + metricExtensions.add(spi); + } + log.info("[MetricExtensionProvider] MetricExtension resolved, size=" + metricExtensions.size()); + } catch (Throwable t) { + log.warn("[MetricExtensionProvider] WARN: MetricExtension resolve failure"); + } + } + + /** + * Get all metric extensions. DO NOT MODIFY the returned list, use {@link #addMetricExtension(MetricExtension)}. + * + * @return all metric extensions. + */ + public static List getMetricExtensions() { + return metricExtensions; + } + + /** + * Add metric extension. + *

+ * Not that this method is NOT thread safe. + *

+ * + * @param metricExtension the metric extension to add. + */ + public static void addMetricExtension(MetricExtension metricExtension) { + metricExtensions.add(metricExtension); + } + + public static void addProducerMessageCount(String topic, int count) { + for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { + m.addProducerMessageCount(topic, count); + } + } + + public static void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { + for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { + m.addConsumerMessageCount(topic, count, consumerMode); + } + } +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index b6705db2..860a6877 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -48,6 +48,8 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; +import org.apache.rocketmq.spring.metric.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtensionProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -392,6 +394,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderly private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { + MetricExtensionProvider.addConsumerMessageCount(messageExt.getTopic(), 1, EConsumerMode.Push); if (rocketMQListener != null) { rocketMQListener.onMessage(doConvertMessage(messageExt)); } else if (rocketMQReplyListener != null) { diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java index 381d9365..dae77e0f 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.DefaultLitePullConsumerWithTopic; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.slf4j.Logger; @@ -296,13 +297,14 @@ public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameS String groupName, String topicName, MessageModel messageModel, SelectorType selectorType, String selectorExpression, String ak, String sk, int pullBatchSize) throws MQClientException { - DefaultLitePullConsumer litePullConsumer = null; + DefaultLitePullConsumerWithTopic litePullConsumer = null; if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { - litePullConsumer = new DefaultLitePullConsumer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk))); + litePullConsumer = new DefaultLitePullConsumerWithTopic(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk))); litePullConsumer.setVipChannelEnabled(false); } else { - litePullConsumer = new DefaultLitePullConsumer(groupName); + litePullConsumer = new DefaultLitePullConsumerWithTopic(groupName); } + litePullConsumer.setTopic(topicName); litePullConsumer.setNamesrvAddr(nameServer); litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer)); litePullConsumer.setPullBatchSize(pullBatchSize); diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java new file mode 100644 index 00000000..4284fadc --- /dev/null +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java @@ -0,0 +1,40 @@ +package org.apache.rocketmq.spring.metric; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class MetricExtensionTest implements MetricExtension { + + private static String TOPIC; + private static int COUNT; + private static EConsumerMode CONSUMER_MODE; + + @Override + public void addProducerMessageCount(String topic, int count) { + TOPIC = topic; + COUNT = count; + } + + @Override + public void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { + TOPIC = topic; + COUNT = count; + CONSUMER_MODE = consumerMode; + } + + @Test + public void testAddProducerMessageCount() { + MetricExtensionProvider.addProducerMessageCount("topic1", 111); + assertEquals("topic1", TOPIC); + assertEquals(111, COUNT); + } + + @Test + public void testAddConsumerMessageCount() { + MetricExtensionProvider.addConsumerMessageCount("topic2", 222, EConsumerMode.Push); + assertEquals("topic2", TOPIC); + assertEquals(222, COUNT); + assertEquals(EConsumerMode.Push, CONSUMER_MODE); + } +} diff --git a/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension b/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension new file mode 100644 index 00000000..0e69187c --- /dev/null +++ b/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension @@ -0,0 +1 @@ +org.apache.rocketmq.spring.metric.MetricExtensionTest \ No newline at end of file From 8b070b631124dc4063bf6094791369db139c1386 Mon Sep 17 00:00:00 2001 From: CharliePu Date: Wed, 9 Dec 2020 20:20:43 +0800 Subject: [PATCH 2/7] Add Apache License headers. --- .../spring/metric/MetricExtensionTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java index 4284fadc..b38b524e 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.spring.metric; import org.junit.Test; From b52eaa8b91b61a5d6bc66399605ce88bcd650e46 Mon Sep 17 00:00:00 2001 From: CharliePu Date: Wed, 9 Dec 2020 20:34:02 +0800 Subject: [PATCH 3/7] Fix the checkstyle error. --- .../java/org/apache/rocketmq/spring/core/RocketMQTemplate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index a7e77c6f..181ef64e 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -527,7 +527,7 @@ public SendResult syncSend(String destination, Collection rmqMsgs.add(this.createRocketMqMessage(destination, msg)); } - if(!rmqMsgs.isEmpty()){ + if (!rmqMsgs.isEmpty()) { MetricExtensionProvider.addProducerMessageCount(rmqMsgs.get(0).getTopic(), rmqMsgs.size()); } From 958f1ba1f5ee32cd741461ad2720d170063271bf Mon Sep 17 00:00:00 2001 From: CharliePu Date: Wed, 9 Dec 2020 21:10:43 +0800 Subject: [PATCH 4/7] Add exclusion rules for checkstyle. --- pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 7e7ecce5..5c754c7b 100644 --- a/pom.xml +++ b/pom.xml @@ -176,9 +176,10 @@ .github/** src/test/resources/certs/* src/test/**/*.log - src/test/resources/META-INF/service/* + **/src/main/resources/META-INF/services/* + **/src/test/resources/META-INF/services/* **/target/** - */*.iml + **/*.iml **/*/spring.factories **/application.properties From 0a435b2c995fb20524bfca17a0ee1fd287e7695e Mon Sep 17 00:00:00 2001 From: CharliePu Date: Thu, 10 Dec 2020 18:16:31 +0800 Subject: [PATCH 5/7] Optimize code structure in MetricExtensionProvider. --- pom.xml | 4 ++-- .../springboot/AtomicLongMetricExtension.java | 12 ++++++----- .../springboot/AtomicLongMetricExtension.java | 14 ++++++------- .../spring/core/RocketMQTemplate.java | 20 +++++++++---------- .../spring/metric/MetricExtension.java | 4 ++-- .../metric/MetricExtensionProvider.java | 12 +++++++++-- .../DefaultRocketMQListenerContainer.java | 2 +- .../spring/metric/MetricExtensionTest.java | 4 ++-- 8 files changed, 41 insertions(+), 31 deletions(-) diff --git a/pom.xml b/pom.xml index 5c754c7b..82597666 100644 --- a/pom.xml +++ b/pom.xml @@ -176,8 +176,8 @@ .github/** src/test/resources/certs/* src/test/**/*.log - **/src/main/resources/META-INF/services/* - **/src/test/resources/META-INF/services/* + src/main/resources/META-INF/services/* + src/test/resources/META-INF/services/* **/target/** **/*.iml **/*/spring.factories diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java index 24f13f88..99ca8fc6 100644 --- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -19,6 +19,8 @@ import org.apache.rocketmq.spring.metric.EConsumerMode; import org.apache.rocketmq.spring.metric.MetricExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,20 +28,20 @@ public class AtomicLongMetricExtension implements MetricExtension { - private final Map producerMessageCountMap = new ConcurrentHashMap<>(); + private final static Logger LOGGER = LoggerFactory.getLogger(AtomicLongMetricExtension.class); + private final Map consumerMessageCountMap = new ConcurrentHashMap<>(); @Override public void addProducerMessageCount(String topic, int count) { - AtomicLong atomicLong = producerMessageCountMap.computeIfAbsent(topic, t -> new AtomicLong()); - System.out.printf("The count of producer messages for %s is %d.%n", topic, atomicLong.addAndGet(count)); + //nothing } @Override - public void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { + public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { String key = topic + "_" + consumerMode.name(); AtomicLong atomicLong = consumerMessageCountMap.computeIfAbsent(key, t -> new AtomicLong()); - System.out.printf("The count of %s consumer messages for %s is %d.%n" + LOGGER.info("The count of {} consumer messages for {} is {}" , consumerMode.name(), topic, atomicLong.addAndGet(count)); } } diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java index 24f13f88..c11601f7 100644 --- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java +++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -19,6 +19,8 @@ import org.apache.rocketmq.spring.metric.EConsumerMode; import org.apache.rocketmq.spring.metric.MetricExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,20 +28,18 @@ public class AtomicLongMetricExtension implements MetricExtension { + private final static Logger LOGGER = LoggerFactory.getLogger(AtomicLongMetricExtension.class); + private final Map producerMessageCountMap = new ConcurrentHashMap<>(); - private final Map consumerMessageCountMap = new ConcurrentHashMap<>(); @Override public void addProducerMessageCount(String topic, int count) { AtomicLong atomicLong = producerMessageCountMap.computeIfAbsent(topic, t -> new AtomicLong()); - System.out.printf("The count of producer messages for %s is %d.%n", topic, atomicLong.addAndGet(count)); + LOGGER.info("The count of producer messages for {} is {}", topic, atomicLong.addAndGet(count)); } @Override - public void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { - String key = topic + "_" + consumerMode.name(); - AtomicLong atomicLong = consumerMessageCountMap.computeIfAbsent(key, t -> new AtomicLong()); - System.out.printf("The count of %s consumer messages for %s is %d.%n" - , consumerMode.name(), topic, atomicLong.addAndGet(count)); + public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { + //nothing } } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index 181ef64e..f6af998a 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -241,7 +241,7 @@ public T sendAndReceive(String destination, Message message, Type type, S if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); MessageExt replyMessage; if (Objects.isNull(hashKey) || hashKey.isEmpty()) { @@ -447,7 +447,7 @@ public void sendAndReceive(String destination, Message message, } }; } - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); if (Objects.isNull(hashKey) || hashKey.isEmpty()) { producer.request(rocketMsg, requestCallback, timeout); } else { @@ -563,7 +563,7 @@ public SendResult syncSend(String destination, Message message, long timeout, if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); SendResult sendResult = producer.send(rocketMsg, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -629,7 +629,7 @@ public SendResult syncSendOrderly(String destination, Message message, String try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -736,7 +736,7 @@ public void asyncSend(String destination, Message message, SendCallback sendC if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.send(rocketMsg, sendCallback, timeout); } catch (Exception e) { log.info("asyncSend failed. destination:{}, message:{} ", destination, message); @@ -816,7 +816,7 @@ public void asyncSendOrderly(String destination, Message message, String hash } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); } catch (Exception e) { log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); @@ -879,7 +879,7 @@ public void sendOneWay(String destination, Message message) { } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.sendOneway(rocketMsg); } catch (Exception e) { log.error("sendOneWay failed. destination:{}, message:{} ", destination, message); @@ -912,7 +912,7 @@ public void sendOneWayOrderly(String destination, Message message, String has } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.sendOneway(rocketMsg, messageQueueSelector, hashKey); } catch (Exception e) { log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); @@ -987,7 +987,7 @@ public TransactionSendResult sendMessageInTransaction(final String destination, throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener"); } org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); - MetricExtensionProvider.addProducerMessageCount(rocketMsg.getTopic(), 1); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); return producer.sendMessageInTransaction(rocketMsg, arg); } catch (MQClientException e) { throw RocketMQUtil.convert(e); @@ -1098,7 +1098,7 @@ public List receive(Class clazz, long timeout) { list.add(doConvertMessage(messageExt, clazz)); } MetricExtensionProvider.addConsumerMessageCount( - ((DefaultLitePullConsumerWithTopic) consumer).getTopic(), list.size(), EConsumerMode.Pull); + ((DefaultLitePullConsumerWithTopic) consumer).getTopic(), EConsumerMode.Pull, list.size()); return list; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java index e37108b6..7dcba7a9 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java @@ -31,8 +31,8 @@ public interface MetricExtension { * Add current count of message from the consumer. * * @param topic topic name - * @param count count of message * @param consumerMode consumer mode + * @param count count of message */ - void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode); + void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count); } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java index d5a53a17..3a416df9 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java @@ -73,9 +73,17 @@ public static void addProducerMessageCount(String topic, int count) { } } - public static void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { + public static void producerMessageCountIncrement(String topic) { + addProducerMessageCount(topic, 1); + } + + public static void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { - m.addConsumerMessageCount(topic, count, consumerMode); + m.addConsumerMessageCount(topic, consumerMode, count); } } + + public static void consumerMessageCountIncrement(String topic, EConsumerMode consumerMode) { + addConsumerMessageCount(topic, consumerMode, 1); + } } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index 860a6877..1ad161b6 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -394,7 +394,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderly private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { - MetricExtensionProvider.addConsumerMessageCount(messageExt.getTopic(), 1, EConsumerMode.Push); + MetricExtensionProvider.consumerMessageCountIncrement(messageExt.getTopic(), EConsumerMode.Push); if (rocketMQListener != null) { rocketMQListener.onMessage(doConvertMessage(messageExt)); } else if (rocketMQReplyListener != null) { diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java index b38b524e..62c6e4cc 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java @@ -34,7 +34,7 @@ public void addProducerMessageCount(String topic, int count) { } @Override - public void addConsumerMessageCount(String topic, int count, EConsumerMode consumerMode) { + public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { TOPIC = topic; COUNT = count; CONSUMER_MODE = consumerMode; @@ -49,7 +49,7 @@ public void testAddProducerMessageCount() { @Test public void testAddConsumerMessageCount() { - MetricExtensionProvider.addConsumerMessageCount("topic2", 222, EConsumerMode.Push); + MetricExtensionProvider.addConsumerMessageCount("topic2", EConsumerMode.Push, 222); assertEquals("topic2", TOPIC); assertEquals(222, COUNT); assertEquals(EConsumerMode.Push, CONSUMER_MODE); From efc25a70f5db2601c3fb17d5c9a3572ea5365b52 Mon Sep 17 00:00:00 2001 From: CharliePu Date: Thu, 10 Dec 2020 18:29:27 +0800 Subject: [PATCH 6/7] Fix the checkstyle error. --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 82597666..5c754c7b 100644 --- a/pom.xml +++ b/pom.xml @@ -176,8 +176,8 @@ .github/** src/test/resources/certs/* src/test/**/*.log - src/main/resources/META-INF/services/* - src/test/resources/META-INF/services/* + **/src/main/resources/META-INF/services/* + **/src/test/resources/META-INF/services/* **/target/** **/*.iml **/*/spring.factories From 6c7a2dedd430c4038fa8b9bb8952c6d2ea60dd7a Mon Sep 17 00:00:00 2001 From: CharliePu Date: Thu, 10 Dec 2020 19:26:21 +0800 Subject: [PATCH 7/7] Encapsulate EConsumerMode to the inner enum. --- .../springboot/AtomicLongMetricExtension.java | 1 - .../springboot/AtomicLongMetricExtension.java | 1 - .../spring/core/RocketMQTemplate.java | 2 +- .../rocketmq/spring/metric/EConsumerMode.java | 29 ------------------- .../spring/metric/MetricExtension.java | 11 +++++++ .../metric/MetricExtensionProvider.java | 1 + .../DefaultRocketMQListenerContainer.java | 2 +- 7 files changed, 14 insertions(+), 33 deletions(-) delete mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java index 99ca8fc6..184a0695 100644 --- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.samples.springboot; -import org.apache.rocketmq.spring.metric.EConsumerMode; import org.apache.rocketmq.spring.metric.MetricExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java index c11601f7..ff3a0b25 100644 --- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java +++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.samples.springboot; -import org.apache.rocketmq.spring.metric.EConsumerMode; import org.apache.rocketmq.spring.metric.MetricExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index f6af998a..b5742275 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -31,7 +31,7 @@ import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.spring.metric.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode; import org.apache.rocketmq.spring.metric.MetricExtensionProvider; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.apache.rocketmq.spring.support.RocketMQUtil; diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java deleted file mode 100644 index ee2133e6..00000000 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/EConsumerMode.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.spring.metric; - -public enum EConsumerMode { - /** - * pull mode - */ - Pull, - /** - * push mode - */ - Push, -} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java index 7dcba7a9..53c59a28 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java @@ -19,6 +19,17 @@ public interface MetricExtension { + enum EConsumerMode { + /** + * pull mode + */ + Pull, + /** + * push mode + */ + Push, + } + /** * Add current count of message from the producer. * diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java index 3a416df9..a4d12c1d 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.spring.metric; +import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index 1ad161b6..4039dfa6 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -48,7 +48,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; -import org.apache.rocketmq.spring.metric.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode; import org.apache.rocketmq.spring.metric.MetricExtensionProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory;