Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<artifactId>rocketmq-client</artifactId>
<name>rocketmq-client ${project.version}</name>

<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,

}

public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
public AclConfig getBrokerClusterConfig(final String addr,
final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);

Expand All @@ -407,7 +408,7 @@ public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMil
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
Expand Down Expand Up @@ -505,7 +506,7 @@ private SendResult sendMessageSync(
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response,addr);
return this.processSendResponse(brokerName, msg, response, addr);
}

private void sendMessageAsync(
Expand Down Expand Up @@ -612,7 +613,10 @@ private void onExceptionImpl(final String brokerName,
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
} catch (InterruptedException | RemotingTooMuchRequestException e1) {
} catch (InterruptedException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingTooMuchRequestException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingException e1) {
Expand Down Expand Up @@ -664,7 +668,7 @@ private SendResult processSendResponse(
}

SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
Expand All @@ -683,8 +687,8 @@ private SendResult processSendResponse(
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
Expand Down Expand Up @@ -1425,8 +1429,8 @@ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
}

public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, final long timeoutMillis)
throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);

Expand All @@ -1437,7 +1441,7 @@ public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, fin
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
AddWritePermOfBrokerResponseHeader responseHeader =
(AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
(AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
return responseHeader.getAddTopicCount();
}
default:
Expand Down Expand Up @@ -1485,7 +1489,8 @@ public void deleteTopicInNameServer(final String addr, final String topic, final
throw new MQClientException(response.getCode(), response.getRemark());
}

public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis)
public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset,
final long timeoutMillis)
throws RemotingException, InterruptedException, MQClientException {
DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
requestHeader.setGroupName(groupName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private enum SubscriptionType {

private final MessageQueueLock messageQueueLock = new MessageQueueLock();

private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();

public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,8 +897,12 @@ private void unregisterClient(final String producerGroup, final String consumerG
try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException | InterruptedException | MQBrokerException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (RemotingException e) {
log.warn("unregister client RemotingException from broker: {}, {}", addr, e.getMessage());
} catch (InterruptedException e) {
log.warn("unregister client InterruptedException from broker: {}, {}", addr, e.getMessage());
} catch (MQBrokerException e) {
log.warn("unregister client MQBrokerException from broker: {}, {}", addr, e.getMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public void send(Message msg,
*
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws RejectedExecutionException
*/
@Deprecated
Expand Down Expand Up @@ -597,7 +597,14 @@ private SendResult sendDefaultImpl(
default:
break;
}
} catch (RemotingException | MQClientException e) {
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
Expand Down Expand Up @@ -856,7 +863,19 @@ private SendResult sendKernelImpl(final Message msg,
}

return sendResult;
} catch (RemotingException | MQBrokerException | InterruptedException e) {
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
Expand Down Expand Up @@ -969,6 +988,7 @@ public void doExecuteEndTransactionHook(Message msg, String msgId, String broker
executeEndTransactionHook(context);
}
}

/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
Expand Down Expand Up @@ -1021,7 +1041,7 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
* @param msg
* @param mq
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
Expand Down Expand Up @@ -1151,7 +1171,7 @@ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCal
* @param selector
* @param arg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
Expand Down Expand Up @@ -1491,7 +1511,8 @@ public void onException(Throwable e) {
}
}

private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture,
long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class RequestFutureHolder {
private static InternalLogger log = ClientLogger.getLog();
private static final RequestFutureHolder INSTANCE = new RequestFutureHolder();
private ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
private final Set<DefaultMQProducerImpl> producerSet = new HashSet<>();
private final Set<DefaultMQProducerImpl> producerSet = new HashSet<DefaultMQProducerImpl>();
private ScheduledExecutorService scheduledExecutorService = null;

public ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
List<Span> spanList = new ArrayList<>();
List<Span> spanList = new ArrayList<Span>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand Down Expand Up @@ -95,7 +96,9 @@ public class DefaultLitePullConsumerTest {
@Before
public void init() throws Exception {
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.forEach((s, instance) -> instance.shutdown());
for (Map.Entry<String, MQClientInstance> entry : factoryTable.entrySet()) {
entry.getValue().shutdown();
}
factoryTable.clear();

Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
Expand Down Expand Up @@ -481,7 +484,7 @@ public void testCheckConfig_Exception() {
}

@Test
public void testComputePullFromWhereReturnedNotFound() throws Exception{
public void testComputePullFromWhereReturnedNotFound() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
Expand All @@ -491,7 +494,7 @@ public void testComputePullFromWhereReturnedNotFound() throws Exception{
}

@Test
public void testComputePullFromWhereReturned() throws Exception{
public void testComputePullFromWhereReturned() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
Expand All @@ -500,9 +503,8 @@ public void testComputePullFromWhereReturned() throws Exception{
assertThat(offset).isEqualTo(100);
}


@Test
public void testComputePullFromLast() throws Exception{
public void testComputePullFromLast() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
Expand All @@ -513,13 +515,13 @@ public void testComputePullFromLast() throws Exception{
}

@Test
public void testComputePullByTimeStamp() throws Exception{
public void testComputePullByTimeStamp() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
MessageQueue messageQueue = createMessageQueue();
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L);
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class), anyLong())).thenReturn(100L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(100);
}
Expand Down Expand Up @@ -660,7 +662,8 @@ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, P
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}

private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
private static void suppressUpdateTopicRouteInfoFromNameServer(
DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
Expand Down
Loading