From a19879ba07b5bab42c3574010ba39da0f9e44e52 Mon Sep 17 00:00:00 2001 From: Michael Dent Date: Fri, 2 Jul 2021 13:46:13 -0500 Subject: [PATCH] Allow changing queue name during recovery --- .../com/rabbitmq/client/ConnectionFactory.java | 13 +++++++++++++ .../rabbitmq/client/impl/ConnectionParams.java | 12 +++++++++++- .../impl/recovery/AutorecoveringChannel.java | 6 ++++-- .../recovery/AutorecoveringConnection.java | 14 +++++++++----- .../client/impl/recovery/RecordedQueue.java | 15 ++++++++++----- .../recovery/RecoveredQueueNameSupplier.java | 18 ++++++++++++++++++ 6 files changed, 65 insertions(+), 13 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RecoveredQueueNameSupplier.java diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index b3b8612b59..6488be4838 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -19,6 +19,7 @@ import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier; import com.rabbitmq.client.impl.recovery.RetryHandler; import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import org.slf4j.Logger; @@ -190,6 +191,8 @@ public class ConnectionFactory implements Cloneable { * @since 5.4.0 */ private RetryHandler topologyRecoveryRetryHandler; + + private RecoveredQueueNameSupplier recoveredQueueNameSupplier; /** * Traffic listener notified of inbound and outbound {@link Command}s. @@ -1267,6 +1270,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setTopologyRecoveryFilter(topologyRecoveryFilter); result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition); result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler); + result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier); result.setTrafficListener(trafficListener); result.setCredentialsRefreshService(credentialsRefreshService); return result; @@ -1648,6 +1652,15 @@ public void setConnectionRecoveryTriggeringCondition(Predicate connectionRecoveryTriggeringCondition; private RetryHandler topologyRecoveryRetryHandler; - + private RecoveredQueueNameSupplier recoveredQueueNameSupplier; + private ExceptionHandler exceptionHandler; private ThreadFactory threadFactory; @@ -271,6 +273,14 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa public RetryHandler getTopologyRecoveryRetryHandler() { return topologyRecoveryRetryHandler; } + + public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) { + this.recoveredQueueNameSupplier = recoveredQueueNameSupplier; + } + + public RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() { + return recoveredQueueNameSupplier; + } public void setTrafficListener(TrafficListener trafficListener) { this.trafficListener = trafficListener; diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 97cacc81b0..cfba283dd2 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -355,7 +355,8 @@ public void queueDeclareNoWait(String queue, durable(durable). exclusive(exclusive). autoDelete(autoDelete). - arguments(arguments); + arguments(arguments). + recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier()); delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments); recordQueue(queue, meta); @@ -848,7 +849,8 @@ private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable, durable(durable). exclusive(exclusive). autoDelete(autoDelete). - arguments(arguments); + arguments(arguments). + recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier()); if (queue.equals(RecordedQueue.EMPTY_STRING)) { q.serverNamed(true); } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 98d5a4d610..c332e06fc4 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -96,6 +96,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC private final Predicate connectionRecoveryTriggeringCondition; private final RetryHandler retryHandler; + + private final RecoveredQueueNameSupplier recoveredQueueNameSupplier; public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List
addrs) { this(params, f, new ListAddressResolver(addrs)); @@ -119,6 +121,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, letAllPassFilter() : params.getTopologyRecoveryFilter(); this.retryHandler = params.getTopologyRecoveryRetryHandler(); + this.recoveredQueueNameSupplier = params.getRecoveredQueueNameSupplier() == null ? + RecordedQueue.DEFAULT_QUEUE_NAME_SUPPLIER : params.getRecoveredQueueNameSupplier(); } private void setupErrorOnWriteListenerForPotentialRecovery() { @@ -564,6 +568,10 @@ public void addConsumerRecoveryListener(ConsumerRecoveryListener listener) { public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) { this.consumerRecoveryListeners.remove(listener); } + + RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() { + return this.recoveredQueueNameSupplier; + } private synchronized void beginAutomaticRecovery() throws InterruptedException { final long delay = this.params.getRecoveryDelayHandler().getDelay(0); @@ -782,11 +790,7 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) { this.propagateQueueNameChangeToConsumers(oldName, newName); // bug26552: // remove old name after we've updated the bindings and consumers, - // plus only for server-named queues, both to make sure we don't lose - // anything to recover. MK. - if(q.isServerNamed()) { - deleteRecordedQueue(oldName); - } + deleteRecordedQueue(oldName); this.recordedQueues.put(newName, q); } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java index 3580fe091e..d18fe8c99d 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java @@ -23,6 +23,10 @@ */ public class RecordedQueue extends RecordedNamedEntity { public static final String EMPTY_STRING = ""; + + static final RecoveredQueueNameSupplier DEFAULT_QUEUE_NAME_SUPPLIER = q -> q.isServerNamed() ? EMPTY_STRING : q.name; + + private RecoveredQueueNameSupplier recoveredQueueNameSupplier = DEFAULT_QUEUE_NAME_SUPPLIER; private boolean durable; private boolean autoDelete; private Map arguments; @@ -58,11 +62,7 @@ public void recover() throws IOException { } public String getNameToUseForRecovery() { - if(isServerNamed()) { - return EMPTY_STRING; - } else { - return this.name; - } + return recoveredQueueNameSupplier.getNameToUseForRecovery(this); } public RecordedQueue durable(boolean value) { @@ -80,6 +80,11 @@ public RecordedQueue arguments(Map value) { return this; } + public RecordedQueue recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) { + this.recoveredQueueNameSupplier = recoveredQueueNameSupplier; + return this; + } + @Override public String toString() { return "RecordedQueue[name=" + name + ", durable=" + durable + ", autoDelete=" + autoDelete + ", exclusive=" + exclusive + ", arguments=" + arguments + "serverNamed=" + serverNamed + ", channel=" + channel + "]"; diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveredQueueNameSupplier.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveredQueueNameSupplier.java new file mode 100644 index 0000000000..c1fb3bd930 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveredQueueNameSupplier.java @@ -0,0 +1,18 @@ +package com.rabbitmq.client.impl.recovery; + +/** + * Functional callback interface that can be used to rename a queue during topology recovery. + * Can use along with {@link QueueRecoveryListener} to know when such a queue has been recovered successfully. + * + * @see QueueRecoveryListener + */ +@FunctionalInterface +public interface RecoveredQueueNameSupplier { + + /** + * Get the queue name to use when recovering this RecordedQueue entity + * @param recordedQueue the queue to be recovered + * @return new queue name + */ + String getNameToUseForRecovery(final RecordedQueue recordedQueue); +} \ No newline at end of file