diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ
index 999bf3ef87..edbe1dddcc 100644
--- a/LICENSE-MPL-RabbitMQ
+++ b/LICENSE-MPL-RabbitMQ
@@ -364,4 +364,4 @@ file, then You may include the notice in a location (such as a LICENSE
file in a relevant directory) where a recipient would be likely to look
for such a notice.
-Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
\ No newline at end of file
+Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
diff --git a/RUNNING_TESTS.md b/RUNNING_TESTS.md
index 32ac74e016..b7241210ca 100644
--- a/RUNNING_TESTS.md
+++ b/RUNNING_TESTS.md
@@ -107,4 +107,4 @@ Launch the tests:
```
Note the `rabbitmqctl.bin` system property uses the syntax
-`DOCKER:{containerId}`.
\ No newline at end of file
+`DOCKER:{containerId}`.
diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
index ef70124f55..8c8d8785b2 100644
--- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java
+++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
@@ -19,8 +19,11 @@
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
+import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map.Entry;
import java.util.function.BiConsumer;
@@ -49,6 +52,7 @@
*/
public class ConnectionFactory implements Cloneable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactory.class);
private static final int MAX_UNSIGNED_SHORT = 65535;
/** Default user name */
@@ -186,6 +190,7 @@ public class ConnectionFactory implements Cloneable {
* @since 5.4.0
*/
private RetryHandler topologyRecoveryRetryHandler;
+ private RecoveredQueueNameSupplier recoveredQueueNameSupplier;
/**
* Traffic listener notified of inbound and outbound {@link Command}s.
@@ -1261,6 +1266,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
+ result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
result.setTrafficListener(trafficListener);
result.setCredentialsRefreshService(credentialsRefreshService);
return result;
@@ -1639,6 +1645,15 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
}
+ /**
+ * Set the recovered queue name supplier. Default is use the same queue name when recovering queues.
+ *
+ * @param recoveredQueueNameSupplier queue name supplier
+ */
+ public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
+ this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
+ }
+
/**
* Traffic listener notified of inbound and outbound {@link Command}s.
*
diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java
index 4e636767ce..0ce3642522 100644
--- a/src/main/java/com/rabbitmq/client/MetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java
@@ -36,13 +36,21 @@ public interface MetricsCollector {
void basicPublish(Channel channel, long deliveryTag);
- void basicPublishFailure(Channel channel, Throwable cause);
+ default void basicPublishFailure(Channel channel, Throwable cause) {
- void basicPublishAck(Channel channel, long deliveryTag, boolean multiple);
+ }
- void basicPublishNack(Channel channel, long deliveryTag, boolean multiple);
+ default void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
- void basicPublishUnrouted(Channel channel);
+ }
+
+ default void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
+
+ }
+
+ default void basicPublishUnrouted(Channel channel) {
+
+ }
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
index 60bfd549eb..e21f58447e 100644
--- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
+++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
@@ -397,35 +397,35 @@ public void start()
}
try {
- int channelMax =
+ int negotiatedChannelMax =
negotiateChannelMax(this.requestedChannelMax,
connTune.getChannelMax());
- if (!checkUnsignedShort(channelMax)) {
- throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + channelMax);
+ if (!checkUnsignedShort(negotiatedChannelMax)) {
+ throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + negotiatedChannelMax);
}
- _channelManager = instantiateChannelManager(channelMax, threadFactory);
+ _channelManager = instantiateChannelManager(negotiatedChannelMax, threadFactory);
int frameMax =
negotiatedMaxValue(this.requestedFrameMax,
connTune.getFrameMax());
this._frameMax = frameMax;
- int heartbeat =
+ int negotiatedHeartbeat =
negotiatedMaxValue(this.requestedHeartbeat,
connTune.getHeartbeat());
- if (!checkUnsignedShort(heartbeat)) {
- throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + heartbeat);
+ if (!checkUnsignedShort(negotiatedHeartbeat)) {
+ throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + negotiatedHeartbeat);
}
- setHeartbeat(heartbeat);
+ setHeartbeat(negotiatedHeartbeat);
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
- .channelMax(channelMax)
+ .channelMax(negotiatedChannelMax)
.frameMax(frameMax)
- .heartbeat(heartbeat)
+ .heartbeat(negotiatedHeartbeat)
.build());
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
.virtualHost(_virtualHost)
diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
index e3edc917fc..8ecdb32d8d 100644
--- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
@@ -151,9 +151,9 @@ public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple
@Override
public void basicPublishUnrouted(Channel channel) {
try {
- markPublishedMessageNotRouted();
+ markPublishedMessageUnrouted();
} catch(Exception e) {
- LOGGER.info("Error while computing metrics in markPublishedMessageNotRouted: " + e.getMessage());
+ LOGGER.info("Error while computing metrics in markPublishedMessageUnrouted: " + e.getMessage());
}
}
@@ -420,5 +420,5 @@ private ChannelState(Channel channel) {
/**
* Marks the event of a published message not being routed.
*/
- protected abstract void markPublishedMessageNotRouted();
+ protected abstract void markPublishedMessageUnrouted();
}
diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
index a2b49d0f98..b7e6f3dd68 100644
--- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
+++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
@@ -21,6 +21,7 @@
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.TrafficListener;
+import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
@@ -54,7 +55,7 @@ public class ConnectionParams {
private TopologyRecoveryFilter topologyRecoveryFilter;
private Predicate connectionRecoveryTriggeringCondition;
private RetryHandler topologyRecoveryRetryHandler;
-
+ private RecoveredQueueNameSupplier recoveredQueueNameSupplier;
private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
@@ -272,6 +273,14 @@ public RetryHandler getTopologyRecoveryRetryHandler() {
return topologyRecoveryRetryHandler;
}
+ public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
+ this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
+ }
+
+ public RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
+ return recoveredQueueNameSupplier;
+ }
+
public void setTrafficListener(TrafficListener trafficListener) {
this.trafficListener = trafficListener;
}
diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
index d38c0b14c4..9d8cd5bd1e 100644
--- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
@@ -148,7 +148,7 @@ protected void markMessagePublishNotAcknowledged() {
}
@Override
- protected void markPublishedMessageNotRouted() {
+ protected void markPublishedMessageUnrouted() {
unroutedPublishedMessages.increment();
}
diff --git a/src/main/java/com/rabbitmq/client/impl/OAuth2ClientCredentialsGrantCredentialsProvider.java b/src/main/java/com/rabbitmq/client/impl/OAuth2ClientCredentialsGrantCredentialsProvider.java
index cc8189ab05..a76fb81dfd 100644
--- a/src/main/java/com/rabbitmq/client/impl/OAuth2ClientCredentialsGrantCredentialsProvider.java
+++ b/src/main/java/com/rabbitmq/client/impl/OAuth2ClientCredentialsGrantCredentialsProvider.java
@@ -595,5 +595,4 @@ private SSLSocketFactory sslSocketFactory() {
}
}
-
}
diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java
index e504ac5fc6..07e7780817 100644
--- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java
@@ -125,7 +125,7 @@ protected void markMessagePublishNotAcknowledged() {
}
@Override
- protected void markPublishedMessageNotRouted() {
+ protected void markPublishedMessageUnrouted() {
publishUnroutedMessages.mark();
}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java
index 97cacc81b0..cfba283dd2 100644
--- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java
@@ -355,7 +355,8 @@ public void queueDeclareNoWait(String queue,
durable(durable).
exclusive(exclusive).
autoDelete(autoDelete).
- arguments(arguments);
+ arguments(arguments).
+ recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
recordQueue(queue, meta);
@@ -848,7 +849,8 @@ private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable,
durable(durable).
exclusive(exclusive).
autoDelete(autoDelete).
- arguments(arguments);
+ arguments(arguments).
+ recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
q.serverNamed(true);
}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
index 98d5a4d610..1b2225d48f 100644
--- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
@@ -96,6 +96,7 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
private final Predicate connectionRecoveryTriggeringCondition;
private final RetryHandler retryHandler;
+ private final RecoveredQueueNameSupplier recoveredQueueNameSupplier;
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List addrs) {
this(params, f, new ListAddressResolver(addrs));
@@ -119,6 +120,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
letAllPassFilter() : params.getTopologyRecoveryFilter();
this.retryHandler = params.getTopologyRecoveryRetryHandler();
+ this.recoveredQueueNameSupplier = params.getRecoveredQueueNameSupplier() == null ?
+ RecordedQueue.DEFAULT_QUEUE_NAME_SUPPLIER : params.getRecoveredQueueNameSupplier();
}
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -564,6 +567,10 @@ public void addConsumerRecoveryListener(ConsumerRecoveryListener listener) {
public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
this.consumerRecoveryListeners.remove(listener);
}
+
+ RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
+ return this.recoveredQueueNameSupplier;
+ }
private synchronized void beginAutomaticRecovery() throws InterruptedException {
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
@@ -759,42 +766,15 @@ public void recoverExchange(RecordedExchange x, boolean retry) {
}
}
-
+ /**
+ * Recover the queue. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
+ * @param oldName queue name
+ * @param q recorded queue
+ * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
+ */
public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
try {
- if (topologyRecoveryFilter.filterQueue(q)) {
- LOGGER.debug("Recovering {}", q);
- if (retry) {
- final RecordedQueue entity = q;
- q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
- entity.recover();
- return null;
- }).getRecordedEntity();
- } else {
- q.recover();
- }
- String newName = q.getName();
- if (!oldName.equals(newName)) {
- // make sure server-named queues are re-added with
- // their new names. MK.
- synchronized (this.recordedQueues) {
- this.propagateQueueNameChangeToBindings(oldName, newName);
- this.propagateQueueNameChangeToConsumers(oldName, newName);
- // bug26552:
- // remove old name after we've updated the bindings and consumers,
- // plus only for server-named queues, both to make sure we don't lose
- // anything to recover. MK.
- if(q.isServerNamed()) {
- deleteRecordedQueue(oldName);
- }
- this.recordedQueues.put(newName, q);
- }
- }
- for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
- qrl.queueRecovered(oldName, newName);
- }
- LOGGER.debug("{} has recovered", q);
- }
+ internalRecoverQueue(oldName, q, retry);
} catch (Exception cause) {
final String message = "Caught an exception while recovering queue " + oldName +
": " + cause.getMessage();
@@ -802,6 +782,48 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
}
}
+
+ /**
+ * Recover the queue. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
+ * @param oldName queue name
+ * @param q recorded queue
+ * @throws Exception if an error occurs recovering the queue
+ */
+ void recoverQueue(final String oldName, RecordedQueue q) throws Exception {
+ internalRecoverQueue(oldName, q, false);
+ }
+
+ private void internalRecoverQueue(final String oldName, RecordedQueue q, boolean retry) throws Exception {
+ if (topologyRecoveryFilter.filterQueue(q)) {
+ LOGGER.debug("Recovering {}", q);
+ if (retry) {
+ final RecordedQueue entity = q;
+ q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
+ entity.recover();
+ return null;
+ }).getRecordedEntity();
+ } else {
+ q.recover();
+ }
+ String newName = q.getName();
+ if (!oldName.equals(newName)) {
+ // make sure queues are re-added with
+ // their new names, if applicable. MK.
+ synchronized (this.recordedQueues) {
+ this.propagateQueueNameChangeToBindings(oldName, newName);
+ this.propagateQueueNameChangeToConsumers(oldName, newName);
+ // bug26552:
+ // remove old name after we've updated the bindings and consumers,
+ deleteRecordedQueue(oldName);
+ this.recordedQueues.put(newName, q);
+ }
+ }
+ for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
+ qrl.queueRecovered(oldName, newName);
+ }
+ LOGGER.debug("{} has recovered", q);
+ }
+ }
public void recoverBinding(RecordedBinding b, boolean retry) {
try {
@@ -825,34 +847,15 @@ public void recoverBinding(RecordedBinding b, boolean retry) {
}
}
+ /**
+ * Recover the consumer. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
+ * @param tag consumer tag
+ * @param consumer recorded consumer
+ * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
+ */
public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
try {
- if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
- LOGGER.debug("Recovering {}", consumer);
- String newTag = null;
- if (retry) {
- final RecordedConsumer entity = consumer;
- RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
- consumer = (RecordedConsumer) retryResult.getRecordedEntity();
- newTag = (String) retryResult.getResult();
- } else {
- newTag = consumer.recover();
- }
-
- // make sure server-generated tags are re-added. MK.
- if(tag != null && !tag.equals(newTag)) {
- synchronized (this.consumers) {
- this.consumers.remove(tag);
- this.consumers.put(newTag, consumer);
- }
- consumer.getChannel().updateConsumerTag(tag, newTag);
- }
-
- for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
- crl.consumerRecovered(tag, newTag);
- }
- LOGGER.debug("{} has recovered", consumer);
- }
+ internalRecoverConsumer(tag, consumer, retry);
} catch (Exception cause) {
final String message = "Caught an exception while recovering consumer " + tag +
": " + cause.getMessage();
@@ -860,6 +863,45 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
}
}
+
+ /**
+ * Recover the consumer. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
+ * @param tag consumer tag
+ * @param consumer recorded consumer
+ * @throws Exception if an error occurs recovering the consumer
+ */
+ void recoverConsumer(final String tag, RecordedConsumer consumer) throws Exception {
+ internalRecoverConsumer(tag, consumer, false);
+ }
+
+ private void internalRecoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) throws Exception {
+ if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
+ LOGGER.debug("Recovering {}", consumer);
+ String newTag = null;
+ if (retry) {
+ final RecordedConsumer entity = consumer;
+ RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
+ consumer = (RecordedConsumer) retryResult.getRecordedEntity();
+ newTag = (String) retryResult.getResult();
+ } else {
+ newTag = consumer.recover();
+ }
+
+ // make sure server-generated tags are re-added. MK.
+ if(tag != null && !tag.equals(newTag)) {
+ synchronized (this.consumers) {
+ this.consumers.remove(tag);
+ this.consumers.put(newTag, consumer);
+ }
+ consumer.getChannel().updateConsumerTag(tag, newTag);
+ }
+
+ for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
+ crl.consumerRecovered(tag, newTag);
+ }
+ LOGGER.debug("{} has recovered", consumer);
+ }
+ }
private RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable recoveryAction) throws Exception {
if (this.retryHandler == null) {
@@ -888,6 +930,7 @@ private RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable
}
}
+
private void propagateQueueNameChangeToBindings(String oldName, String newName) {
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
if (b.getDestination().equals(oldName)) {
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java
index 3580fe091e..52caced2af 100644
--- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java
@@ -23,6 +23,10 @@
*/
public class RecordedQueue extends RecordedNamedEntity {
public static final String EMPTY_STRING = "";
+
+ static final RecoveredQueueNameSupplier DEFAULT_QUEUE_NAME_SUPPLIER = q -> q.isServerNamed() ? EMPTY_STRING : q.name;
+
+ private RecoveredQueueNameSupplier recoveredQueueNameSupplier = DEFAULT_QUEUE_NAME_SUPPLIER;
private boolean durable;
private boolean autoDelete;
private Map arguments;
@@ -37,6 +41,10 @@ public RecordedQueue exclusive(boolean value) {
this.exclusive = value;
return this;
}
+
+ public boolean isExclusive() {
+ return this.exclusive;
+ }
public RecordedQueue serverNamed(boolean value) {
this.serverNamed = value;
@@ -47,8 +55,6 @@ public boolean isServerNamed() {
return this.serverNamed;
}
- public boolean isAutoDelete() { return this.autoDelete; }
-
public void recover() throws IOException {
this.name = this.channel.getDelegate().queueDeclare(this.getNameToUseForRecovery(),
this.durable,
@@ -58,27 +64,40 @@ public void recover() throws IOException {
}
public String getNameToUseForRecovery() {
- if(isServerNamed()) {
- return EMPTY_STRING;
- } else {
- return this.name;
- }
+ return recoveredQueueNameSupplier.getNameToUseForRecovery(this);
}
public RecordedQueue durable(boolean value) {
this.durable = value;
return this;
}
+
+ public boolean isDurable() {
+ return this.durable;
+ }
public RecordedQueue autoDelete(boolean value) {
this.autoDelete = value;
return this;
}
+
+ public boolean isAutoDelete() {
+ return this.autoDelete;
+ }
public RecordedQueue arguments(Map value) {
this.arguments = value;
return this;
}
+
+ public Map getArguments() {
+ return arguments;
+ }
+
+ public RecordedQueue recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
+ this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
+ return this;
+ }
@Override
public String toString() {
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveredQueueNameSupplier.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveredQueueNameSupplier.java
new file mode 100644
index 0000000000..c1fb3bd930
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveredQueueNameSupplier.java
@@ -0,0 +1,18 @@
+package com.rabbitmq.client.impl.recovery;
+
+/**
+ * Functional callback interface that can be used to rename a queue during topology recovery.
+ * Can use along with {@link QueueRecoveryListener} to know when such a queue has been recovered successfully.
+ *
+ * @see QueueRecoveryListener
+ */
+@FunctionalInterface
+public interface RecoveredQueueNameSupplier {
+
+ /**
+ * Get the queue name to use when recovering this RecordedQueue entity
+ * @param recordedQueue the queue to be recovered
+ * @return new queue name
+ */
+ String getNameToUseForRecovery(final RecordedQueue recordedQueue);
+}
\ No newline at end of file
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java
index 28a8f3cb99..c452b07f71 100644
--- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java
@@ -19,6 +19,9 @@
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.function.BiPredicate;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
@@ -63,7 +66,7 @@ public abstract class TopologyRecoveryRetryLogic {
if (context.entity() instanceof RecordedQueue) {
final RecordedQueue recordedQueue = context.queue();
AutorecoveringConnection connection = context.connection();
- connection.recoverQueue(recordedQueue.getName(), recordedQueue, false);
+ connection.recoverQueue(recordedQueue.getName(), recordedQueue);
}
return null;
};
@@ -77,9 +80,7 @@ public abstract class TopologyRecoveryRetryLogic {
AutorecoveringConnection connection = context.connection();
RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination());
if (recordedQueue != null) {
- connection.recoverQueue(
- recordedQueue.getName(), recordedQueue, false
- );
+ connection.recoverQueue(recordedQueue.getName(), recordedQueue);
}
}
return null;
@@ -123,9 +124,7 @@ public abstract class TopologyRecoveryRetryLogic {
AutorecoveringConnection connection = context.connection();
RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue());
if (recordedQueue != null) {
- connection.recoverQueue(
- recordedQueue.getName(), recordedQueue, false
- );
+ connection.recoverQueue(recordedQueue.getName(), recordedQueue);
}
}
return null;
@@ -166,7 +165,7 @@ public abstract class TopologyRecoveryRetryLogic {
} else if (consumer.getChannel() == channel) {
final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection());
RECOVER_CONSUMER_QUEUE.call(retryContext);
- context.connection().recoverConsumer(consumer.getConsumerTag(), consumer, false);
+ context.connection().recoverConsumer(consumer.getConsumerTag(), consumer);
RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext);
}
}
@@ -175,6 +174,44 @@ public abstract class TopologyRecoveryRetryLogic {
return null;
};
+ /**
+ * Recover earlier auto-delete or exclusive queues that share the same channel as this retry context
+ */
+ public static final DefaultRetryHandler.RetryOperation RECOVER_PREVIOUS_AUTO_DELETE_QUEUES = context -> {
+ if (context.entity() instanceof RecordedQueue) {
+ AutorecoveringConnection connection = context.connection();
+ RecordedQueue queue = context.queue();
+ // recover all queues for the same channel that had already been recovered successfully before this queue failed.
+ // If the previous ones were auto-delete or exclusive, they need recovered again
+ for (Map.Entry entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
+ if (entry.getValue() == queue) {
+ // we have gotten to the queue in this context. Since this is an ordered map we can now break
+ // as we know we have recovered all the earlier queues on this channel
+ break;
+ } else if (queue.getChannel() == entry.getValue().getChannel()
+ && (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
+ connection.recoverQueue(entry.getKey(), entry.getValue());
+ }
+ }
+ } else if (context.entity() instanceof RecordedQueueBinding) {
+ AutorecoveringConnection connection = context.connection();
+ Set queues = new LinkedHashSet<>();
+ for (Map.Entry entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
+ if (context.entity().getChannel() == entry.getValue().getChannel()
+ && (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
+ connection.recoverQueue(entry.getKey(), entry.getValue());
+ queues.add(entry.getValue().getName());
+ }
+ }
+ for (final RecordedBinding binding : Utility.copy(connection.getRecordedBindings())) {
+ if (binding instanceof RecordedQueueBinding && queues.contains(binding.getDestination())) {
+ binding.recover();
+ }
+ }
+ }
+ return null;
+ };
+
/**
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
* when their respective queue is not found.
@@ -189,11 +226,13 @@ public abstract class TopologyRecoveryRetryLogic {
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.queueRecoveryRetryOperation(RECOVER_CHANNEL
- .andThen(RECOVER_QUEUE))
+ .andThen(RECOVER_QUEUE)
+ .andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
.bindingRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_BINDING_QUEUE)
.andThen(RECOVER_BINDING)
- .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
+ .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)
+ .andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_CONSUMER_QUEUE)
.andThen(RECOVER_CONSUMER)
diff --git a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java
index b5f039cbf0..b1c9360ea5 100644
--- a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java
+++ b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java
@@ -25,7 +25,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class ChannelNTest {