diff --git a/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java b/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java index b3ee8c73b..3a9336adf 100644 --- a/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java +++ b/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java @@ -192,7 +192,13 @@ public enum AddressType { /** External IP address (for public network connections) */ EXTERNAL_IP, /** External fully qualified domain name (for public network connections with TLS) */ - EXTERNAL_FQDN + EXTERNAL_FQDN, + /** + * none indicates that the MOVING message doesn’t need to contain an endpoint. In such a case, the client is expected to + * schedule a graceful reconnect to its currently configured endpoint after half of the grace period that was + * communicated by the server is over. + */ + NONE } private static class FixedAddressTypeSource extends MaintenanceEventsOptions.AddressTypeSource { diff --git a/src/main/java/io/lettuce/core/RedisHandshake.java b/src/main/java/io/lettuce/core/RedisHandshake.java index 65247f279..6ef1f9197 100644 --- a/src/main/java/io/lettuce/core/RedisHandshake.java +++ b/src/main/java/io/lettuce/core/RedisHandshake.java @@ -308,6 +308,8 @@ private String addressType(Channel channel, ConnectionState state, AddressTypeSo return "external-ip"; case EXTERNAL_FQDN: return "external-fqdn"; + case NONE: + return "none"; default: throw new IllegalArgumentException("Unknown moving endpoint address type:" + addressType); } diff --git a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java index a97c93e02..fe15bedb6 100644 --- a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java @@ -34,6 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -116,38 +117,20 @@ protected Mono wrapSocketAddressSupplier(Mono sock return rebindAwareAddressSupplier.wrappedSupplier(source); } - /** - * Creates a wrapped socket address supplier with a custom clock for testing purposes. - * - * @param socketAddressSupplier the original socket address supplier - * @param clock the clock to use for time-based operations - * @return the wrapped supplier - */ - protected Mono wrapSocketAddressSupplier(Mono socketAddressSupplier, Clock clock) { - Mono source = super.wrapSocketAddressSupplier(socketAddressSupplier); - rebindAwareAddressSupplier = new RebindAwareAddressSupplier(clock); - return rebindAwareAddressSupplier.wrappedSupplier(source); - } - @Override public void onPushMessage(PushMessage message) { String mType = message.getType(); if (REBIND_MESSAGE_TYPE.equals(mType)) { + logger.debug("Rebind requested"); final MovingEvent movingEvent = MovingEvent.from(message); if (movingEvent != null) { - logger.debug("Attempting to rebind to new endpoint '{}'", movingEvent.getEndpoint()); - - channel.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED); - rebindAwareAddressSupplier.rebind(movingEvent.getTime(), movingEvent.getEndpoint()); - - ChannelPipeline pipeline = channel.pipeline(); - CommandHandler commandHandler = pipeline.get(CommandHandler.class); - if (commandHandler.getStack().isEmpty()) { - channel.close().awaitUninterruptibly(); - channel.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED); + if (null == movingEvent.getEndpoint()) { + logger.debug("Deferred Rebind requested. Rebinding to current endpoint after '{}'", movingEvent.getTime()); + channel.eventLoop().schedule(() -> rebind(movingEvent), movingEvent.getTime().toMillis() / 2, + TimeUnit.MILLISECONDS); } else { - notifyRebindStarted(movingEvent.getTime(), movingEvent.getEndpoint()); + rebind(movingEvent); } } } else if (MIGRATING_MESSAGE_TYPE.equals(mType)) { @@ -165,6 +148,21 @@ public void onPushMessage(PushMessage message) { } } + private void rebind(MovingEvent movingEvent) { + logger.debug("Attempting to rebind to new endpoint '{}'", movingEvent.getEndpoint()); + channel.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED); + rebindAwareAddressSupplier.rebind(movingEvent.getTime(), movingEvent.getEndpoint()); + + ChannelPipeline pipeline = channel.pipeline(); + CommandHandler commandHandler = pipeline.get(CommandHandler.class); + if (commandHandler.getStack().isEmpty()) { + channel.close().awaitUninterruptibly(); + channel.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED); + } else { + notifyRebindStarted(movingEvent.getTime(), movingEvent.getEndpoint()); + } + } + private String getMigratingShards(PushMessage message) { List content = message.getContent(); @@ -260,13 +258,20 @@ static MovingEvent from(PushMessage message) { Long timeInSec = (Long) content.get(TIME_INDEX); ByteBuffer addressBuffer = (ByteBuffer) content.get(ADDRESS_INDEX); - String addressAndPort = StringCodec.UTF8.decodeKey((ByteBuffer) addressBuffer); - String[] parts = addressAndPort.split(":"); - String address = parts[0]; - int port = Integer.parseInt(parts[1]); - InetSocketAddress addr = new InetSocketAddress(address, port); + InetSocketAddress endpoint = null; + if (addressBuffer != null) { + String addressAndPort = StringCodec.UTF8.decodeKey(addressBuffer); + + // Handle "none" option where endpoint is null + if (addressAndPort != null && !"null".equals(addressAndPort)) { + String[] parts = addressAndPort.split(":"); + String address = parts[0]; + int port = Integer.parseInt(parts[1]); + endpoint = new InetSocketAddress(address, port); + } + } - return new MovingEvent(eventId, Duration.ofSeconds(timeInSec), addr); + return new MovingEvent(eventId, Duration.ofSeconds(timeInSec), endpoint); } catch (Exception e) { logger.error("Invalid re-bind message format", e); return null; @@ -330,12 +335,23 @@ private void notifyFailoverCompleted(String shards) { this.componentListeners.forEach(component -> component.onFailoverCompleted(shards)); } + /** + * A supplier that is aware of re-bind events and can provide the appropriate address based on the current state. + *

+ * During a re-bind, the supplier will return the rebind address for a certain period of time. After that period, it will + * return the original address. + *

+ */ static class RebindAwareAddressSupplier { private static final class State { + // Cutoff time for the current rebind + // If the current time is before the cutoff time, the rebind address should be returned final Instant cutoff; + // Address to which the connection should be re-bound + // If null, the original address should be returned final SocketAddress rebindAddress; State(Instant cutoff, SocketAddress rebindAddress) { @@ -357,15 +373,33 @@ public RebindAwareAddressSupplier(Clock clock) { this.clock = clock; } + /** + * Set a new rebind address for the specified duration. + * + * @param duration the duration for which the rebind address should be used + * @param rebindAddress the address to which the connection should be re-bound + */ public void rebind(Duration duration, SocketAddress rebindAddress) { Instant newCutoff = clock.instant().plus(duration); state.set(new State(newCutoff, rebindAddress)); } + /** + * Wrap the original supplier with a rebind-aware supplier. + * + *

+ * The returned supplier will return the rebind address if a rebind is in progress and the current time is before the + * cutoff time set by the last call to {@link #rebind(Duration, SocketAddress)}. Otherwise, it will return the original + * address. + *

+ * + * @param original the original supplier + * @return a new supplier that is aware of re-bind events + */ public Mono wrappedSupplier(Mono original) { return Mono.defer(() -> { State current = state.get(); - if (current != null && clock.instant().isBefore(current.cutoff)) { + if (current != null && current.rebindAddress != null && clock.instant().isBefore(current.cutoff)) { return Mono.just(current.rebindAddress); } else { state.compareAndSet(current, null); diff --git a/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java b/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java index c08934d31..26c116531 100644 --- a/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java +++ b/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java @@ -27,10 +27,13 @@ import io.netty.util.Attribute; import io.netty.util.Timer; import io.netty.util.concurrent.EventExecutorGroup; + +import io.netty.util.concurrent.ScheduledFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -50,7 +53,7 @@ import java.util.Collections; import java.util.List; import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; /** * Unit tests for {@link MaintenanceAwareConnectionWatchdog}. @@ -247,71 +250,6 @@ void testOnPushMessageMovingWithEmptyStack() { verify(component1, never()).onRebindStarted(any(), any()); // Not called when stack is empty } - /** - * MOVING