Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions docs/cn/client/java/API_Reference_ DefaultPullConsumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
## DefaultPullConsumer
---
### 类简介

1. `DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer`

2. `DefaultMQPullConsumer`主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。

3. 优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。

4. 缺点:由于主动权在消费方,消费方无法及时获取最新的消息。比较适合不及时批处理场景。

``` java


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class MQPullConsumer {

private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();

public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
// 从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
for(MessageQueue mq:mqs){
try {
// 获取消息的offset,指定从store中获取
long offset = consumer.fetchConsumeOffset(mq,true);
System.out.println("consumer from the queue:"+mq+":"+offset);
while(true){
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}

// 保存上次消费的消息下标
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}

// 获取上次消费的消息的下标
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if(offset != null){
return offset;
}
return 0l;
}


}
```



### 字段摘要
|类型|字段名称|描述|
|------|-------|-------|
|DefaultMQPullConsumerImpl|defaultMQPullConsumerImpl|DefaultMQPullConsumer的内部核心处理默认实现|
|String|consumerGroup|消费的唯一分组|
|long|brokerSuspendMaxTimeMillis|consumer取连接broker的最大延迟时间,不建议修改|
|long|consumerTimeoutMillisWhenSuspend|pull取连接的最大超时时间,必须大于brokerSuspendMaxTimeMillis,不建议修改|
|long|consumerPullTimeoutMillis|socket连接的最大超时时间,不建议修改|
|String|messageModel|默认cluster模式|
|int|messageQueueListener|消息queue监听器,用来获取topic的queue变化|
|int|offsetStore|RemoteBrokerOffsetStore 远程与本地offset存储器|
|int|registerTopics|注册到该consumer的topic集合|
|int|allocateMessageQueueStrategy|consumer的默认获取queue的负载分配策略算法|

### 构造方法摘要

|方法名称|方法描述|
|-------|------------|
|DefaultMQPullConsumer()|由默认参数值创建一个Pull消费者 |
|DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook)|使用指定的分组名,hook创建一个消费者|
|DefaultMQPullConsumer(final String consumerGroup)|使用指定的分组名消费者|
|DefaultMQPullConsumer(RPCHook rpcHook)|使用指定的hook创建一个生产者|


### 使用方法摘要

|返回值|方法名称|方法描述|
|-------|-------|------------|
|MQAdmin接口method|-------|------------|
|void|createTopic(String key, String newTopic, int queueNum)|在broker上创建指定的topic|
|void|createTopic(String key, String newTopic, int queueNum, int topicSysFlag)|在broker上创建指定的topic|
|long|earliestMsgStoreTime(MessageQueue mq)|查询最早的消息存储时间|
|long|maxOffset(MessageQueue mq)|查询给定消息队列的最大offset|
|long|minOffset(MessageQueue mq)|查询给定消息队列的最小offset|
|QueryResult|queryMessage(String topic, String key, int maxNum, long begin, long end)|按关键字查询消息|
|long|searchOffset(MessageQueue mq, long timestamp)|查找指定时间的消息队列的物理offset|
|MessageExt|viewMessage(String offsetMsgId)|根据给定的msgId查询消息|
|MessageExt|public MessageExt viewMessage(String topic, String msgId)|根据给定的msgId查询消息,并指定topic|
|MQConsumer接口method|-------|------------|
|Set<MessageQueue>|fetchSubscribeMessageQueues(String topic)|根据topic获取订阅的Queue|
|void|sendMessageBack(final MessageExt msg, final int delayLevel)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL|
|void|sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL|
|MQPullConsumer接口method|-------|------------|
|long|fetchConsumeOffset(MessageQueue mq, boolean fromStore)|查询给定消息队列的最大offset|
|PullResult |pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums)|异步拉取制定匹配的消息|
|PullResult| pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final long timeout)|异步拉取制定匹配的消息|
|PullResult|pull(final MessageQueue mq, final MessageSelector selector, final long offset,final int maxNums)|异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType|
|PullResult|pullBlockIfNotFound(final MessageQueue mq, final String subExpression,final long offset, final int maxNums)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis|
|void|pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final PullCallback pullCallback)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费|
|void|updateConsumeOffset(final MessageQueue mq, final long offset)|更新指定mq的offset|
|long|fetchMessageQueuesInBalance(String topic)|根据topic获取订阅的Queue(是balance分配后的)|
|void|void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费|
|void|shutdown()|关闭当前消费者实例并释放相关资源|
|void|start()|启动消费者|