From 9be760a62c9ce532dcf99fda5d269dd6b8b61480 Mon Sep 17 00:00:00 2001 From: xuhongcao Date: Thu, 14 Mar 2019 11:01:31 +0800 Subject: [PATCH] add pull consumer --- .../API_Reference_ DefaultPullConsumer.md | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 docs/cn/client/java/API_Reference_ DefaultPullConsumer.md diff --git a/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md b/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md new file mode 100644 index 00000000000..64a2b7bd80f --- /dev/null +++ b/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md @@ -0,0 +1,143 @@ +## DefaultPullConsumer +--- +### 类简介 + +1. `DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer` + +2. `DefaultMQPullConsumer`主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。 + +3. 优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。 + +4. 缺点:由于主动权在消费方,消费方无法及时获取最新的消息。比较适合不及时批处理场景。 + +``` java + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +public class MQPullConsumer { + + private static final Map OFFSE_TABLE = new HashMap(); + + public static void main(String[] args) throws MQClientException { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName"); + consumer.setNamesrvAddr("127.0.0.1:9876"); + consumer.start(); + // 从指定topic中拉取所有消息队列 + Set mqs = consumer.fetchSubscribeMessageQueues("order-topic"); + for(MessageQueue mq:mqs){ + try { + // 获取消息的offset,指定从store中获取 + long offset = consumer.fetchConsumeOffset(mq,true); + System.out.println("consumer from the queue:"+mq+":"+offset); + while(true){ + PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, + getMessageQueueOffset(mq), 32); + putMessageQueueOffset(mq,pullResult.getNextBeginOffset()); + switch(pullResult.getPullStatus()){ + case FOUND: + List messageExtList = pullResult.getMsgFoundList(); + for (MessageExt m : messageExtList) { + System.out.println(new String(m.getBody())); + } + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break; + case OFFSET_ILLEGAL: + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + consumer.shutdown(); + } + + // 保存上次消费的消息下标 + private static void putMessageQueueOffset(MessageQueue mq, + long nextBeginOffset) { + OFFSE_TABLE.put(mq, nextBeginOffset); + } + + // 获取上次消费的消息的下标 + private static Long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if(offset != null){ + return offset; + } + return 0l; + } + + +} +``` + + + +### 字段摘要 +|类型|字段名称|描述| +|------|-------|-------| +|DefaultMQPullConsumerImpl|defaultMQPullConsumerImpl|DefaultMQPullConsumer的内部核心处理默认实现| +|String|consumerGroup|消费的唯一分组| +|long|brokerSuspendMaxTimeMillis|consumer取连接broker的最大延迟时间,不建议修改| +|long|consumerTimeoutMillisWhenSuspend|pull取连接的最大超时时间,必须大于brokerSuspendMaxTimeMillis,不建议修改| +|long|consumerPullTimeoutMillis|socket连接的最大超时时间,不建议修改| +|String|messageModel|默认cluster模式| +|int|messageQueueListener|消息queue监听器,用来获取topic的queue变化| +|int|offsetStore|RemoteBrokerOffsetStore 远程与本地offset存储器| +|int|registerTopics|注册到该consumer的topic集合| +|int|allocateMessageQueueStrategy|consumer的默认获取queue的负载分配策略算法| + +### 构造方法摘要 + +|方法名称|方法描述| +|-------|------------| +|DefaultMQPullConsumer()|由默认参数值创建一个Pull消费者 | +|DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook)|使用指定的分组名,hook创建一个消费者| +|DefaultMQPullConsumer(final String consumerGroup)|使用指定的分组名消费者| +|DefaultMQPullConsumer(RPCHook rpcHook)|使用指定的hook创建一个生产者| + + +### 使用方法摘要 + +|返回值|方法名称|方法描述| +|-------|-------|------------| +|MQAdmin接口method|-------|------------| +|void|createTopic(String key, String newTopic, int queueNum)|在broker上创建指定的topic| +|void|createTopic(String key, String newTopic, int queueNum, int topicSysFlag)|在broker上创建指定的topic| +|long|earliestMsgStoreTime(MessageQueue mq)|查询最早的消息存储时间| +|long|maxOffset(MessageQueue mq)|查询给定消息队列的最大offset| +|long|minOffset(MessageQueue mq)|查询给定消息队列的最小offset| +|QueryResult|queryMessage(String topic, String key, int maxNum, long begin, long end)|按关键字查询消息| +|long|searchOffset(MessageQueue mq, long timestamp)|查找指定时间的消息队列的物理offset| +|MessageExt|viewMessage(String offsetMsgId)|根据给定的msgId查询消息| +|MessageExt|public MessageExt viewMessage(String topic, String msgId)|根据给定的msgId查询消息,并指定topic| +|MQConsumer接口method|-------|------------| +|Set|fetchSubscribeMessageQueues(String topic)|根据topic获取订阅的Queue| +|void|sendMessageBack(final MessageExt msg, final int delayLevel)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL| +|void|sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL| +|MQPullConsumer接口method|-------|------------| +|long|fetchConsumeOffset(MessageQueue mq, boolean fromStore)|查询给定消息队列的最大offset| +|PullResult |pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums)|异步拉取制定匹配的消息| +|PullResult| pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final long timeout)|异步拉取制定匹配的消息| +|PullResult|pull(final MessageQueue mq, final MessageSelector selector, final long offset,final int maxNums)|异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType| +|PullResult|pullBlockIfNotFound(final MessageQueue mq, final String subExpression,final long offset, final int maxNums)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis| +|void|pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final PullCallback pullCallback)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费| +|void|updateConsumeOffset(final MessageQueue mq, final long offset)|更新指定mq的offset| +|long|fetchMessageQueuesInBalance(String topic)|根据topic获取订阅的Queue(是balance分配后的)| +|void|void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费| +|void|shutdown()|关闭当前消费者实例并释放相关资源| +|void|start()|启动消费者| +