Skip to content

Commit ae0b448

Browse files
author
vasmith
committed
Added option to maintain AddressResolver address order for AutoRecoveringConnection
1 parent 39fe122 commit ae0b448

File tree

3 files changed

+28
-1
lines changed

3 files changed

+28
-1
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public class ConnectionFactory implements Cloneable {
126126
private boolean automaticRecovery = true;
127127
private boolean topologyRecovery = true;
128128
private ExecutorService topologyRecoveryExecutor;
129+
private boolean maintainAddressOrder;
129130

130131
// long is used to make sure the users can use both ints
131132
// and longs safely. It is unlikely that anybody'd need
@@ -198,6 +199,8 @@ public class ConnectionFactory implements Cloneable {
198199

199200
private CredentialsRefreshService credentialsRefreshService;
200201

202+
203+
201204
/** @return the default host to use for connections */
202205
public String getHost() {
203206
return host;
@@ -1263,6 +1266,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
12631266
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
12641267
result.setTrafficListener(trafficListener);
12651268
result.setCredentialsRefreshService(credentialsRefreshService);
1269+
result.setMaintainAddressOrder(maintainAddressOrder);
12661270
return result;
12671271
}
12681272

@@ -1653,4 +1657,17 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
16531657
public void setTrafficListener(TrafficListener trafficListener) {
16541658
this.trafficListener = trafficListener;
16551659
}
1660+
1661+
/**
1662+
* Set to true in order to maintain the order of Addresses when setting automaticRecovery=true
1663+
* Default is false and automatic shuffling.
1664+
* @param maintainAddressOrder
1665+
*/
1666+
public void setMaintainAddressOrder(boolean maintainAddressOrder) {
1667+
this.maintainAddressOrder = maintainAddressOrder;
1668+
}
1669+
1670+
public boolean shouldMaintainAddressOrder() {
1671+
return this.maintainAddressOrder;
1672+
}
16561673
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class ConnectionParams {
4949
private ExecutorService topologyRecoveryExecutor;
5050
private int channelRpcTimeout;
5151
private boolean channelShouldCheckRpcResponseType;
52+
private boolean maintainAddressOrder;
5253
private ErrorOnWriteListener errorOnWriteListener;
5354
private int workPoolTimeout = -1;
5455
private TopologyRecoveryFilter topologyRecoveryFilter;
@@ -287,4 +288,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe
287288
public CredentialsRefreshService getCredentialsRefreshService() {
288289
return credentialsRefreshService;
289290
}
291+
292+
public boolean shouldMaintainAddressOrder() {
293+
return maintainAddressOrder;
294+
}
295+
296+
public void setMaintainAddressOrder(boolean maintainAddressOrder) {
297+
this.maintainAddressOrder = maintainAddressOrder;
298+
}
290299
}

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa
5555
// package protected API, made public for testing only
5656
public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
5757
Exception lastException = null;
58-
List<Address> shuffled = shuffle(addressResolver.getAddresses());
58+
List<Address> originalOrder = addressResolver.getAddresses();
59+
List<Address> shuffled = params.shouldMaintainAddressOrder() ? originalOrder : shuffle(originalOrder);
5960

6061
for (Address addr : shuffled) {
6162
try {

0 commit comments

Comments
 (0)