
Need a custom global interceptor to process MessageExt before the listener's onMessage when using the starter #583

@texousliu

Description


FEATURE REQUEST

  1. When using the starter, we need a custom global interceptor that can process the MessageExt before the listener's onMessage is invoked.
  2. Provide any additional detail on your proposed use case for this feature.
  3. Add a message customizer interface
package org.apache.rocketmq.spring.support;

import org.apache.rocketmq.common.message.MessageExt;

/**
 * listener message customizer
 *
 * @author texousliu
 * @since 2023-08-18
 */
public interface RocketMQListenerMessageCustomizer {

    void customize(final MessageExt messageExt);

}
  1. Inject the custom customizers into ListenerContainerConfiguration
    private List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers;

    public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
                                          ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties,
                                          @Autowired(required = false) List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers) {
        this.rocketMQMessageConverter = rocketMQMessageConverter;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
        this.rocketMQListenerMessageCustomizers = rocketMQListenerMessageCustomizers;
    }
  1. Add the field and its accessors to DefaultRocketMQListenerContainer
    private List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers;

    public List<RocketMQListenerMessageCustomizer> getRocketMQListenerMessageCustomizers() {
        return rocketMQListenerMessageCustomizers;
    }

    public void setRocketMQListenerMessageCustomizers(List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers) {
        this.rocketMQListenerMessageCustomizers = rocketMQListenerMessageCustomizers;
    }
  1. In ListenerContainerConfiguration, pass the customizers to the DefaultRocketMQListenerContainer
container.setRocketMQListenerMessageCustomizers(rocketMQListenerMessageCustomizers);
  1. Invoke the customizers where the message is handled
    private void handleMessage(
            MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
        // Run every registered customizer before the listener sees the message
        if (rocketMQListenerMessageCustomizers != null) {
            for (RocketMQListenerMessageCustomizer customizer : rocketMQListenerMessageCustomizers) {
                customizer.customize(messageExt);
            }
        }
        // ...... other code
    }
  1. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
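
To make the intended behavior of the steps above concrete, here is a minimal, self-contained sketch of the customizer chain. It is only an illustration under stated assumptions: `StubMessage` and `MessageCustomizer` are hypothetical stand-ins for `MessageExt` and the proposed `RocketMQListenerMessageCustomizer`, so the example compiles without RocketMQ on the classpath, and the `traceId` property is an invented use case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomizerChainSketch {

    /** Hypothetical stand-in for MessageExt; only carries user properties. */
    static class StubMessage {
        private final Map<String, String> properties = new HashMap<>();

        void putUserProperty(String key, String value) {
            properties.put(key, value);
        }

        String getUserProperty(String key) {
            return properties.get(key);
        }
    }

    /** Mirrors the proposed customizer interface, typed on the stand-in message. */
    interface MessageCustomizer {
        void customize(StubMessage message);
    }

    /**
     * Mirrors the proposed hook at the top of handleMessage:
     * run every customizer (if any were injected), then hand off to the listener.
     */
    static void handleMessage(StubMessage message,
                              List<MessageCustomizer> customizers,
                              List<String> log) {
        if (customizers != null) {
            for (MessageCustomizer customizer : customizers) {
                customizer.customize(message);
            }
        }
        // Stand-in for the listener's onMessage: it sees the customized message.
        log.add("onMessage traceId=" + message.getUserProperty("traceId"));
    }

    public static void main(String[] args) {
        List<MessageCustomizer> customizers = new ArrayList<>();
        // Example customizer (assumption): stamp a trace id onto every message.
        customizers.add(m -> m.putUserProperty("traceId", "demo-trace"));

        List<String> log = new ArrayList<>();
        handleMessage(new StubMessage(), customizers, log);
        // With no customizers injected, the null check leaves the message untouched.
        handleMessage(new StubMessage(), null, log);
        System.out.println(log);
    }
}
```

In a real application, each customizer would presumably be registered as a Spring bean, so that the optional `List<RocketMQListenerMessageCustomizer>` constructor argument shown earlier is populated; when no such bean exists the list is `null`, which is why the hook guards against it.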
