Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -523,65 +523,72 @@ private void sendMessageAsync(
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) {

try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
}

producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}

if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) {

try {
sendCallback.onSuccess(sendResult);
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
}

producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
return;
}
} else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);

if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}

try {
sendCallback.onSuccess(sendResult);
} catch (Throwable e) {
}

producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
}
}
});
});
} catch (Exception ex) {
long cost = System.currentTimeMillis() - beginStartTime;
producer.updateFaultItem(brokerName, cost, true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}

private void onExceptionImpl(final String brokerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,23 +197,31 @@ public void onException(Throwable e) {
public void testSendMessageAsync_WithException() throws RemotingException, InterruptedException, MQBrokerException {
doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient)
.invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
try {
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
failBecauseExceptionWasNotThrown(RemotingException.class);
} catch (RemotingException e) {
assertThat(e).hasMessage("Remoting Exception in Test");
}
SendMessageContext sendMessageContext = new SendMessageContext();
sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
assertThat(e).hasMessage("Remoting Exception in Test");
}
}, null, null, 0, sendMessageContext, defaultMQProducerImpl);

doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient)
.invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
try {
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
failBecauseExceptionWasNotThrown(InterruptedException.class);
} catch (InterruptedException e) {
assertThat(e).hasMessage("Interrupted Exception in Test");
}
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
assertThat(e).hasMessage("Interrupted Exception in Test");
}
}, null, null, 0, sendMessageContext, defaultMQProducerImpl);
}

@Test
Expand Down