diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 98d5a4d610..7b5a1e16af 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -759,42 +759,15 @@ public void recoverExchange(RecordedExchange x, boolean retry) { } } - + /** + * Recover the queue. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}. + * @param oldName queue name + * @param q recorded queue + * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection + */ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) { try { - if (topologyRecoveryFilter.filterQueue(q)) { - LOGGER.debug("Recovering {}", q); - if (retry) { - final RecordedQueue entity = q; - q = (RecordedQueue) wrapRetryIfNecessary(q, () -> { - entity.recover(); - return null; - }).getRecordedEntity(); - } else { - q.recover(); - } - String newName = q.getName(); - if (!oldName.equals(newName)) { - // make sure server-named queues are re-added with - // their new names. MK. - synchronized (this.recordedQueues) { - this.propagateQueueNameChangeToBindings(oldName, newName); - this.propagateQueueNameChangeToConsumers(oldName, newName); - // bug26552: - // remove old name after we've updated the bindings and consumers, - // plus only for server-named queues, both to make sure we don't lose - // anything to recover. MK. - if(q.isServerNamed()) { - deleteRecordedQueue(oldName); - } - this.recordedQueues.put(newName, q); - } - } - for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) { - qrl.queueRecovered(oldName, newName); - } - LOGGER.debug("{} has recovered", q); - } + internalRecoverQueue(oldName, q, retry); } catch (Exception cause) { final String message = "Caught an exception while recovering queue " + oldName + ": " + cause.getMessage(); @@ -802,6 +775,52 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) { this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e); } } + + /** + * Recover the queue. Errors are not retried and not delivered to the connection's {@link ExceptionHandler} + * @param oldName queue name + * @param q recorded queue + * @throws Exception if an error occurs recovering the queue + */ + void recoverQueue(final String oldName, RecordedQueue q) throws Exception { + internalRecoverQueue(oldName, q, false); + } + + private void internalRecoverQueue(final String oldName, RecordedQueue q, boolean retry) throws Exception { + if (topologyRecoveryFilter.filterQueue(q)) { + LOGGER.debug("Recovering {}", q); + if (retry) { + final RecordedQueue entity = q; + q = (RecordedQueue) wrapRetryIfNecessary(q, () -> { + entity.recover(); + return null; + }).getRecordedEntity(); + } else { + q.recover(); + } + String newName = q.getName(); + if (!oldName.equals(newName)) { + // make sure server-named queues are re-added with + // their new names. MK. + synchronized (this.recordedQueues) { + this.propagateQueueNameChangeToBindings(oldName, newName); + this.propagateQueueNameChangeToConsumers(oldName, newName); + // bug26552: + // remove old name after we've updated the bindings and consumers, + // plus only for server-named queues, both to make sure we don't lose + // anything to recover. MK. + if(q.isServerNamed()) { + deleteRecordedQueue(oldName); + } + this.recordedQueues.put(newName, q); + } + } + for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) { + qrl.queueRecovered(oldName, newName); + } + LOGGER.debug("{} has recovered", q); + } + } public void recoverBinding(RecordedBinding b, boolean retry) { try { @@ -825,34 +844,15 @@ public void recoverBinding(RecordedBinding b, boolean retry) { } } + /** + * Recover the consumer. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}. + * @param tag consumer tag + * @param consumer recorded consumer + * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection + */ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) { try { - if (this.topologyRecoveryFilter.filterConsumer(consumer)) { - LOGGER.debug("Recovering {}", consumer); - String newTag = null; - if (retry) { - final RecordedConsumer entity = consumer; - RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover); - consumer = (RecordedConsumer) retryResult.getRecordedEntity(); - newTag = (String) retryResult.getResult(); - } else { - newTag = consumer.recover(); - } - - // make sure server-generated tags are re-added. MK. - if(tag != null && !tag.equals(newTag)) { - synchronized (this.consumers) { - this.consumers.remove(tag); - this.consumers.put(newTag, consumer); - } - consumer.getChannel().updateConsumerTag(tag, newTag); - } - - for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) { - crl.consumerRecovered(tag, newTag); - } - LOGGER.debug("{} has recovered", consumer); - } + internalRecoverConsumer(tag, consumer, retry); } catch (Exception cause) { final String message = "Caught an exception while recovering consumer " + tag + ": " + cause.getMessage(); @@ -860,6 +860,45 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e); } } + + /** + * Recover the consumer. Errors are not retried and not delivered to the connection's {@link ExceptionHandler} + * @param tag consumer tag + * @param consumer recorded consumer + * @throws Exception if an error occurs recovering the consumer + */ + void recoverConsumer(final String tag, RecordedConsumer consumer) throws Exception { + internalRecoverConsumer(tag, consumer, false); + } + + private void internalRecoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) throws Exception { + if (this.topologyRecoveryFilter.filterConsumer(consumer)) { + LOGGER.debug("Recovering {}", consumer); + String newTag = null; + if (retry) { + final RecordedConsumer entity = consumer; + RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover); + consumer = (RecordedConsumer) retryResult.getRecordedEntity(); + newTag = (String) retryResult.getResult(); + } else { + newTag = consumer.recover(); + } + + // make sure server-generated tags are re-added. MK. + if(tag != null && !tag.equals(newTag)) { + synchronized (this.consumers) { + this.consumers.remove(tag); + this.consumers.put(newTag, consumer); + } + consumer.getChannel().updateConsumerTag(tag, newTag); + } + + for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) { + crl.consumerRecovered(tag, newTag); + } + LOGGER.debug("{} has recovered", consumer); + } + } private RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable recoveryAction) throws Exception { if (this.retryHandler == null) { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java index 3580fe091e..14dd3f69d4 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java @@ -37,6 +37,10 @@ public RecordedQueue exclusive(boolean value) { this.exclusive = value; return this; } + + public boolean isExclusive() { + return this.exclusive; + } public RecordedQueue serverNamed(boolean value) { this.serverNamed = value; @@ -47,8 +51,6 @@ public boolean isServerNamed() { return this.serverNamed; } - public boolean isAutoDelete() { return this.autoDelete; } - public void recover() throws IOException { this.name = this.channel.getDelegate().queueDeclare(this.getNameToUseForRecovery(), this.durable, @@ -69,17 +71,29 @@ public RecordedQueue durable(boolean value) { this.durable = value; return this; } + + public boolean isDurable() { + return this.durable; + } public RecordedQueue autoDelete(boolean value) { this.autoDelete = value; return this; } + + public boolean isAutoDelete() { + return this.autoDelete; + } public RecordedQueue arguments(Map value) { this.arguments = value; return this; } + public Map getArguments() { + return arguments; + } + @Override public String toString() { return "RecordedQueue[name=" + name + ", durable=" + durable + ", autoDelete=" + autoDelete + ", exclusive=" + exclusive + ", arguments=" + arguments + "serverNamed=" + serverNamed + ", channel=" + channel + "]"; diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java index f947d2fd4f..5781c54d36 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -18,6 +18,9 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.utility.Utility; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.Map.Entry; import java.util.function.BiPredicate; import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; @@ -62,7 +65,7 @@ public abstract class TopologyRecoveryRetryLogic { if (context.entity() instanceof RecordedQueue) { final RecordedQueue recordedQueue = context.queue(); AutorecoveringConnection connection = context.connection(); - connection.recoverQueue(recordedQueue.getName(), recordedQueue, false); + connection.recoverQueue(recordedQueue.getName(), recordedQueue); } return null; }; @@ -76,9 +79,7 @@ public abstract class TopologyRecoveryRetryLogic { AutorecoveringConnection connection = context.connection(); RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination()); if (recordedQueue != null) { - connection.recoverQueue( - recordedQueue.getName(), recordedQueue, false - ); + connection.recoverQueue(recordedQueue.getName(), recordedQueue); } } return null; @@ -122,9 +123,7 @@ public abstract class TopologyRecoveryRetryLogic { AutorecoveringConnection connection = context.connection(); RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue()); if (recordedQueue != null) { - connection.recoverQueue( - recordedQueue.getName(), recordedQueue, false - ); + connection.recoverQueue(recordedQueue.getName(), recordedQueue); } } return null; @@ -165,7 +164,7 @@ public abstract class TopologyRecoveryRetryLogic { } else if (consumer.getChannel() == channel) { final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection()); RECOVER_CONSUMER_QUEUE.call(retryContext); - context.connection().recoverConsumer(consumer.getConsumerTag(), consumer, false); + context.connection().recoverConsumer(consumer.getConsumerTag(), consumer); RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext); } } @@ -173,6 +172,44 @@ public abstract class TopologyRecoveryRetryLogic { } return null; }; + + /** + * Recover earlier auto-delete or exclusive queues that share the same channel as this retry context + */ + public static final DefaultRetryHandler.RetryOperation RECOVER_PREVIOUS_AUTO_DELETE_QUEUES = context -> { + if (context.entity() instanceof RecordedQueue) { + AutorecoveringConnection connection = context.connection(); + RecordedQueue queue = context.queue(); + // recover all queues for the same channel that had already been recovered successfully before this queue failed. + // If the previous ones were auto-delete or exclusive, they need recovered again + for (Entry entry : Utility.copy(connection.getRecordedQueues()).entrySet()) { + if (entry.getValue() == queue) { + // we have gotten to the queue in this context. Since this is an ordered map we can now break + // as we know we have recovered all the earlier queues on this channel + break; + } else if (queue.getChannel() == entry.getValue().getChannel() + && (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) { + connection.recoverQueue(entry.getKey(), entry.getValue()); + } + } + } else if (context.entity() instanceof RecordedQueueBinding) { + AutorecoveringConnection connection = context.connection(); + Set queues = new LinkedHashSet<>(); + for (Entry entry : Utility.copy(connection.getRecordedQueues()).entrySet()) { + if (context.entity().getChannel() == entry.getValue().getChannel() + && (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) { + connection.recoverQueue(entry.getKey(), entry.getValue()); + queues.add(entry.getValue().getName()); + } + } + for (final RecordedBinding binding : Utility.copy(connection.getRecordedBindings())) { + if (binding instanceof RecordedQueueBinding && queues.contains(binding.getDestination())) { + binding.recover(); + } + } + } + return null; + }; /** * Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers @@ -188,11 +225,13 @@ public abstract class TopologyRecoveryRetryLogic { .bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) .consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) .queueRecoveryRetryOperation(RECOVER_CHANNEL - .andThen(RECOVER_QUEUE)) + .andThen(RECOVER_QUEUE) + .andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES)) .bindingRecoveryRetryOperation(RECOVER_CHANNEL .andThen(RECOVER_BINDING_QUEUE) .andThen(RECOVER_BINDING) - .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)) + .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS) + .andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES)) .consumerRecoveryRetryOperation(RECOVER_CHANNEL .andThen(RECOVER_CONSUMER_QUEUE) .andThen(RECOVER_CONSUMER)