Skip to content

Commit 80bcfbc

Browse files
authored
[Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model (#3555)
1 parent bfaef39 commit 80bcfbc

File tree

2 files changed

+78
-63
lines changed

2 files changed

+78
-63
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -523,65 +523,72 @@ private void sendMessageAsync(
523523
final DefaultMQProducerImpl producer
524524
) throws InterruptedException, RemotingException {
525525
final long beginStartTime = System.currentTimeMillis();
526-
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
527-
@Override
528-
public void operationComplete(ResponseFuture responseFuture) {
529-
long cost = System.currentTimeMillis() - beginStartTime;
530-
RemotingCommand response = responseFuture.getResponseCommand();
531-
if (null == sendCallback && response != null) {
532-
533-
try {
534-
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
535-
if (context != null && sendResult != null) {
536-
context.setSendResult(sendResult);
537-
context.getProducer().executeSendMessageHookAfter(context);
538-
}
539-
} catch (Throwable e) {
540-
}
541-
542-
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
543-
return;
544-
}
545-
546-
if (response != null) {
547-
try {
548-
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
549-
assert sendResult != null;
550-
if (context != null) {
551-
context.setSendResult(sendResult);
552-
context.getProducer().executeSendMessageHookAfter(context);
553-
}
526+
try {
527+
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
528+
@Override
529+
public void operationComplete(ResponseFuture responseFuture) {
530+
long cost = System.currentTimeMillis() - beginStartTime;
531+
RemotingCommand response = responseFuture.getResponseCommand();
532+
if (null == sendCallback && response != null) {
554533

555534
try {
556-
sendCallback.onSuccess(sendResult);
535+
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
536+
if (context != null && sendResult != null) {
537+
context.setSendResult(sendResult);
538+
context.getProducer().executeSendMessageHookAfter(context);
539+
}
557540
} catch (Throwable e) {
558541
}
559542

560543
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
561-
} catch (Exception e) {
562-
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
563-
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
564-
retryTimesWhenSendFailed, times, e, context, false, producer);
544+
return;
565545
}
566-
} else {
567-
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
568-
if (!responseFuture.isSendRequestOK()) {
569-
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
570-
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
571-
retryTimesWhenSendFailed, times, ex, context, true, producer);
572-
} else if (responseFuture.isTimeout()) {
573-
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
574-
responseFuture.getCause());
575-
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
576-
retryTimesWhenSendFailed, times, ex, context, true, producer);
546+
547+
if (response != null) {
548+
try {
549+
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
550+
assert sendResult != null;
551+
if (context != null) {
552+
context.setSendResult(sendResult);
553+
context.getProducer().executeSendMessageHookAfter(context);
554+
}
555+
556+
try {
557+
sendCallback.onSuccess(sendResult);
558+
} catch (Throwable e) {
559+
}
560+
561+
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
562+
} catch (Exception e) {
563+
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
564+
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
565+
retryTimesWhenSendFailed, times, e, context, false, producer);
566+
}
577567
} else {
578-
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
579-
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
580-
retryTimesWhenSendFailed, times, ex, context, true, producer);
568+
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
569+
if (!responseFuture.isSendRequestOK()) {
570+
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
571+
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
572+
retryTimesWhenSendFailed, times, ex, context, true, producer);
573+
} else if (responseFuture.isTimeout()) {
574+
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
575+
responseFuture.getCause());
576+
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
577+
retryTimesWhenSendFailed, times, ex, context, true, producer);
578+
} else {
579+
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
580+
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
581+
retryTimesWhenSendFailed, times, ex, context, true, producer);
582+
}
581583
}
582584
}
583-
}
584-
});
585+
});
586+
} catch (Exception ex) {
587+
long cost = System.currentTimeMillis() - beginStartTime;
588+
producer.updateFaultItem(brokerName, cost, true);
589+
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
590+
retryTimesWhenSendFailed, times, ex, context, true, producer);
591+
}
585592
}
586593

587594
private void onExceptionImpl(final String brokerName,

client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -197,23 +197,31 @@ public void onException(Throwable e) {
197197
public void testSendMessageAsync_WithException() throws RemotingException, InterruptedException, MQBrokerException {
198198
doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient)
199199
.invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
200-
try {
201-
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
202-
3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
203-
failBecauseExceptionWasNotThrown(RemotingException.class);
204-
} catch (RemotingException e) {
205-
assertThat(e).hasMessage("Remoting Exception in Test");
206-
}
200+
SendMessageContext sendMessageContext = new SendMessageContext();
201+
sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
202+
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
203+
new SendCallback() {
204+
@Override
205+
public void onSuccess(SendResult sendResult) {
206+
}
207+
@Override
208+
public void onException(Throwable e) {
209+
assertThat(e).hasMessage("Remoting Exception in Test");
210+
}
211+
}, null, null, 0, sendMessageContext, defaultMQProducerImpl);
207212

208213
doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient)
209214
.invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
210-
try {
211-
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
212-
3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
213-
failBecauseExceptionWasNotThrown(InterruptedException.class);
214-
} catch (InterruptedException e) {
215-
assertThat(e).hasMessage("Interrupted Exception in Test");
216-
}
215+
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
216+
new SendCallback() {
217+
@Override
218+
public void onSuccess(SendResult sendResult) {
219+
}
220+
@Override
221+
public void onException(Throwable e) {
222+
assertThat(e).hasMessage("Interrupted Exception in Test");
223+
}
224+
}, null, null, 0, sendMessageContext, defaultMQProducerImpl);
217225
}
218226

219227
@Test

0 commit comments

Comments
 (0)