From ae0b44883c9611adfae97a71e5febc72ab7bf7c3 Mon Sep 17 00:00:00 2001 From: vasmith Date: Fri, 18 Jun 2021 13:49:41 -0500 Subject: [PATCH] Added option to maintain AddressResolver address order for AutoRecoveringConnection --- .../com/rabbitmq/client/ConnectionFactory.java | 17 +++++++++++++++++ .../rabbitmq/client/impl/ConnectionParams.java | 9 +++++++++ .../RecoveryAwareAMQConnectionFactory.java | 3 ++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index ef70124f55..70d96942f8 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -126,6 +126,7 @@ public class ConnectionFactory implements Cloneable { private boolean automaticRecovery = true; private boolean topologyRecovery = true; private ExecutorService topologyRecoveryExecutor; + private boolean maintainAddressOrder; // long is used to make sure the users can use both ints // and longs safely. It is unlikely that anybody'd need @@ -198,6 +199,8 @@ public class ConnectionFactory implements Cloneable { private CredentialsRefreshService credentialsRefreshService; + + /** @return the default host to use for connections */ public String getHost() { return host; @@ -1263,6 +1266,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler); result.setTrafficListener(trafficListener); result.setCredentialsRefreshService(credentialsRefreshService); + result.setMaintainAddressOrder(maintainAddressOrder); return result; } @@ -1653,4 +1657,17 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa public void setTrafficListener(TrafficListener trafficListener) { this.trafficListener = trafficListener; } + + /** + * Set to true in order to maintain the order of Addresses when setting automaticRecovery=true + * Default is false and automatic shuffling. + * @param maintainAddressOrder + */ + public void setMaintainAddressOrder(boolean maintainAddressOrder) { + this.maintainAddressOrder = maintainAddressOrder; + } + + public boolean shouldMaintainAddressOrder() { + return this.maintainAddressOrder; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java index a2b49d0f98..935b11c4e6 100644 --- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java @@ -49,6 +49,7 @@ public class ConnectionParams { private ExecutorService topologyRecoveryExecutor; private int channelRpcTimeout; private boolean channelShouldCheckRpcResponseType; + private boolean maintainAddressOrder; private ErrorOnWriteListener errorOnWriteListener; private int workPoolTimeout = -1; private TopologyRecoveryFilter topologyRecoveryFilter; @@ -287,4 +288,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe public CredentialsRefreshService getCredentialsRefreshService() { return credentialsRefreshService; } + + public boolean shouldMaintainAddressOrder() { + return maintainAddressOrder; + } + + public void setMaintainAddressOrder(boolean maintainAddressOrder) { + this.maintainAddressOrder = maintainAddressOrder; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java index ab23cc4494..b2a78fd42c 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java @@ -55,7 +55,8 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa // package protected API, made public for testing only public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException { Exception lastException = null; - List
shuffled = shuffle(addressResolver.getAddresses()); + List
originalOrder = addressResolver.getAddresses(); + List
shuffled = params.shouldMaintainAddressOrder() ? originalOrder : shuffle(originalOrder); for (Address addr : shuffled) { try {