Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,10 @@
<exclude>.github/**</exclude>
<exclude>src/test/resources/certs/*</exclude>
<exclude>src/test/**/*.log</exclude>
<exclude>src/test/resources/META-INF/service/*</exclude>
<exclude>**/src/main/resources/META-INF/services/*</exclude>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you add prefix placeholder before the root directory /src?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it. When running mvn -B clean apache-rat:check and building RocketMQ Spring Boot, the rocketmq-spring-boot-samples project will also be checked. If I do not add the prefix placeholder, an unapproved license error will be reported. Like this: https://travis-ci.org/github/apache/rocketmq-spring/builds/748732956

<exclude>**/src/test/resources/META-INF/services/*</exclude>
<exclude>**/target/**</exclude>
<exclude>*/*.iml</exclude>
<exclude>**/*.iml</exclude>
<exclude>**/*/spring.factories</exclude>
<exclude>**/application.properties</exclude>
</excludes>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.samples.springboot;

import org.apache.rocketmq.spring.metric.MetricExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongMetricExtension implements MetricExtension {

private final static Logger LOGGER = LoggerFactory.getLogger(AtomicLongMetricExtension.class);

private final Map<String, AtomicLong> consumerMessageCountMap = new ConcurrentHashMap<>();

@Override
public void addProducerMessageCount(String topic, int count) {
//nothing
}

@Override
public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) {
String key = topic + "_" + consumerMode.name();
AtomicLong atomicLong = consumerMessageCountMap.computeIfAbsent(key, t -> new AtomicLong());
LOGGER.info("The count of {} consumer messages for {} is {}"
, consumerMode.name(), topic, atomicLong.addAndGet(count));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.rocketmq.samples.springboot.AtomicLongMetricExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.samples.springboot;

import org.apache.rocketmq.spring.metric.MetricExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongMetricExtension implements MetricExtension {

private final static Logger LOGGER = LoggerFactory.getLogger(AtomicLongMetricExtension.class);

private final Map<String, AtomicLong> producerMessageCountMap = new ConcurrentHashMap<>();

@Override
public void addProducerMessageCount(String topic, int count) {
AtomicLong atomicLong = producerMessageCountMap.computeIfAbsent(topic, t -> new AtomicLong());
LOGGER.info("The count of producer messages for {} is {}", topic, atomicLong.addAndGet(count));
}

@Override
public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) {
//nothing
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.rocketmq.samples.springboot.AtomicLongMetricExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.spring.core;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.remoting.RPCHook;

public class DefaultLitePullConsumerWithTopic extends DefaultLitePullConsumer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DefaultLitePullConsumerWithTopic is not a rational name here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared with DefaultLitePullConsumer, this class just has one more Topic attribute. So, I named this class DefaultLitePullConsumerWithTopic. Do you have any better suggestions for the naming of this class?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not make a new class here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where can this new class be maked? What's your suggestion?


private String topic;

public DefaultLitePullConsumerWithTopic(final String consumerGroup) {
super(consumerGroup);
}

public DefaultLitePullConsumerWithTopic(final String consumerGroup, RPCHook rpcHook) {
super(consumerGroup, rpcHook);
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode;
import org.apache.rocketmq.spring.metric.MetricExtensionProvider;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -239,6 +241,7 @@ public <T> T sendAndReceive(String destination, Message<?> message, Type type, S
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
MessageExt replyMessage;

if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
Expand Down Expand Up @@ -444,6 +447,7 @@ public void sendAndReceive(String destination, Message<?> message,
}
};
}
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
producer.request(rocketMsg, requestCallback, timeout);
} else {
Expand Down Expand Up @@ -514,7 +518,7 @@ public <T extends Message> SendResult syncSend(String destination, Collection<T>

try {
long now = System.currentTimeMillis();
Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
List<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
for (Message msg : messages) {
if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
log.warn("Found a message empty in the batch, skip it");
Expand All @@ -523,6 +527,10 @@ public <T extends Message> SendResult syncSend(String destination, Collection<T>
rmqMsgs.add(this.createRocketMqMessage(destination, msg));
}

if (!rmqMsgs.isEmpty()) {
MetricExtensionProvider.addProducerMessageCount(rmqMsgs.get(0).getTopic(), rmqMsgs.size());
}

SendResult sendResult = producer.send(rmqMsgs, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -555,6 +563,7 @@ public SendResult syncSend(String destination, Message<?> message, long timeout,
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
SendResult sendResult = producer.send(rocketMsg, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -620,6 +629,7 @@ public SendResult syncSendOrderly(String destination, Message<?> message, String
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -726,6 +736,7 @@ public void asyncSend(String destination, Message<?> message, SendCallback sendC
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
producer.send(rocketMsg, sendCallback, timeout);
} catch (Exception e) {
log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
Expand Down Expand Up @@ -805,6 +816,7 @@ public void asyncSendOrderly(String destination, Message<?> message, String hash
}
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
} catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
Expand Down Expand Up @@ -867,6 +879,7 @@ public void sendOneWay(String destination, Message<?> message) {
}
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
producer.sendOneway(rocketMsg);
} catch (Exception e) {
log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
Expand Down Expand Up @@ -899,6 +912,7 @@ public void sendOneWayOrderly(String destination, Message<?> message, String has
}
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
} catch (Exception e) {
log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
Expand Down Expand Up @@ -973,6 +987,7 @@ public TransactionSendResult sendMessageInTransaction(final String destination,
throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
}
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic());
return producer.sendMessageInTransaction(rocketMsg, arg);
} catch (MQClientException e) {
throw RocketMQUtil.convert(e);
Expand Down Expand Up @@ -1082,6 +1097,8 @@ public <T> List<T> receive(Class<T> clazz, long timeout) {
for (MessageExt messageExt : messageExts) {
list.add(doConvertMessage(messageExt, clazz));
}
MetricExtensionProvider.addConsumerMessageCount(
((DefaultLitePullConsumerWithTopic) consumer).getTopic(), EConsumerMode.Pull, list.size());
return list;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.spring.metric;

public interface MetricExtension {

enum EConsumerMode {
/**
* pull mode
*/
Pull,
/**
* push mode
*/
Push,
}

/**
* Add current count of message from the producer.
*
* @param topic topic name
* @param count count of message
*/
void addProducerMessageCount(String topic, int count);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int or long should be consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter type is always int, and long is not used.


/**
* Add current count of message from the consumer.
*
* @param topic topic name
* @param consumerMode consumer mode
* @param count count of message
*/
void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count);
}
Loading