diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 3be55ba49f0..668f9b6b690 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -480,14 +480,12 @@ public void send(Message msg, } /** - * @deprecated - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be - * provided in next version - * * @param msg * @param sendCallback * @param timeout the sendCallback will be invoked at most time * @throws RejectedExecutionException + * @deprecated It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version */ @Deprecated public void send(final Message msg, final SendCallback sendCallback, final long timeout) @@ -1034,10 +1032,6 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) } /** - * @deprecated - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be - * provided in next version - * * @param msg * @param mq * @param sendCallback @@ -1045,6 +1039,8 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) * @throws MQClientException * @throws RemotingException * @throws InterruptedException + * @deprecated It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version */ @Deprecated public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) @@ -1367,7 +1363,7 @@ public SendResult send(Message msg, return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } - public Message request(Message msg, + public Message request(final Message msg, long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); @@ -1382,6 +1378,7 @@ public Message request(Message msg, @Override public void onSuccess(SendResult sendResult) { requestResponseFuture.setSendRequestOk(true); + requestResponseFuture.putResponseMessage(msg); } @Override @@ -1412,6 +1409,7 @@ public void request(Message msg, final RequestCallback requestCallback, long tim @Override public void onSuccess(SendResult sendResult) { requestResponseFuture.setSendRequestOk(true); + requestResponseFuture.executeRequestCallback(); } @Override diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java index f200f77da50..69048de2d0e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java @@ -29,6 +29,8 @@ public static void main(String[] args) throws MQClientException, InterruptedExce long ttl = 3000; DefaultMQProducer producer = new DefaultMQProducer(producerGroup); + + //You need to set namesrvAddr to the address of the local namesrv producer.setNamesrvAddr("127.0.0.1:9876"); producer.start();