Skip to content

Added option to maintain AddressResolver address order for AutoRecove… #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class ConnectionFactory implements Cloneable {
private boolean automaticRecovery = true;
private boolean topologyRecovery = true;
private ExecutorService topologyRecoveryExecutor;
private boolean maintainAddressOrder;

// long is used to make sure the users can use both ints
// and longs safely. It is unlikely that anybody'd need
Expand Down Expand Up @@ -198,6 +199,8 @@ public class ConnectionFactory implements Cloneable {

private CredentialsRefreshService credentialsRefreshService;



/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -1263,6 +1266,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
result.setTrafficListener(trafficListener);
result.setCredentialsRefreshService(credentialsRefreshService);
result.setMaintainAddressOrder(maintainAddressOrder);
return result;
}

Expand Down Expand Up @@ -1653,4 +1657,17 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
public void setTrafficListener(TrafficListener trafficListener) {
this.trafficListener = trafficListener;
}

/**
* Set to true in order to maintain the order of Addresses when setting automaticRecovery=true
* Default is false and automatic shuffling.
* @param maintainAddressOrder
*/
public void setMaintainAddressOrder(boolean maintainAddressOrder) {
this.maintainAddressOrder = maintainAddressOrder;
}

public boolean shouldMaintainAddressOrder() {
return this.maintainAddressOrder;
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ConnectionParams {
private ExecutorService topologyRecoveryExecutor;
private int channelRpcTimeout;
private boolean channelShouldCheckRpcResponseType;
private boolean maintainAddressOrder;
private ErrorOnWriteListener errorOnWriteListener;
private int workPoolTimeout = -1;
private TopologyRecoveryFilter topologyRecoveryFilter;
Expand Down Expand Up @@ -287,4 +288,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe
public CredentialsRefreshService getCredentialsRefreshService() {
return credentialsRefreshService;
}

public boolean shouldMaintainAddressOrder() {
return maintainAddressOrder;
}

public void setMaintainAddressOrder(boolean maintainAddressOrder) {
this.maintainAddressOrder = maintainAddressOrder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa
// package protected API, made public for testing only
public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
Exception lastException = null;
List<Address> shuffled = shuffle(addressResolver.getAddresses());
List<Address> originalOrder = addressResolver.getAddresses();
List<Address> shuffled = params.shouldMaintainAddressOrder() ? originalOrder : shuffle(originalOrder);

for (Address addr : shuffled) {
try {
Expand Down