Skip to content

Commit 0430601

Browse files
committed
[ISSUE #583]: 新增 RocketMQMessageHandler 和 RocketMQMessagePostProcessor
- 新增 RocketMQMessageHandler 用户处理消息前预处理,可用于处理链路追踪 - 新增 RocketMQMessagePostProcessor 类,用于处理消息发送前预处理消息,用来传递链路ID
1 parent bb4bc5c commit 0430601

21 files changed

+588
-29
lines changed

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.rocketmq.spring.autoconfigure;
1919

2020
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
21+
import org.apache.rocketmq.spring.support.RocketMQMessageHandler;
2122
import org.apache.rocketmq.spring.support.RocketMQMessageListenerContainerRegistrar;
2223
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2324
import org.springframework.context.annotation.Bean;
@@ -28,7 +29,7 @@
2829
@ConditionalOnMissingBean(RocketMQMessageListenerContainerRegistrar.class)
2930
public class ListenerContainerConfiguration {
3031
@Bean
31-
public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
32-
return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties);
32+
public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties, RocketMQMessageHandler rocketMQMessageHandler) {
33+
return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties, rocketMQMessageHandler);
3334
}
3435
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.spring.autoconfigure;
18+
19+
20+
import org.apache.rocketmq.spring.support.RocketMQMessagePostProcessor;
21+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
22+
import org.springframework.context.annotation.Bean;
23+
import org.springframework.context.annotation.Configuration;
24+
25+
/**
26+
* @see RocketMQMessagePostProcessor
27+
*/
28+
@Configuration
29+
@ConditionalOnMissingBean(RocketMQMessagePostProcessor.class)
30+
class MessagePostProcessorConfiguration {
31+
32+
@Bean
33+
public RocketMQMessagePostProcessor createMessagePostProcessor() {
34+
return new RocketMQMessagePostProcessor();
35+
}
36+
37+
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.rocketmq.spring.annotation.SelectorType;
2626
import org.apache.rocketmq.spring.core.RocketMQTemplate;
2727
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
28+
import org.apache.rocketmq.spring.support.RocketMQMessagePostProcessor;
2829
import org.apache.rocketmq.spring.support.RocketMQUtil;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -51,9 +52,10 @@
5152
@EnableConfigurationProperties(RocketMQProperties.class)
5253
@ConditionalOnClass({MQAdmin.class})
5354
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
54-
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class,
55-
ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
56-
@AutoConfigureAfter({MessageConverterConfiguration.class})
55+
@Import({MessageConverterConfiguration.class, RocketMQMessageHandlerConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class,
56+
ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class,
57+
MessagePostProcessorConfiguration.class})
58+
@AutoConfigureAfter({MessageConverterConfiguration.class, MessagePostProcessorConfiguration.class, RocketMQMessageHandlerConfiguration.class})
5759
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
5860
public class RocketMQAutoConfiguration implements ApplicationContextAware {
5961
private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);
@@ -166,7 +168,8 @@ public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocket
166168
@Bean(destroyMethod = "destroy")
167169
@Conditional(ProducerOrConsumerPropertyCondition.class)
168170
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
169-
public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
171+
public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter,
172+
RocketMQMessagePostProcessor rocketMQMessagePostProcessor) {
170173
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
171174
if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
172175
rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
@@ -175,6 +178,7 @@ public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessag
175178
rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
176179
}
177180
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
181+
rocketMQTemplate.setMessagePostProcessor(rocketMQMessagePostProcessor.getMessagePostProcessor());
178182
return rocketMQTemplate;
179183
}
180184

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.spring.autoconfigure;
19+
20+
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
21+
import org.apache.rocketmq.spring.support.RocketMQMessageHandler;
22+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.context.annotation.Configuration;
25+
26+
/**
27+
* @see RocketMQMessageConverter
28+
*/
29+
@Configuration
30+
@ConditionalOnMissingBean(RocketMQMessageHandler.class)
31+
class RocketMQMessageHandlerConfiguration {
32+
33+
@Bean
34+
public RocketMQMessageHandler createRocketMQMessageHandler() {
35+
return (message, consumer) -> consumer.doHandler(message);
36+
}
37+
38+
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
7272

7373
private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();
7474

75+
private MessagePostProcessor messagePostProcessor;
76+
7577
public DefaultMQProducer getProducer() {
7678
return producer;
7779
}
@@ -104,6 +106,14 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
104106
this.messageQueueSelector = messageQueueSelector;
105107
}
106108

109+
public MessagePostProcessor getMessagePostProcessor() {
110+
return messagePostProcessor;
111+
}
112+
113+
public void setMessagePostProcessor(MessagePostProcessor messagePostProcessor) {
114+
this.messagePostProcessor = messagePostProcessor;
115+
}
116+
107117
public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
108118
this.producer.setAsyncSenderExecutor(asyncSenderExecutor);
109119
}
@@ -1191,7 +1201,7 @@ public TransactionSendResult sendMessageInTransaction(final String destination,
11911201

11921202
private org.apache.rocketmq.common.message.Message createRocketMqMessage(
11931203
String destination, Message<?> message) {
1194-
Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
1204+
Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), messagePostProcessor);
11951205
return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset,
11961206
destination, msg);
11971207
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
140140

