From 559d136de355ba93b12dfaabe92ffdf30f154965 Mon Sep 17 00:00:00 2001 From: lwclover Date: Tue, 30 Nov 2021 15:47:12 +0800 Subject: [PATCH 1/8] fix:when broker is down,async send model can not retry --- .../rocketmq/client/impl/producer/DefaultMQProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 2a784b55cbd..ffbc2cd8fb6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -582,7 +582,7 @@ private SendResult sendDefaultImpl( MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; - int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; + int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1 + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(); int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { From 7bb99a76a729d8d5ef8d651cb4a511d229e82284 Mon Sep 17 00:00:00 2001 From: lwclover Date: Tue, 28 Dec 2021 16:39:13 +0800 Subject: [PATCH 2/8] fix Issue #3556 --- .../rocketmq/client/impl/MQClientAPIImpl.java | 104 +++++++++--------- .../impl/producer/DefaultMQProducerImpl.java | 2 +- 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 89c998b7f4e..0a173b729a4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -523,65 +523,71 @@ private void sendMessageAsync( final DefaultMQProducerImpl producer ) throws InterruptedException, RemotingException { final long beginStartTime = System.currentTimeMillis(); - this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { - @Override - public void operationComplete(ResponseFuture responseFuture) { - long cost = System.currentTimeMillis() - beginStartTime; - RemotingCommand response = responseFuture.getResponseCommand(); - if (null == sendCallback && response != null) { - - try { - SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); - if (context != null && sendResult != null) { - context.setSendResult(sendResult); - context.getProducer().executeSendMessageHookAfter(context); - } - } catch (Throwable e) { - } - - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); - return; - } - - if (response != null) { - try { - SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); - assert sendResult != null; - if (context != null) { - context.setSendResult(sendResult); - context.getProducer().executeSendMessageHookAfter(context); - } + try { + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + long cost = System.currentTimeMillis() - beginStartTime; + RemotingCommand response = responseFuture.getResponseCommand(); + if (null == sendCallback && response != null) { try { - sendCallback.onSuccess(sendResult); + SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); + if (context != null && sendResult != null) { + context.setSendResult(sendResult); + context.getProducer().executeSendMessageHookAfter(context); + } } catch (Throwable e) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); - } catch (Exception e) { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, e, context, false, producer); + return; } - } else { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); - if (!responseFuture.isSendRequestOK()) { - MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); - } else if (responseFuture.isTimeout()) { - MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", - responseFuture.getCause()); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); + + if (response != null) { + try { + SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); + assert sendResult != null; + if (context != null) { + context.setSendResult(sendResult); + context.getProducer().executeSendMessageHookAfter(context); + } + + try { + sendCallback.onSuccess(sendResult); + } catch (Throwable e) { + } + + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); + } catch (Exception e) { + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, e, context, false, producer); + } } else { - MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); + if (!responseFuture.isSendRequestOK()) { + MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } else if (responseFuture.isTimeout()) { + MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", + responseFuture.getCause()); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } else { + MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } } } - } - }); + }); + } catch (RemotingConnectException ex) { + long cost = System.currentTimeMillis() - beginStartTime; + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } } private void onExceptionImpl(final String brokerName, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index c882769099d..cedbbdb6ca3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -557,7 +557,7 @@ private SendResult sendDefaultImpl( MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; - int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1 + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(); + int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { From a67469e919e456c8014276fd0b462837d92d1e26 Mon Sep 17 00:00:00 2001 From: lwclover Date: Tue, 28 Dec 2021 17:06:51 +0800 Subject: [PATCH 3/8] fix Issue #3556 --- .../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 0a173b729a4..df699e6257a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -585,6 +585,7 @@ public void operationComplete(ResponseFuture responseFuture) { }); } catch (RemotingConnectException ex) { long cost = System.currentTimeMillis() - beginStartTime; + producer.updateFaultItem(brokerName, cost, true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } From 1e5878d0cbba9d8d06ea4c96630cdfda0c1f0754 Mon Sep 17 00:00:00 2001 From: lwclover Date: Tue, 28 Dec 2021 19:08:10 +0800 Subject: [PATCH 4/8] fix Issue #3556 --- .../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index df699e6257a..b862299b3c9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -583,7 +583,7 @@ public void operationComplete(ResponseFuture responseFuture) { } } }); - } catch (RemotingConnectException ex) { + } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException ex) { long cost = System.currentTimeMillis() - beginStartTime; producer.updateFaultItem(brokerName, cost, true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, From a0b2bd7dcde8680f851c6bdf64035fc675e9ea3f Mon Sep 17 00:00:00 2001 From: lwclover Date: Tue, 28 Dec 2021 19:19:48 +0800 Subject: [PATCH 5/8] fix Issue #3556 --- .../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b862299b3c9..6d83f0033dd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -583,7 +583,7 @@ public void operationComplete(ResponseFuture responseFuture) { } } }); - } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException ex) { + } catch (Exception ex) { long cost = System.currentTimeMillis() - beginStartTime; producer.updateFaultItem(brokerName, cost, true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, From c424e1ad81410ae29f7c35dfd576a7be4c2dd58c Mon Sep 17 00:00:00 2001 From: lwclover Date: Wed, 12 Jan 2022 15:05:30 +0800 Subject: [PATCH 6/8] async send model success retry when occurs Exception --- .../client/impl/MQClientAPIImplTest.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index e1b3bed76fd..2bd2aca7982 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -194,26 +194,18 @@ public void onException(Throwable e) { } @Test - public void testSendMessageAsync_WithException() throws RemotingException, InterruptedException, MQBrokerException { + public void testSendMessageAsync_Success_WithException() throws RemotingException, InterruptedException, MQBrokerException { doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient) .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); - try { - mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), - 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl); - failBecauseExceptionWasNotThrown(RemotingException.class); - } catch (RemotingException e) { - assertThat(e).hasMessage("Remoting Exception in Test"); - } + SendMessageContext sendMessageContext = new SendMessageContext(); + sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer())); + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), + 3 * 1000, CommunicationMode.ASYNC, sendMessageContext, defaultMQProducerImpl); doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient) .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); - try { mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), - 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl); - failBecauseExceptionWasNotThrown(InterruptedException.class); - } catch (InterruptedException e) { - assertThat(e).hasMessage("Interrupted Exception in Test"); - } + 3 * 1000, CommunicationMode.ASYNC, sendMessageContext, defaultMQProducerImpl); } @Test From 4c9d3010e050b5afeaf1169ed795598c0752280d Mon Sep 17 00:00:00 2001 From: lwclover Date: Fri, 14 Jan 2022 09:17:15 +0800 Subject: [PATCH 7/8] test case testSendMessageAsync_WithException --- .../client/impl/MQClientAPIImplTest.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 2bd2aca7982..02dd5316634 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -194,18 +194,36 @@ public void onException(Throwable e) { } @Test - public void testSendMessageAsync_Success_WithException() throws RemotingException, InterruptedException, MQBrokerException { + public void testSendMessageAsync_WithException() throws RemotingException, InterruptedException, MQBrokerException { doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient) .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); SendMessageContext sendMessageContext = new SendMessageContext(); sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer())); - mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), - 3 * 1000, CommunicationMode.ASYNC, sendMessageContext, defaultMQProducerImpl); + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC, + new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + } + @Override + public void onException(Throwable e) { + assertThat(e).hasMessage("Remoting Exception in Test"); + } + }, + null, null, 0, sendMessageContext, defaultMQProducerImpl); doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient) .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); - mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), - 3 * 1000, CommunicationMode.ASYNC, sendMessageContext, defaultMQProducerImpl); + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC, + new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + } + @Override + public void onException(Throwable e) { + assertThat(e).hasMessage("Interrupted Exception in Test"); + } + }, + null, null, 0, sendMessageContext, defaultMQProducerImpl); } @Test From 895decbfc15f4c1b10b649875a96c0fbd1b4f2f3 Mon Sep 17 00:00:00 2001 From: lwclover Date: Fri, 14 Jan 2022 09:54:53 +0800 Subject: [PATCH 8/8] modify test case testSendMessageAsync_WithException --- .../rocketmq/client/impl/MQClientAPIImplTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 02dd5316634..c8446bdf104 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -206,10 +206,9 @@ public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable e) { - assertThat(e).hasMessage("Remoting Exception in Test"); + assertThat(e).hasMessage("Remoting Exception in Test"); } - }, - null, null, 0, sendMessageContext, defaultMQProducerImpl); + }, null, null, 0, sendMessageContext, defaultMQProducerImpl); doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient) .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -220,10 +219,9 @@ public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable e) { - assertThat(e).hasMessage("Interrupted Exception in Test"); + assertThat(e).hasMessage("Interrupted Exception in Test"); } - }, - null, null, 0, sendMessageContext, defaultMQProducerImpl); + }, null, null, 0, sendMessageContext, defaultMQProducerImpl); } @Test