Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;

public interface AckCallback {
void onSuccess(final AckResult ackResult);

void onException(final Throwable e);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;


public class AckResult {
private AckStatus status;
private String extraInfo;
private long popTime;

public void setPopTime(long popTime) {
this.popTime = popTime;
}

public long getPopTime() {
return popTime;
}

public AckStatus getStatus() {
return status;
}

public void setStatus(AckStatus status) {
this.status = status;
}

public void setExtraInfo(String extraInfo) {
this.extraInfo = extraInfo;
}

public String getExtraInfo() {
return extraInfo;
}

@Override
public String toString() {
return "AckResult [AckStatus=" + status + ",extraInfo=" + extraInfo + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;

public enum AckStatus {
/**
* ack success
*/
OK,
/**
* msg not exist
*/
NO_EXIST,
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private int pullThresholdForQueue = 1000;

/**
* Flow control threshold on queue level, means max num of messages waiting to ack.
* in contrast with pull threshold, once a message is popped, it's considered the beginning of consumption.
*/
private int popThresholdForQueue = 96;

/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
Expand Down Expand Up @@ -254,6 +260,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long consumeTimeout = 15;

/**
* Maximum amount of invisible time in millisecond of a message, rang is [5000, 300000]
*/
private long popInvisibleTime = 60000;

/**
* Batch pop size. range is [1, 32]
*/
private int popBatchNums = 32;

/**
* Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
*/
Expand All @@ -264,6 +280,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private TraceDispatcher traceDispatcher = null;

// force to use client rebalance
private boolean clientRebalance = false;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -598,6 +617,14 @@ public void setPullThresholdForQueue(int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
}

public int getPopThresholdForQueue() {
return popThresholdForQueue;
}

public void setPopThresholdForQueue(int popThresholdForQueue) {
this.popThresholdForQueue = popThresholdForQueue;
}

public int getPullThresholdForTopic() {
return pullThresholdForTopic;
}
Expand Down Expand Up @@ -891,6 +918,14 @@ public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}

public long getPopInvisibleTime() {
return popInvisibleTime;
}

public void setPopInvisibleTime(long popInvisibleTime) {
this.popInvisibleTime = popInvisibleTime;
}

public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}
Expand All @@ -902,4 +937,20 @@ public void setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhe
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}

public int getPopBatchNums() {
return popBatchNums;
}

public void setPopBatchNums(int popBatchNums) {
this.popBatchNums = popBatchNums;
}

public boolean isClientRebalance() {
return clientRebalance;
}

public void setClientRebalance(boolean clientRebalance) {
this.clientRebalance = clientRebalance;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;

/**
* Async message pop interface
*/
public interface PopCallback {
void onSuccess(final PopResult popResult);

void onException(final Throwable e);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;

import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;

public class PopResult {
private List<MessageExt> msgFoundList;
private PopStatus popStatus;
private long popTime;
private long invisibleTime;
private long restNum;

public PopResult(PopStatus popStatus, List<MessageExt> msgFoundList) {
this.popStatus = popStatus;
this.msgFoundList = msgFoundList;
}

public long getPopTime() {
return popTime;
}


public void setPopTime(long popTime) {
this.popTime = popTime;
}

public long getRestNum() {
return restNum;
}

public void setRestNum(long restNum) {
this.restNum = restNum;
}

public long getInvisibleTime() {
return invisibleTime;
}


public void setInvisibleTime(long invisibleTime) {
this.invisibleTime = invisibleTime;
}


public void setPopStatus(PopStatus popStatus) {
this.popStatus = popStatus;
}

public PopStatus getPopStatus() {
return popStatus;
}

public List<MessageExt> getMsgFoundList() {
return msgFoundList;
}

public void setMsgFoundList(List<MessageExt> msgFoundList) {
this.msgFoundList = msgFoundList;
}

@Override
public String toString() {
return "PopResult [popStatus=" + popStatus + ",msgFoundList="
+ (msgFoundList == null ? 0 : msgFoundList.size()) + ",restNum=" + restNum + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;

public enum PopStatus {
/**
* Founded
*/
FOUND,
/**
* No new message can be pull after polling time out
* delete after next realease
*/
NO_NEW_MSG,
/**
* polling pool is full, do not try again immediately.
*/
POLLING_FULL,
/**
* polling time out but no message find
*/
POLLING_NOT_FOUND
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.impl;

import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.netty.ResponseFuture;

public abstract class BaseInvokeCallback implements InvokeCallback {
private final MQClientAPIImpl mqClientAPI;

public BaseInvokeCallback(MQClientAPIImpl mqClientAPI) {
this.mqClientAPI = mqClientAPI;
}

@Override
public void operationComplete(final ResponseFuture responseFuture) {
onComplete(responseFuture);
}

public abstract void onComplete(final ResponseFuture responseFuture);
}
Loading