Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/main/java/io/lettuce/core/MaintenanceEventsOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,13 @@ public enum AddressType {
/** External IP address (for public network connections) */
EXTERNAL_IP,
/** External fully qualified domain name (for public network connections with TLS) */
EXTERNAL_FQDN
EXTERNAL_FQDN,
/**
* none indicates that the MOVING message doesn’t need to contain an endpoint. In such a case, the client is expected to
* schedule a graceful reconnect to its currently configured endpoint after half of the grace period that was
* communicated by the server is over.
*/
NONE
}

private static class FixedAddressTypeSource extends MaintenanceEventsOptions.AddressTypeSource {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/lettuce/core/RedisHandshake.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ private String addressType(Channel channel, ConnectionState state, AddressTypeSo
return "external-ip";
case EXTERNAL_FQDN:
return "external-fqdn";
case NONE:
return "none";
default:
throw new IllegalArgumentException("Unknown moving endpoint address type:" + addressType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -116,38 +117,20 @@ protected Mono<SocketAddress> wrapSocketAddressSupplier(Mono<SocketAddress> sock
return rebindAwareAddressSupplier.wrappedSupplier(source);
}

/**
* Creates a wrapped socket address supplier with a custom clock for testing purposes.
*
* @param socketAddressSupplier the original socket address supplier
* @param clock the clock to use for time-based operations
* @return the wrapped supplier
*/
protected Mono<SocketAddress> wrapSocketAddressSupplier(Mono<SocketAddress> socketAddressSupplier, Clock clock) {
Mono<SocketAddress> source = super.wrapSocketAddressSupplier(socketAddressSupplier);
rebindAwareAddressSupplier = new RebindAwareAddressSupplier(clock);
return rebindAwareAddressSupplier.wrappedSupplier(source);
}

@Override
public void onPushMessage(PushMessage message) {
String mType = message.getType();

if (REBIND_MESSAGE_TYPE.equals(mType)) {
logger.debug("Rebind requested");
final MovingEvent movingEvent = MovingEvent.from(message);
if (movingEvent != null) {
logger.debug("Attempting to rebind to new endpoint '{}'", movingEvent.getEndpoint());

channel.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED);
rebindAwareAddressSupplier.rebind(movingEvent.getTime(), movingEvent.getEndpoint());

ChannelPipeline pipeline = channel.pipeline();
CommandHandler commandHandler = pipeline.get(CommandHandler.class);
if (commandHandler.getStack().isEmpty()) {
channel.close().awaitUninterruptibly();
channel.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
if (null == movingEvent.getEndpoint()) {
logger.debug("Deferred Rebind requested. Rebinding to current endpoint after '{}'", movingEvent.getTime());
channel.eventLoop().schedule(() -> rebind(movingEvent), movingEvent.getTime().toMillis() / 2,
TimeUnit.MILLISECONDS);
} else {
notifyRebindStarted(movingEvent.getTime(), movingEvent.getEndpoint());
rebind(movingEvent);
}
}
} else if (MIGRATING_MESSAGE_TYPE.equals(mType)) {
Expand All @@ -165,6 +148,21 @@ public void onPushMessage(PushMessage message) {
}
}

private void rebind(MovingEvent movingEvent) {
logger.debug("Attempting to rebind to new endpoint '{}'", movingEvent.getEndpoint());
channel.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED);
rebindAwareAddressSupplier.rebind(movingEvent.getTime(), movingEvent.getEndpoint());

ChannelPipeline pipeline = channel.pipeline();
CommandHandler commandHandler = pipeline.get(CommandHandler.class);
if (commandHandler.getStack().isEmpty()) {
channel.close().awaitUninterruptibly();
channel.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
} else {
notifyRebindStarted(movingEvent.getTime(), movingEvent.getEndpoint());
}
}

private String getMigratingShards(PushMessage message) {
List<Object> content = message.getContent();

Expand Down Expand Up @@ -260,13 +258,20 @@ static MovingEvent from(PushMessage message) {
Long timeInSec = (Long) content.get(TIME_INDEX);
ByteBuffer addressBuffer = (ByteBuffer) content.get(ADDRESS_INDEX);

String addressAndPort = StringCodec.UTF8.decodeKey((ByteBuffer) addressBuffer);
String[] parts = addressAndPort.split(":");
String address = parts[0];
int port = Integer.parseInt(parts[1]);
InetSocketAddress addr = new InetSocketAddress(address, port);
InetSocketAddress endpoint = null;
if (addressBuffer != null) {
String addressAndPort = StringCodec.UTF8.decodeKey(addressBuffer);

// Handle "none" option where endpoint is null
if (addressAndPort != null && !"null".equals(addressAndPort)) {
String[] parts = addressAndPort.split(":");
String address = parts[0];
int port = Integer.parseInt(parts[1]);
endpoint = new InetSocketAddress(address, port);
}
}

return new MovingEvent(eventId, Duration.ofSeconds(timeInSec), addr);
return new MovingEvent(eventId, Duration.ofSeconds(timeInSec), endpoint);
} catch (Exception e) {
logger.error("Invalid re-bind message format", e);
return null;
Expand Down Expand Up @@ -330,12 +335,23 @@ private void notifyFailoverCompleted(String shards) {
this.componentListeners.forEach(component -> component.onFailoverCompleted(shards));
}

/**
* A supplier that is aware of re-bind events and can provide the appropriate address based on the current state.
* <p>
* During a re-bind, the supplier will return the rebind address for a certain period of time. After that period, it will
* return the original address.
* </p>
*/
static class RebindAwareAddressSupplier {

private static final class State {

// Cutoff time for the current rebind
// If the current time is before the cutoff time, the rebind address should be returned
final Instant cutoff;

// Address to which the connection should be re-bound
// If null, the original address should be returned
final SocketAddress rebindAddress;

State(Instant cutoff, SocketAddress rebindAddress) {
Expand All @@ -357,15 +373,33 @@ public RebindAwareAddressSupplier(Clock clock) {
this.clock = clock;
}

/**
* Set a new rebind address for the specified duration.
*
* @param duration the duration for which the rebind address should be used
* @param rebindAddress the address to which the connection should be re-bound
*/
public void rebind(Duration duration, SocketAddress rebindAddress) {
Instant newCutoff = clock.instant().plus(duration);
state.set(new State(newCutoff, rebindAddress));
}

/**
* Wrap the original supplier with a rebind-aware supplier.
*
* <p>
* The returned supplier will return the rebind address if a rebind is in progress and the current time is before the
* cutoff time set by the last call to {@link #rebind(Duration, SocketAddress)}. Otherwise, it will return the original
* address.
* </p>
*
* @param original the original supplier
* @return a new supplier that is aware of re-bind events
*/
public Mono<SocketAddress> wrappedSupplier(Mono<SocketAddress> original) {
return Mono.defer(() -> {
State current = state.get();
if (current != null && clock.instant().isBefore(current.cutoff)) {
if (current != null && current.rebindAddress != null && clock.instant().isBefore(current.cutoff)) {
return Mono.just(current.rebindAddress);
} else {
state.compareAndSet(current, null);
Expand Down
Loading