Skip to content

Conversation

lwclover
Copy link
Contributor

@lwclover lwclover commented Nov 30, 2021

Fix Issue[3556]
RocketMQ client updates topic route infomations every 30 seconds,
When broker is down,Rocketmq client can not connect to the broker. Sending async message can not retry,Throwing a RemotingConnectException.

The RemotingConnectException below:

org.apache.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [5]ms, Topic: SSP_VC_COMMAND_CFG_PULL_PUSH_RESPOND_RECORD, BrokersSent: [broker-ssp-11]
See http://rocketmq.apache.org/docs/faq/ for further details.
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:610)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:87)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$2.run(DefaultMQProducerImpl.java:467)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <x.x.x.x10911> failed
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:540)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:375)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf(MQClientAPIImpl.java:332)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf$accessor$wRKVBfSV(MQClientAPIImpl.java)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl$auxiliary$5onkBFUr.call(Unknown Source)
	at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:765)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:529)
	... 7 common frames omitted

The RemotingConnectException caused by: throw new RemotingConnectException(addr);

    @Override
    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

CommunicationMode.Async retry times is 1, when throws RemotingConnectException can not retry.

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException | MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                            continue;
                        } else {
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

@lwclover lwclover changed the title Under async send modelwhen broker is down, can not retry When broker is down, rocketmq client can not retry under Async send model Nov 30, 2021
@lwclover lwclover changed the title When broker is down, rocketmq client can not retry under Async send model Issue[#3556]When broker is down, rocketmq client can not retry under Async send model Nov 30, 2021
@lwclover lwclover changed the title Issue[#3556]When broker is down, rocketmq client can not retry under Async send model [Issue #3556]When broker is down, rocketmq client can not retry under Async send model Nov 30, 2021
@lwclover lwclover changed the title [Issue #3556]When broker is down, rocketmq client can not retry under Async send model [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model Nov 30, 2021
@coveralls
Copy link

coveralls commented Nov 30, 2021

Coverage Status

Coverage decreased (-2.2%) to 51.08% when pulling 895decb on lwclover:develop into 28d7849 on apache:develop.

@codecov-commenter
Copy link

codecov-commenter commented Nov 30, 2021

Codecov Report

Merging #3555 (895decb) into develop (28d7849) will decrease coverage by 0.30%.
The diff coverage is 35.89%.

Impacted file tree graph

@@              Coverage Diff              @@
##             develop    #3555      +/-   ##
=============================================
- Coverage      47.32%   47.02%   -0.31%     
+ Complexity      5038     4858     -180     
=============================================
  Files            627      636       +9     
  Lines          41348    42249     +901     
  Branches        5372     5521     +149     
=============================================
+ Hits           19568    19867     +299     
- Misses         19356    19891     +535     
- Partials        2424     2491      +67     
Impacted Files Coverage Δ
...g/apache/rocketmq/client/impl/MQClientAPIImpl.java 13.98% <35.89%> (+1.10%) ⬆️
...ketmq/common/protocol/body/ConsumerConnection.java 95.83% <0.00%> (-4.17%) ⬇️
...a/org/apache/rocketmq/filter/util/BloomFilter.java 59.13% <0.00%> (-2.16%) ⬇️
...ava/org/apache/rocketmq/filter/util/BitsArray.java 58.11% <0.00%> (-1.71%) ⬇️
...a/org/apache/rocketmq/store/StoreStatsService.java 28.66% <0.00%> (-1.31%) ⬇️
...a/org/apache/rocketmq/broker/BrokerController.java 46.54% <0.00%> (-0.89%) ⬇️
...org/apache/rocketmq/store/DefaultMessageStore.java 55.48% <0.00%> (-0.76%) ⬇️
...e/rocketmq/client/impl/consumer/RebalanceImpl.java 42.18% <0.00%> (-0.40%) ⬇️
...ocketmq/broker/processor/SendMessageProcessor.java 39.59% <0.00%> (-0.30%) ⬇️
...java/org/apache/rocketmq/broker/BrokerStartup.java 5.47% <0.00%> (-0.20%) ⬇️
... and 28 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 28d7849...895decb. Read the comment docs.

@XiaoyiPeng
Copy link
Contributor

Asynchronous retry is here, in class DefaultMQProducerImpl, line: 843

private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// ...
// line#843:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);

}

@lwclover
Copy link
Contributor Author

lwclover commented Dec 1, 2021

sendKernelImpl

This is when the broker can connect to, But when can not connect to the broker, can not retry

@guyinyou
Copy link
Contributor

guyinyou commented Dec 3, 2021

In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.

@lwclover
Copy link
Contributor Author

lwclover commented Dec 6, 2021

In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.

the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException

@guyinyou
Copy link
Contributor

guyinyou commented Dec 6, 2021

In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.

the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException

I know what you mean, but if you increase the timesTotal in "for (; times < timesTotal; times++) , the number of retries will become retryTimesWhenSendAsyncFailed² times. Because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at "this.mQClientFactory.getMQClientAPIImpl().sendMessage"

@Git-Yang
Copy link
Member

Git-Yang commented Dec 6, 2021

Agree, in the asynchronous sending mode, there is no retry when the connection establishment fails, and only when the request and response fail.

@lwclover
Copy link
Contributor Author

lwclover commented Dec 6, 2021

In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.

the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException

I know what you mean, but if you increase the timesTotal in "for (; times < timesTotal; times++) , the number of retries will become retryTimesWhenSendAsyncFailed² times. Because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at "this.mQClientFactory.getMQClientAPIImpl().sendMessage"

In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.

the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException

I know what you mean, but if you increase the timesTotal in "for (; times < timesTotal; times++) , the number of retries will become retryTimesWhenSendAsyncFailed² times. Because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at "this.mQClientFactory.getMQClientAPIImpl().sendMessage"

No, after trying retryTimesWhenSendAsyncFailed times, it calls callback.onException method.

@lwclover
Copy link
Contributor Author

lwclover commented Dec 6, 2021

Agree, in the asynchronous sending mode, there is no retry when the connection establishment fails, and only when the request and response fail.

Thank you for labeling the bug

@lwclover
Copy link
Contributor Author

@duhenglucky

@MatrixHB
Copy link
Contributor

@lwclover you are right.
When the channel to the broker cannot be established, it will directly enter the following code block.
image

Retry for send async is made for the execution of "org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeAsyncImpl", but in the given case, "invokeAsyncImpl" has not been executed yet.

@duhenglucky duhenglucky added this to the 4.9.3 milestone Dec 27, 2021
@guyinyou
Copy link
Contributor

In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.

the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException

I know what you mean, but if you increase the timesTotal in "for (; times < timesTotal; times++) , the number of retries will become retryTimesWhenSendAsyncFailed² times. Because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at "this.mQClientFactory.getMQClientAPIImpl().sendMessage"

In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.

the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException

I know what you mean, but if you increase the timesTotal in "for (; times < timesTotal; times++) , the number of retries will become retryTimesWhenSendAsyncFailed² times. Because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at "this.mQClientFactory.getMQClientAPIImpl().sendMessage"

No, after trying retryTimesWhenSendAsyncFailed times, it calls callback.onException method.

image
You are right. I looked at the code. Asynchronous sending will only go through the for loop when an exception is thrown. In other cases, it will only be executed once and return.

@vongosling
Copy link
Member

Backoff strategy optimization is one of the spots that I would most like to see in RocketMQ. You should consider the retry tuple in here, when you get callback invoked. you have finished the retry in async mode.

@guyinyou
Copy link
Contributor

image
It is feasible to try the "try catch" way proposed by @duhenglucky . This way can converge anomalies to one place for processing.

@lwclover lwclover requested a review from duhenglucky December 28, 2021 09:14
@lwclover lwclover requested a review from duhenglucky December 29, 2021 06:48
@duhenglucky
Copy link
Contributor

duhenglucky commented Jan 12, 2022

@lwclover Would you like to pay more attention to the failed tests: MQClientAPIImplTest.testSendMessageAsync_WithException:201 » NullPointer

@lwclover lwclover closed this Jan 12, 2022
@lwclover lwclover reopened this Jan 12, 2022
@lwclover lwclover closed this Jan 14, 2022
@lwclover lwclover reopened this Jan 14, 2022
@lwclover lwclover closed this Jan 14, 2022
@lwclover lwclover reopened this Jan 14, 2022
@odbozhou odbozhou merged commit 80bcfbc into apache:develop Jan 14, 2022
devfat pushed a commit to devfat/rocketmq that referenced this pull request Mar 3, 2022
GenerousMan pushed a commit to GenerousMan/rocketmq that referenced this pull request Aug 12, 2022
carlvine500 pushed a commit to carlvine500/rocketmq-apache that referenced this pull request Sep 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.