141141
private String instanceName;
142142

143+
private RocketMQMessageHandler rocketMQMessageHandler = (message, chain) -> chain.doHandler(message);
144+
143145
public long getSuspendCurrentQueueTimeMillis() {
144146
return suspendCurrentQueueTimeMillis;
145147
}
@@ -318,6 +320,14 @@ public void setInstanceName(String instanceName) {
318320
this.instanceName = instanceName;
319321
}
320322

323+
public RocketMQMessageHandler getRocketMQMessageHandler() {
324+
return rocketMQMessageHandler;
325+
}
326+
327+
public void setRocketMQMessageHandler(RocketMQMessageHandler rocketMQMessageHandler) {
328+
this.rocketMQMessageHandler = rocketMQMessageHandler;
329+
}
330+
321331
public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
322332
this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
323333
return this;
@@ -426,15 +436,21 @@ public class DefaultMessageListenerConcurrently implements MessageListenerConcur
426436
@Override
427437
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
428438
for (MessageExt messageExt : msgs) {
429-
log.debug("received msg: {}", messageExt);
430439
try {
431-
long now = System.currentTimeMillis();
432-
DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class);
433-
container.handleMessage(messageExt);
434-
long costTime = System.currentTimeMillis() - now;
435-
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
440+
rocketMQMessageHandler.doHandler(messageExt, messageExt1 -> {
441+
log.debug("received msg: {}", messageExt1);
442+
try {
443+
long now = System.currentTimeMillis();
444+
DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class);
445+
container.handleMessage(messageExt1);
446+
long costTime = System.currentTimeMillis() - now;
447+
log.debug("consume {} cost: {} ms", messageExt1.getMsgId(), costTime);
448+
} catch (Exception e) {
449+
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt1.getMsgId(), messageExt1.getTopic(), messageExt1.getReconsumeTimes(), e);
450+
throw e;
451+
}
452+
});
436453
} catch (Exception e) {
437-
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
438454
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
439455
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
440456
}
@@ -450,15 +466,21 @@ public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
450466
@Override
451467
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
452468
for (MessageExt messageExt : msgs) {
453-
log.debug("received msg: {}", messageExt);
454469
try {
455-
long now = System.currentTimeMillis();
456-
DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class);
457-
container.handleMessage(messageExt);
458-
long costTime = System.currentTimeMillis() - now;
459-
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
470+
rocketMQMessageHandler.doHandler(messageExt, messageExt1 -> {
471+
log.debug("received msg: {}", messageExt1);
472+
try {
473+
long now = System.currentTimeMillis();
474+
DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class);
475+
container.handleMessage(messageExt1);
476+
long costTime = System.currentTimeMillis() - now;
477+
log.debug("consume {} cost: {} ms", messageExt1.getMsgId(), costTime);
478+
} catch (Exception e) {
479+
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt1.getMsgId(), messageExt1.getTopic(), messageExt1.getReconsumeTimes(), e);
480+
throw e;
481+
}
482+
});
460483
} catch (Exception e) {
461-
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
462484
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
463485
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
464486
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.spring.support;
18+
19+
20+
import org.apache.rocketmq.common.message.MessageExt;
21+
22+
public interface RocketMQMessageHandler {
23+
24+
void doHandler(MessageExt message, RocketMQMessageHandlerChain chain) throws Exception;
25+
26+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.spring.support;
18+
19+
import org.apache.rocketmq.common.message.MessageExt;
20+
21+
public interface RocketMQMessageHandlerChain {
22+
void doHandler(MessageExt message) throws Exception;
23+
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,15 @@ public class RocketMQMessageListenerContainerRegistrar implements ApplicationCon
5656

5757
private final List<DefaultRocketMQListenerContainer> containers = new ArrayList<>();
5858

59+
private final RocketMQMessageHandler rocketMQMessageHandler;
60+
5961
public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter,
60-
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
62+
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties,
63+
RocketMQMessageHandler rocketMQMessageHandler) {
6164
this.rocketMQMessageConverter = rocketMQMessageConverter;
6265
this.environment = environment;
6366
this.rocketMQProperties = rocketMQProperties;
67+
this.rocketMQMessageHandler = rocketMQMessageHandler;
6468
}
6569

6670
@Override
@@ -146,6 +150,7 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
146150
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
147151
}
148152
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
153+
container.setRocketMQMessageHandler(rocketMQMessageHandler);
149154
container.setName(name);
150155

151156
String namespace = environment.resolvePlaceholders(annotation.namespace());
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.spring.support;
18+
19+
import org.springframework.messaging.core.MessagePostProcessor;
20+
21+
/**
22+
* @see MessagePostProcessor
23+
*/
24+
public class RocketMQMessagePostProcessor {
25+
26+
public MessagePostProcessor getMessagePostProcessor() {
27+
return null;
28+
}
29+
30+
}

0 commit comments

Comments
 (0)