From e11b9867260fbff3e7eb93ffed571636f1c58af7 Mon Sep 17 00:00:00 2001
From: ggivo
Date: Fri, 1 Aug 2025 11:49:44 +0300
Subject: [PATCH 01/16] Support for Client-side opt-in
A client can tell the server if it wants to receive maintenance push notifications via the following command:
CLIENT MAINT_NOTIFICATIONS [parameter value parameter value ...]
---
.../io/lettuce/core/AbstractRedisClient.java | 10 +-
.../java/io/lettuce/core/ClientOptions.java | 32 +--
.../io/lettuce/core/ConnectionBuilder.java | 2 +-
.../io/lettuce/core/ConnectionMetadata.java | 12 ++
.../core/MaintenanceEventsOptions.java | 191 ++++++++++++++++++
.../java/io/lettuce/core/RedisHandshake.java | 47 ++++-
.../java/io/lettuce/core/TimeoutOptions.java | 4 +-
.../core/protocol/CommandExpiryWriter.java | 2 +-
.../lettuce/core/protocol/CommandKeyword.java | 2 +-
.../protocol/MaintenanceAwareComponent.java | 2 +-
.../MaintenanceAwareConnectionWatchdog.java | 94 +++++----
.../MaintenanceAwareExpiryWriter.java | 4 +-
.../io/lettuce/core/protocol/RebindState.java | 2 +-
.../LettuceMaintenanceEventsDemo.java | 3 +-
.../lettuce/core/RedisHandshakeUnitTests.java | 12 +-
...nanceAwareConnectionWatchdogUnitTests.java | 118 ++++++++---
...MaintenanceAwareExpiryWriterUnitTests.java | 14 +-
17 files changed, 444 insertions(+), 107 deletions(-)
create mode 100644 src/main/java/io/lettuce/core/MaintenanceEventsOptions.java
diff --git a/src/main/java/io/lettuce/core/AbstractRedisClient.java b/src/main/java/io/lettuce/core/AbstractRedisClient.java
index fcb65f2914..b2a01c632c 100644
--- a/src/main/java/io/lettuce/core/AbstractRedisClient.java
+++ b/src/main/java/io/lettuce/core/AbstractRedisClient.java
@@ -20,6 +20,8 @@
package io.lettuce.core;
import java.io.Closeable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
@@ -34,6 +36,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import io.lettuce.core.MaintenanceEventsOptions.AddressTypeSource;
import reactor.core.publisher.Mono;
import io.lettuce.core.event.command.CommandListener;
import io.lettuce.core.event.connection.ConnectEvent;
@@ -629,8 +632,13 @@ private CompletableFuture closeClientResources(long quietPeriod, long time
}
protected RedisHandshake createHandshake(ConnectionState state) {
+ AddressTypeSource source = null;
+ if (clientOptions.getMaintenanceEventsOptions().supportsMaintenanceEvents()) {
+ source = clientOptions.getMaintenanceEventsOptions().getAddressTypeSource();
+ }
+
return new RedisHandshake(clientOptions.getConfiguredProtocolVersion(), clientOptions.isPingBeforeActivateConnection(),
- state);
+ state, source);
}
}
diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java
index 4fb91c9506..31ef2fa059 100644
--- a/src/main/java/io/lettuce/core/ClientOptions.java
+++ b/src/main/java/io/lettuce/core/ClientOptions.java
@@ -52,7 +52,7 @@ public class ClientOptions implements Serializable {
public static final boolean DEFAULT_AUTO_RECONNECT = true;
- public static final boolean DEFAULT_SUPPORT_MAINTENANCE_EVENTS = false;
+ public static final MaintenanceEventsOptions DEFAULT_MAINTENANCE_EVENTS_OPTIONS = MaintenanceEventsOptions.disabled();
public static final Predicate> DEFAULT_REPLAY_FILTER = (cmd) -> false;
@@ -98,7 +98,7 @@ public class ClientOptions implements Serializable {
private final boolean autoReconnect;
- private final boolean supportMaintenanceEvents;
+ private final MaintenanceEventsOptions maintenanceEventsOptions;
private final Predicate> replayFilter;
@@ -136,7 +136,7 @@ public class ClientOptions implements Serializable {
protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
- this.supportMaintenanceEvents = builder.supportMaintenanceEvents;
+ this.maintenanceEventsOptions = builder.maintenanceEventsOptions;
this.replayFilter = builder.replayFilter;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
@@ -158,7 +158,7 @@ protected ClientOptions(Builder builder) {
protected ClientOptions(ClientOptions original) {
this.autoReconnect = original.isAutoReconnect();
- this.supportMaintenanceEvents = original.supportsMaintenanceEvents();
+ this.maintenanceEventsOptions = original.getMaintenanceEventsOptions();
this.replayFilter = original.getReplayFilter();
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
@@ -213,7 +213,7 @@ public static class Builder {
private boolean autoReconnect = DEFAULT_AUTO_RECONNECT;
- private boolean supportMaintenanceEvents = DEFAULT_SUPPORT_MAINTENANCE_EVENTS;
+ private MaintenanceEventsOptions maintenanceEventsOptions = DEFAULT_MAINTENANCE_EVENTS_OPTIONS;
private Predicate> replayFilter = DEFAULT_REPLAY_FILTER;
@@ -268,14 +268,14 @@ public Builder autoReconnect(boolean autoReconnect) {
* Configure whether the driver should listen for server events that notify on current maintenance activities. When
* enabled, this option will help with the connection handover and reduce the number of failed commands. This feature
* requires the server to support maintenance events. Defaults to {@code false}. See
- * {@link #DEFAULT_SUPPORT_MAINTENANCE_EVENTS}.
+ * {@link #DEFAULT_MAINTENANCE_EVENTS_OPTIONS}.
*
- * @param supportEvents true/false
+ * @param maintenanceEventsOptions true/false
* @return {@code this}
* @since 7.0
*/
- public Builder supportMaintenanceEvents(boolean supportEvents) {
- this.supportMaintenanceEvents = supportEvents;
+ public Builder supportMaintenanceEvents(MaintenanceEventsOptions maintenanceEventsOptions) {
+ this.maintenanceEventsOptions = maintenanceEventsOptions;
return this;
}
@@ -574,7 +574,7 @@ public ClientOptions build() {
public ClientOptions.Builder mutate() {
Builder builder = new Builder();
- builder.autoReconnect(isAutoReconnect()).supportMaintenanceEvents(supportsMaintenanceEvents())
+ builder.autoReconnect(isAutoReconnect()).supportMaintenanceEvents(getMaintenanceEventsOptions())
.cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()).replayFilter(getReplayFilter())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
@@ -601,15 +601,15 @@ public boolean isAutoReconnect() {
}
/**
- * Returns whether the client supports maintenance events.
+ * Returns the {@link MaintenanceEventsOptions} to listen for server events that notify on current maintenance activities.
*
- * @return {@code true} if maintenance events are supported.
+ * @return {@link MaintenanceEventsOptions}
* @since 7.0
- * @see #DEFAULT_SUPPORT_MAINTENANCE_EVENTS
- * @see #supportsMaintenanceEvents()
+ * @see #DEFAULT_MAINTENANCE_EVENTS_OPTIONS
+ * @see #getMaintenanceEventsOptions()
*/
- public boolean supportsMaintenanceEvents() {
- return supportMaintenanceEvents;
+ public MaintenanceEventsOptions getMaintenanceEventsOptions() {
+ return maintenanceEventsOptions;
}
/**
diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java
index b86b276ea8..bb5732fa7e 100644
--- a/src/main/java/io/lettuce/core/ConnectionBuilder.java
+++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java
@@ -155,7 +155,7 @@ protected ConnectionWatchdog createConnectionWatchdog() {
LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true");
ConnectionWatchdog watchdog;
- if (clientOptions.supportsMaintenanceEvents()) {
+ if (clientOptions.getMaintenanceEventsOptions().supportsMaintenanceEvents()) {
watchdog = new MaintenanceAwareConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint);
diff --git a/src/main/java/io/lettuce/core/ConnectionMetadata.java b/src/main/java/io/lettuce/core/ConnectionMetadata.java
index d74c839b25..db9b87b721 100644
--- a/src/main/java/io/lettuce/core/ConnectionMetadata.java
+++ b/src/main/java/io/lettuce/core/ConnectionMetadata.java
@@ -11,6 +11,8 @@ class ConnectionMetadata {
private volatile String libraryVersion;
+ private volatile boolean sslEnabled;
+
public ConnectionMetadata() {
}
@@ -23,6 +25,7 @@ public void apply(RedisURI redisURI) {
setClientName(redisURI.getClientName());
setLibraryName(redisURI.getLibraryName());
setLibraryVersion(redisURI.getLibraryVersion());
+ setSslEnabled(redisURI.isSsl());
}
public void apply(ConnectionMetadata metadata) {
@@ -30,6 +33,7 @@ public void apply(ConnectionMetadata metadata) {
setClientName(metadata.getClientName());
setLibraryName(metadata.getLibraryName());
setLibraryVersion(metadata.getLibraryVersion());
+ setSslEnabled(metadata.isSslEnabled());
}
protected void setClientName(String clientName) {
@@ -56,4 +60,12 @@ String getLibraryVersion() {
return libraryVersion;
}
+ boolean isSslEnabled() {
+ return sslEnabled;
+ }
+
+ void setSslEnabled(boolean sslEnabled) {
+ this.sslEnabled = sslEnabled;
+ }
+
}
diff --git a/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java b/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java
new file mode 100644
index 0000000000..4e68e790d2
--- /dev/null
+++ b/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2011-Present, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lettuce.core;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+public class MaintenanceEventsOptions {
+
+ public static final boolean DEFAULT_SUPPORT_MAINTENANCE_EVENTS = false;
+
+ private final boolean supportMaintenanceEvents;
+
+ private final AddressTypeSource addressTypeSource;
+
+ protected MaintenanceEventsOptions(MaintenanceEventsOptions.Builder builder) {
+ this.addressTypeSource = builder.addressTypeSource;
+ this.supportMaintenanceEvents = builder.supportMaintenanceEvents;
+ }
+
+ public static MaintenanceEventsOptions.Builder builder() {
+ return new MaintenanceEventsOptions.Builder();
+ }
+
+ public static MaintenanceEventsOptions create() {
+ return builder().build();
+ }
+
+ public static MaintenanceEventsOptions disabled() {
+ return builder().supportMaintenanceEvents(false).build();
+ }
+
+ public static MaintenanceEventsOptions enabled() {
+ return builder().supportMaintenanceEvents().autoResolveAddressType().build();
+ }
+
+ public static MaintenanceEventsOptions enabled(AddressType addressType) {
+ return builder().supportMaintenanceEvents().fixedAddressType(addressType).build();
+ }
+
+ public boolean supportsMaintenanceEvents() {
+ return supportMaintenanceEvents;
+ }
+
+ /**
+ * @return the address type source to determine the requested address type when maintenance events are enabled . Can be
+ * {@code null} if {@link #supportsMaintenanceEvents()} is {@code false}.
+ */
+ public AddressTypeSource getAddressTypeSource() {
+ return addressTypeSource;
+ }
+
+ public static class Builder {
+
+ private boolean supportMaintenanceEvents = DEFAULT_SUPPORT_MAINTENANCE_EVENTS;
+
+ private AddressTypeSource addressTypeSource;
+
+ public MaintenanceEventsOptions.Builder supportMaintenanceEvents() {
+ return supportMaintenanceEvents(true);
+ }
+
+ public MaintenanceEventsOptions.Builder supportMaintenanceEvents(boolean supportMaintenanceEvents) {
+ this.supportMaintenanceEvents = supportMaintenanceEvents;
+ return this;
+ }
+
+ public Builder fixedAddressType(AddressType addressType) {
+ this.addressTypeSource = new FixedAddressTypeSource(addressType);
+ return this;
+ }
+
+ public Builder autoResolveAddressType() {
+ this.addressTypeSource = new AutoresolveAddressTypeSource();
+ return this;
+ }
+
+ public MaintenanceEventsOptions build() {
+ return new MaintenanceEventsOptions(this);
+ }
+
+ }
+
+ public enum AddressType {
+ INTERNAL_IP, INTERNAL_FQDN, PUBLIC_IP, PUBLIC_FQDN
+ }
+
+ private static class FixedAddressTypeSource extends MaintenanceEventsOptions.AddressTypeSource {
+
+ private final AddressType addressType;
+
+ FixedAddressTypeSource(AddressType addressType) {
+
+ this.addressType = addressType;
+ }
+
+ @Override
+ public AddressType getAddressType(SocketAddress socketAddress, boolean sslEnabled) {
+ return addressType;
+ }
+
+ }
+
+ private static class AutoresolveAddressTypeSource extends MaintenanceEventsOptions.AddressTypeSource {
+
+ AutoresolveAddressTypeSource() {
+ }
+
+ @Override
+ public MaintenanceEventsOptions.AddressType getAddressType(SocketAddress socketAddress, boolean sslEnabled) {
+ if (isReservedIp(socketAddress)) {
+ // use private
+ if (sslEnabled) {
+ return MaintenanceEventsOptions.AddressType.INTERNAL_FQDN;
+ } else {
+ return MaintenanceEventsOptions.AddressType.INTERNAL_IP;
+ }
+ } else {
+ // use public
+ if (sslEnabled) {
+ return MaintenanceEventsOptions.AddressType.PUBLIC_FQDN;
+ } else {
+ return MaintenanceEventsOptions.AddressType.PUBLIC_IP;
+ }
+ }
+ }
+
+ public static boolean isReservedIp(SocketAddress socketAddress) {
+ if (!(socketAddress instanceof InetSocketAddress)) {
+ return false;
+ }
+
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+ InetAddress address = inetSocketAddress.getAddress();
+
+ if (address == null || address.isAnyLocalAddress() || address.isLoopbackAddress()) {
+ return false;
+ }
+
+ byte[] bytes = address.getAddress();
+
+ // IPv4 only
+ if (bytes.length != 4) {
+ return false;
+ }
+
+ int firstByte = bytes[0] & 0xFF;
+ int secondByte = bytes[1] & 0xFF;
+
+ // 10.0.0.0/8
+ if (firstByte == 10)
+ return true;
+
+ // 172.16.0.0/12
+ if (firstByte == 172 && (secondByte >= 16 && secondByte <= 31))
+ return true;
+
+ // 192.168.0.0/16
+ if (firstByte == 192 && secondByte == 168)
+ return true;
+
+ return false;
+ }
+
+ }
+
+ public static abstract class AddressTypeSource {
+
+ public abstract AddressType getAddressType(SocketAddress socketAddress, boolean sslEnabled);
+
+ }
+
+}
diff --git a/src/main/java/io/lettuce/core/RedisHandshake.java b/src/main/java/io/lettuce/core/RedisHandshake.java
index bf5ad7e4b5..e07d69d2b5 100644
--- a/src/main/java/io/lettuce/core/RedisHandshake.java
+++ b/src/main/java/io/lettuce/core/RedisHandshake.java
@@ -29,18 +29,24 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import io.lettuce.core.MaintenanceEventsOptions.AddressTypeSource;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceStrings;
+import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.Command;
+import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.ConnectionInitializer;
import io.lettuce.core.protocol.ProtocolVersion;
import io.netty.channel.Channel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
+import static io.lettuce.core.protocol.CommandKeyword.MAINT_NOTIFICATIONS;
+import static io.lettuce.core.protocol.CommandType.CLIENT;
+
/**
* Redis RESP2/RESP3 handshake using the configured {@link ProtocolVersion} and other options for connection initialization and
* connection state restoration. This class is part of the internal API.
@@ -63,8 +69,12 @@ class RedisHandshake implements ConnectionInitializer {
private volatile ProtocolVersion negotiatedProtocolVersion;
- RedisHandshake(ProtocolVersion requestedProtocolVersion, boolean pingOnConnect, ConnectionState connectionState) {
+ private final MaintenanceEventsOptions.AddressTypeSource addressTypeSource;
+
+ RedisHandshake(ProtocolVersion requestedProtocolVersion, boolean pingOnConnect, ConnectionState connectionState,
+ MaintenanceEventsOptions.AddressTypeSource addressTypeSource) {
+ this.addressTypeSource = addressTypeSource;
this.requestedProtocolVersion = requestedProtocolVersion;
this.pingOnConnect = pingOnConnect;
this.connectionState = connectionState;
@@ -261,6 +271,19 @@ private CompletableFuture applyPostHandshake(Channel channel) {
postHandshake.add(new AsyncCommand<>(this.commandBuilder.readOnly()));
}
+ if (addressTypeSource != null) {
+ CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(MAINT_NOTIFICATIONS).add("on");
+ String addressType = addressType(channel, connectionState, addressTypeSource);
+
+ if (addressType != null) {
+ args.add("moving-endpoint-type").add(addressType);
+ }
+
+ Command maintNotificationsOn = new Command<>(CLIENT, new StatusOutput<>(StringCodec.UTF8),
+ args);
+ postHandshake.add(new AsyncCommand<>(maintNotificationsOn));
+ }
+
if (postHandshake.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
@@ -268,6 +291,28 @@ private CompletableFuture applyPostHandshake(Channel channel) {
return dispatch(channel, postHandshake);
}
+ private String addressType(Channel channel, ConnectionState state, AddressTypeSource addressTypeSource) {
+ MaintenanceEventsOptions.AddressType addressType = addressTypeSource.getAddressType(channel.remoteAddress(),
+ state.getConnectionMetadata().isSslEnabled());
+
+ if (addressType == null) {
+ return null;
+ }
+
+ switch (addressType) {
+ case INTERNAL_IP:
+ return "internal-ip";
+ case INTERNAL_FQDN:
+ return "internal-fqdn";
+ case PUBLIC_IP:
+ return "external-ip";
+ case PUBLIC_FQDN:
+ return "external-fqdn";
+ default:
+ throw new IllegalArgumentException("Unknown moving endpoint address type:" + addressType);
+ }
+ }
+
private CompletionStage applyConnectionMetadataSafely(Channel channel) {
return applyConnectionMetadata(channel).handle((result, error) -> {
if (error != null) {
diff --git a/src/main/java/io/lettuce/core/TimeoutOptions.java b/src/main/java/io/lettuce/core/TimeoutOptions.java
index 434db7cc9e..ac96cbace3 100644
--- a/src/main/java/io/lettuce/core/TimeoutOptions.java
+++ b/src/main/java/io/lettuce/core/TimeoutOptions.java
@@ -121,7 +121,7 @@ public Builder timeoutCommands(boolean enabled) {
* Enable timeout relaxing during maintenance events. Disabled by default, see {@link #DEFAULT_RELAXED_TIMEOUT}.
*
* If the Redis server supports sending maintenance events, and the client is set up to use that by the
- * {@link ClientOptions#supportsMaintenanceEvents()} option, the client would listen to notifications that the current
+ * {@link ClientOptions#getMaintenanceEventsOptions()} option, the client would listen to notifications that the current
* endpoint is about to go down (as part of some maintenance activity, for example). In such cases, the driver could
* extend the existing timeout settings for newly issued commands, or such that are in flight, to make sure they do not
* time out during this process. These commands could be either a part of the offline buffer or waiting for a reply.
@@ -129,7 +129,7 @@ public Builder timeoutCommands(boolean enabled) {
* @param duration {@link Duration} to relax timeouts proactively, must not be {@code null}.
* @return {@code this}
* @since 7.0
- * @see ClientOptions#supportsMaintenanceEvents()
+ * @see ClientOptions#getMaintenanceEventsOptions()
*/
public Builder timeoutsRelaxingDuringMaintenance(Duration duration) {
LettuceAssert.notNull(duration, "Duration must not be null");
diff --git a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
index b637ada08c..c57ef515cb 100644
--- a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
+++ b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
@@ -95,7 +95,7 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti
*/
public static RedisChannelWriter buildCommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOptions,
ClientResources clientResources) {
- if (clientOptions.supportsMaintenanceEvents()) {
+ if (clientOptions.getMaintenanceEventsOptions().supportsMaintenanceEvents()) {
return new MaintenanceAwareExpiryWriter(delegate, clientOptions, clientResources);
} else {
return new CommandExpiryWriter(delegate, clientOptions, clientResources);
diff --git a/src/main/java/io/lettuce/core/protocol/CommandKeyword.java b/src/main/java/io/lettuce/core/protocol/CommandKeyword.java
index 628365ca4e..fb1cde32c6 100644
--- a/src/main/java/io/lettuce/core/protocol/CommandKeyword.java
+++ b/src/main/java/io/lettuce/core/protocol/CommandKeyword.java
@@ -47,7 +47,7 @@ public enum CommandKeyword implements ProtocolKeyword {
RESETSTAT, RESTART, RETRYCOUNT, REWRITE, RIGHT, SAVECONFIG, SDSLEN, SETINFO, SETNAME, SETSLOT, SHARDS, SLOTS, STABLE,
- MIGRATING, IMPORTING, SAVE, SKIPME, SLAVES, STREAM, STORE, SUM, SEGFAULT, SETUSER, TAKEOVER, TRACKING, TRACKINGINFO, TYPE, UNBLOCK, USERS, USAGE, WEIGHTS, WHOAMI,
+ MIGRATING, IMPORTING, SAVE, SKIPME, SLAVES, STREAM, STORE, SUM, SEGFAULT, SETUSER, TAKEOVER, TRACKING, MAINT_NOTIFICATIONS, TRACKINGINFO, TYPE, UNBLOCK, USERS, USAGE, WEIGHTS, WHOAMI,
WITHMATCHLEN, WITHSCORE, WITHSCORES, WITHVALUES, XOR, XX, FXX, YES, INDENT, NEWLINE, SPACE, GT, LT,
diff --git a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareComponent.java b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareComponent.java
index b405d23e2d..2a24ab12dd 100644
--- a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareComponent.java
+++ b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareComponent.java
@@ -13,7 +13,7 @@
*
* @author Tihomir Mateev
* @since 7.0
- * @see ClientOptions#supportsMaintenanceEvents()
+ * @see ClientOptions#getMaintenanceEventsOptions()
*/
public interface MaintenanceAwareComponent {
diff --git a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java
index b81cff6338..8da1458d6e 100644
--- a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java
+++ b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java
@@ -37,7 +37,7 @@
*
* @author Tihomir Mateev
* @since 7.0
- * @see ClientOptions#supportsMaintenanceEvents()
+ * @see ClientOptions#getMaintenanceEventsOptions()
*/
@ChannelHandler.Sharable
public class MaintenanceAwareConnectionWatchdog extends ConnectionWatchdog implements PushListener {
@@ -54,13 +54,17 @@ public class MaintenanceAwareConnectionWatchdog extends ConnectionWatchdog imple
private static final String FAILED_OVER_MESSAGE_TYPE = "FAILED_OVER";
- private static final int REBIND_ADDRESS_INDEX = 2;
+ private static final int REBIND_ADDRESS_INDEX = 3;
public static final AttributeKey REBIND_ATTRIBUTE = AttributeKey.newInstance("rebindAddress");
- private static final int FAILING_OVER_SHARDS_INDEX = 2;
+ private static final int MIGRATING_SHARDS_INDEX = 3;
- private static final int FAILED_OVER_SHARDS_INDEX = 1;
+ private static final int MIGRATED_SHARDS_INDEX = 2;
+
+ private static final int FAILING_OVER_SHARDS_INDEX = 3;
+
+ private static final int FAILED_OVER_SHARDS_INDEX = 2;
private Channel channel;
@@ -124,10 +128,10 @@ public void onPushMessage(PushMessage message) {
}
} else if (MIGRATING_MESSAGE_TYPE.equals(mType)) {
logger.debug("Shard migration started");
- notifyMigrateStarted();
+ notifyMigrateStarted(getMigratingShards(message));
} else if (MIGRATED_MESSAGE_TYPE.equals(mType)) {
logger.debug("Shard migration completed");
- notifyMigrateCompleted();
+ notifyMigrateCompleted(getMigratedShards(message));
} else if (FAILING_OVER_MESSAGE_TYPE.equals(mType)) {
logger.debug("Failover started");
notifyFailoverStarted(getFailingOverShards(message));
@@ -137,55 +141,71 @@ public void onPushMessage(PushMessage message) {
}
}
- private String getFailingOverShards(PushMessage message) {
- List
+ *
+ * The options provided by this class are used by Lettuce to determine whether to listen for and
+ * process maintenance events, and how to resolve address types when such events occur.
+ *
+ */
public class MaintenanceEventsOptions {
public static final boolean DEFAULT_SUPPORT_MAINTENANCE_EVENTS = false;
diff --git a/src/main/java/io/lettuce/core/internal/NetUtils.java b/src/main/java/io/lettuce/core/internal/NetUtils.java
index 226cd48f1a..1fab8580dc 100644
--- a/src/main/java/io/lettuce/core/internal/NetUtils.java
+++ b/src/main/java/io/lettuce/core/internal/NetUtils.java
@@ -1,3 +1,22 @@
+/*
+ * Copyright 2011-Present, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package io.lettuce.core.internal;
import java.net.Inet6Address;
diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceMaintenanceEventsDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceMaintenanceEventsDemo.java
index 16615fdc50..f0ca0979cd 100644
--- a/src/test/java/biz/paluch/redis/extensibility/LettuceMaintenanceEventsDemo.java
+++ b/src/test/java/biz/paluch/redis/extensibility/LettuceMaintenanceEventsDemo.java
@@ -33,7 +33,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
// (optional) relax timeouts during re-bind to decrease risk of timeouts
.timeoutsRelaxingDuringMaintenance(Duration.ofMillis(750)).build();
// (required) enable proactive re-bind by enabling it in the ClientOptions
- ClientOptions options = ClientOptions.builder().timeoutOptions(timeoutOpts).supportMaintenanceEvents(null).build();
+ ClientOptions options = ClientOptions.builder().timeoutOptions(timeoutOpts).supportMaintenanceEvents(MaintenanceEventsOptions.enabled()).build();
RedisClient redisClient = RedisClient.create(RedisURI.create(ADDRESS == null ? "redis://localhost:6379" : ADDRESS));
redisClient.setOptions(options);
From 1d788302bf3a8eb675b5970b3204bfb073f43a18 Mon Sep 17 00:00:00 2001
From: ggivo
Date: Tue, 5 Aug 2025 11:06:30 +0300
Subject: [PATCH 08/16] format
---
.../core/MaintenanceEventsOptions.java | 19 ++++++++++---------
.../LettuceMaintenanceEventsDemo.java | 3 ++-
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java b/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java
index c5cf9907f9..a730f4d292 100644
--- a/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java
+++ b/src/main/java/io/lettuce/core/MaintenanceEventsOptions.java
@@ -26,24 +26,25 @@
/**
* Configuration options for enabling and customizing support for maintenance events in Lettuce.
*
- * Maintenance events are notifications or signals that indicate changes in the underlying infrastructure,
- * such as failovers, topology changes, or other events that may require special handling by the client.
- * This class allows users to enable or disable maintenance event support, and to configure how
- * address types are resolved for maintenance event handling.
+ * Maintenance events are notifications or signals that indicate changes in the underlying infrastructure, such as failovers,
+ * topology changes, or other events that may require special handling by the client. This class allows users to enable or
+ * disable maintenance event support, and to configure how address types are resolved for maintenance event handling.
*
*
- * Usage patterns typically involve using the static factory methods such as {@link #enabled()},
- * {@link #disabled()}, or {@link #builder()} to create and configure an instance.
+ * Usage patterns typically involve using the static factory methods such as {@link #enabled()}, {@link #disabled()}, or
+ * {@link #builder()} to create and configure an instance.
*
- * The options provided by this class are used by Lettuce to determine whether to listen for and
- * process maintenance events, and how to resolve address types when such events occur.
+ * The options provided by this class are used by Lettuce to determine whether to listen for and process maintenance events, and
+ * how to resolve address types when such events occur.
*
+ * A specific endpoint is going to move to another node within
+ *
+ * @param endpoint address of the target endpoint
+ * @param time estimated time for the re-bind to complete
*/
- void onRebindStarted();
+ void onRebindStarted(Duration time, SocketAddress endpoint);
/**
* Called whenever the re-bind has been completed
diff --git a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java
index 99a6dd9836..6f091fc017 100644
--- a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java
+++ b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdog.java
@@ -27,10 +27,14 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.time.LocalTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
/**
* An extension to {@link ConnectionWatchdog} that intercepts maintenance events.
@@ -54,8 +58,6 @@ public class MaintenanceAwareConnectionWatchdog extends ConnectionWatchdog imple
private static final String FAILED_OVER_MESSAGE_TYPE = "FAILED_OVER";
- private static final int REBIND_ADDRESS_INDEX = 3;
-
public static final AttributeKey REBIND_ATTRIBUTE = AttributeKey.newInstance("rebindAddress");
private static final int MIGRATING_SHARDS_INDEX = 3;
@@ -70,6 +72,8 @@ public class MaintenanceAwareConnectionWatchdog extends ConnectionWatchdog imple
private final Set componentListeners = new HashSet<>();
+ private RebindAwareAddressSupplier rebindAwareAddressSupplier;
+
public MaintenanceAwareConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap,
Timer timer, EventExecutorGroup reconnectWorkers, Mono socketAddressSupplier,
ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus,
@@ -105,17 +109,37 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
+ @Override
+ protected Mono wrapSocketAddressSupplier(Mono socketAddressSupplier) {
+ Mono source = super.wrapSocketAddressSupplier(socketAddressSupplier);
+ rebindAwareAddressSupplier = new RebindAwareAddressSupplier();
+ return rebindAwareAddressSupplier.wrappedSupplier(source);
+ }
+
+ /**
+ * Creates a wrapped socket address supplier with a custom clock for testing purposes.
+ *
+ * @param socketAddressSupplier the original socket address supplier
+ * @param clock the clock to use for time-based operations
+ * @return the wrapped supplier
+ */
+ protected Mono wrapSocketAddressSupplier(Mono socketAddressSupplier, Clock clock) {
+ Mono source = super.wrapSocketAddressSupplier(socketAddressSupplier);
+ rebindAwareAddressSupplier = new RebindAwareAddressSupplier(clock);
+ return rebindAwareAddressSupplier.wrappedSupplier(source);
+ }
+
@Override
public void onPushMessage(PushMessage message) {
String mType = message.getType();
if (REBIND_MESSAGE_TYPE.equals(mType)) {
- final SocketAddress rebindAddress = getRemoteAddress(message);
- if (rebindAddress != null) {
- logger.debug("Attempting to rebind to new endpoint '{}'", rebindAddress);
+ final MovingEvent movingEvent = MovingEvent.from(message);
+ if (movingEvent != null) {
+ logger.debug("Attempting to rebind to new endpoint '{}'", movingEvent.getEndpoint());
channel.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED);
- this.reconnectionHandler.setSocketAddressSupplier(rebindAddress);
+ rebindAwareAddressSupplier.rebind(movingEvent.getTime(), movingEvent.getEndpoint());
ChannelPipeline pipeline = channel.pipeline();
CommandHandler commandHandler = pipeline.get(CommandHandler.class);
@@ -123,7 +147,7 @@ public void onPushMessage(PushMessage message) {
channel.close().awaitUninterruptibly();
channel.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
} else {
- notifyRebindStarted();
+ notifyRebindStarted(movingEvent.getTime(), movingEvent.getEndpoint());
}
}
} else if (MIGRATING_MESSAGE_TYPE.equals(mType)) {
@@ -199,31 +223,68 @@ private static String getShards(List content, int shardsIndex, String ma
return StringCodec.UTF8.decodeKey((ByteBuffer) shardsObject);
}
- private SocketAddress getRemoteAddress(PushMessage message) {
+ static class MovingEvent {
- List content = message.getContent();
- if (content.size() != 4) {
- logger.warn("Invalid re-bind message format, expected 4 elements, got {}", content.size());
- return null;
+ private static final int EVENT_ID_INDEX = 1;
+
+ private static final int TIME_INDEX = 2;
+
+ private static final int ADDRESS_INDEX = 3;
+
+ private final Long eventId;
+
+ private final InetSocketAddress endpoint;
+
+ private final Duration time;
+
+ private MovingEvent(Long eventId, Duration time, InetSocketAddress endpoint) {
+ this.eventId = eventId;
+ this.endpoint = endpoint;
+ this.time = time;
}
- Object addressObject = content.get(REBIND_ADDRESS_INDEX);
- if (!(addressObject instanceof ByteBuffer)) {
- logger.warn("Invalid re-bind message format, expected 3rd element to be a ByteBuffer, got {}",
- addressObject.getClass());
- return null;
+ static MovingEvent from(PushMessage message) {
+ if (!REBIND_MESSAGE_TYPE.equals(message.getType())) {
+ return null;
+ }
+
+ List content = message.getContent();
+
+ if (content.size() != 4) {
+ logger.warn("Invalid re-bind message format, expected 4 elements, got {}", content.size());
+ return null;
+ }
+
+ try {
+ Long eventId = (Long) content.get(EVENT_ID_INDEX);
+ Long timeInSec = (Long) content.get(TIME_INDEX);
+ ByteBuffer addressBuffer = (ByteBuffer) content.get(ADDRESS_INDEX);
+
+ String addressAndPort = StringCodec.UTF8.decodeKey((ByteBuffer) addressBuffer);
+ String[] parts = addressAndPort.split(":");
+ String address = parts[0];
+ int port = Integer.parseInt(parts[1]);
+ InetSocketAddress addr = new InetSocketAddress(address, port);
+
+ return new MovingEvent(eventId, Duration.ofSeconds(timeInSec), addr);
+ } catch (Exception e) {
+ logger.error("Invalid re-bind message format", e);
+ return null;
+ }
}
- String addressAndPort = StringCodec.UTF8.decodeKey((ByteBuffer) addressObject);
- try {
- String[] parts = addressAndPort.split(":");
- String address = parts[0];
- int port = Integer.parseInt(parts[1]);
- return new InetSocketAddress(address, port);
- } catch (Exception e) {
- logger.error("Failed to parse address and port from '{}'", addressAndPort, e);
- return null;
+ public Long getEventId() {
+ return eventId;
+ }
+
+ public InetSocketAddress getEndpoint() {
+ return endpoint;
}
+
+ public Duration getTime() {
+ return time;
+ }
+
}
/**
@@ -240,8 +301,17 @@ private void notifyRebindCompleted() {
this.componentListeners.forEach(MaintenanceAwareComponent::onRebindCompleted);
}
- private void notifyRebindStarted() {
- this.componentListeners.forEach(MaintenanceAwareComponent::onRebindStarted);
+ /**
+ * Called whenever a re-bind has been initiated by the remote server
+ *
+ * A specific endpoint is going to move to another node within
+ *
+ * @param endpoint address of the target endpoint
+ * @param time estimated time for the re-bind to complete
+ */
+ private void notifyRebindStarted(Duration time, SocketAddress endpoint) {
+ this.componentListeners.forEach(e -> e.onRebindStarted(time, endpoint));
}
private void notifyMigrateStarted(String shards) {
@@ -260,4 +330,50 @@ private void notifyFailoverCompleted(String shards) {
this.componentListeners.forEach(component -> component.onFailoverCompleted(shards));
}
+ static class RebindAwareAddressSupplier {
+
+ private static final class State {
+
+ final Instant cutoff;
+
+ final SocketAddress rebindAddress;
+
+ State(Instant cutoff, SocketAddress rebindAddress) {
+ this.cutoff = cutoff;
+ this.rebindAddress = rebindAddress;
+ }
+
+ }
+
+ private final AtomicReference state = new AtomicReference<>();
+
+ private final Clock clock;
+
+ public RebindAwareAddressSupplier() {
+ this(Clock.systemUTC());
+ }
+
+ public RebindAwareAddressSupplier(Clock clock) {
+ this.clock = clock;
+ }
+
+ public void rebind(Duration duration, SocketAddress rebindAddress) {
+ Instant newCutoff = clock.instant().plus(duration);
+ state.set(new State(newCutoff, rebindAddress));
+ }
+
+ public Mono wrappedSupplier(Mono original) {
+ return Mono.defer(() -> {
+ State current = state.get();
+ if (current != null && clock.instant().isBefore(current.cutoff)) {
+ return Mono.just(current.rebindAddress);
+ } else {
+ state.compareAndSet(current, null);
+ return original;
+ }
+ });
+ }
+
+ }
+
}
diff --git a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriter.java
index 23fdc88dba..89ca2ced31 100644
--- a/src/main/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriter.java
+++ b/src/main/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriter.java
@@ -17,6 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
@@ -182,7 +183,7 @@ private void registerAsMaintenanceAwareComponent() {
}
@Override
- public void onRebindStarted() {
+ public void onRebindStarted(Duration time, SocketAddress endpoint) {
enableRelaxedTimeout("Re-bind started");
}
diff --git a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java
index a2c7fc23fa..760e3331eb 100644
--- a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java
+++ b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java
@@ -114,18 +114,6 @@ protected Tuple2, CompletableFuture> r
return Tuples.of(future, address);
}
- /**
- * Replace the existing @link SocketAddressSupplier} with a new one.
- *
- * This could be used in a scenario where a re-bind is happening, e.g., the old node is going down and a new node is
- * supposed to handle the requests to the same endpoint.
- *
- * @param socketAddressSupplier the new address of the endpoint
- */
- public void setSocketAddressSupplier(SocketAddress socketAddressSupplier) {
- this.socketAddressSupplier = Mono.just(socketAddressSupplier);
- }
-
private void reconnect0(CompletableFuture result, SocketAddress remoteAddress) {
ChannelHandler handler = bootstrap.config().handler();
diff --git a/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java b/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java
index 8b73f5c9bb..a69f0d3e45 100644
--- a/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java
+++ b/src/test/java/io/lettuce/core/protocol/MaintenanceAwareConnectionWatchdogUnitTests.java
@@ -7,6 +7,7 @@
package io.lettuce.core.protocol;
import static io.lettuce.TestTags.UNIT_TEST;
+import static io.lettuce.test.ReflectionTestUtils.getField;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static io.lettuce.test.ReflectionTestUtils.setField;
@@ -18,7 +19,9 @@
import io.lettuce.core.api.push.PushMessage;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.EventBus;
+import io.lettuce.core.protocol.MaintenanceAwareConnectionWatchdog.RebindAwareAddressSupplier;
import io.lettuce.core.resource.Delay;
+import io.lettuce.test.MutableClock;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.util.Attribute;
@@ -29,18 +32,25 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Unit tests for {@link MaintenanceAwareConnectionWatchdog}.
@@ -109,6 +119,8 @@ class MaintenanceAwareConnectionWatchdogUnitTests {
@Mock
private ReconnectionHandler reconnectionHandler;
+ private RebindAwareAddressSupplier rebindAwareAddressSupplier;
+
private MaintenanceAwareConnectionWatchdog watchdog;
@BeforeEach
@@ -124,6 +136,9 @@ void setUp() {
// Set up the reconnectionHandler field using ReflectionTestUtils
setField(watchdog, "reconnectionHandler", reconnectionHandler);
+ // Spy the rebindAwareAddressSupplier field using ReflectionTestUtils
+ rebindAwareAddressSupplier = Mockito.spy((RebindAwareAddressSupplier) getField(watchdog, "rebindAwareAddressSupplier"));
+ setField(watchdog, "rebindAwareAddressSupplier", rebindAwareAddressSupplier);
}
@Test
@@ -199,6 +214,7 @@ void testChannelReadCompleteWithoutRebindCompleted() throws Exception {
// Then
verify(channel, never()).close();
verify(component1, never()).onRebindCompleted();
+
}
@Test
@@ -225,10 +241,10 @@ void testOnPushMessageMovingWithEmptyStack() {
// Then
verify(rebindAttribute).set(RebindState.STARTED);
- verify(reconnectionHandler).setSocketAddressSupplier(any(InetSocketAddress.class));
+ verify(rebindAwareAddressSupplier).rebind(eq(Duration.ofSeconds(15)), eq(new InetSocketAddress("127.0.0.1", 6380)));
verify(channel).close();
verify(rebindAttribute).set(RebindState.COMPLETED);
- verify(component1, never()).onRebindStarted(); // Not called when stack is empty
+ verify(component1, never()).onRebindStarted(any(), any()); // Not called when stack is empty
}
/**
@@ -319,9 +335,8 @@ void testOnPushMessageMovingWithNonEmptyStack() {
// Then
verify(rebindAttribute).set(RebindState.STARTED);
- verify(reconnectionHandler).setSocketAddressSupplier(any(InetSocketAddress.class));
verify(channel, never()).close();
- verify(component1).onRebindStarted(); // Called when stack is not empty
+ verify(component1).onRebindStarted(any(), any()); // Called when stack is not empty
}
@Test
@@ -342,8 +357,7 @@ void testOnPushMessageMovingWithInvalidContent() {
// Then
verify(rebindAttribute, never()).set(any());
- verify(reconnectionHandler, never()).setSocketAddressSupplier(any());
- verify(component1, never()).onRebindStarted();
+ verify(component1, never()).onRebindStarted(any(), any());
}
@Test
@@ -510,8 +524,7 @@ void testOnPushMessageMovingWithInvalidAddressFormat() {
// Then - should not proceed with rebind due to invalid address
verify(rebindAttribute, never()).set(any());
- verify(reconnectionHandler, never()).setSocketAddressSupplier(any());
- verify(component1, never()).onRebindStarted();
+ verify(component1, never()).onRebindStarted(any(), any());
}
@Test
@@ -532,8 +545,7 @@ void testOnPushMessageMovingWithNonByteBufferAddress() {
// Then - should not proceed with rebind due to invalid address type
verify(rebindAttribute, never()).set(any());
- verify(reconnectionHandler, never()).setSocketAddressSupplier(any());
- verify(component1, never()).onRebindStarted();
+ verify(component1, never()).onRebindStarted(any(), any());
}
@Test
@@ -558,4 +570,69 @@ void testRebindAttributeKey() {
assertThat(MaintenanceAwareConnectionWatchdog.REBIND_ATTRIBUTE.name()).isEqualTo("rebindAddress");
}
+ @Test
+ void testRebindAwareAddressSupplierWithFixedClock() {
+ // Given
+ Instant fixedTime = Instant.parse("2023-01-01T10:00:00Z");
+ Clock fixedClock = Clock.fixed(fixedTime, ZoneId.systemDefault());
+ RebindAwareAddressSupplier supplier = new RebindAwareAddressSupplier(fixedClock);
+
+ SocketAddress originalAddress = new InetSocketAddress("localhost", 6379);
+ SocketAddress rebindAddress = new InetSocketAddress("127.0.0.1", 6380);
+ Mono originalSupplier = Mono.just(originalAddress);
+
+ // When - rebind with 30 seconds duration
+ supplier.rebind(Duration.ofSeconds(30), rebindAddress);
+
+ // Then - should return rebind address since we're within the cutoff time
+ Mono wrappedSupplier = supplier.wrappedSupplier(originalSupplier);
+
+ StepVerifier.create(wrappedSupplier).expectNext(rebindAddress).verifyComplete();
+ }
+
+ @Test
+ void testRebindAwareAddressSupplierExpirationWithFixedClock() {
+ // Given - Create supplier with a mutable clock that we can advance
+ MutableClock clock = new MutableClock(Instant.parse("2023-01-01T10:00:00Z"));
+
+ RebindAwareAddressSupplier supplier = new RebindAwareAddressSupplier(clock);
+ SocketAddress originalAddress = new InetSocketAddress("localhost", 6379);
+ SocketAddress rebindAddress = new InetSocketAddress("127.0.0.1", 6380);
+ Mono originalSupplier = Mono.just(originalAddress);
+
+ // Step 1: Create wrapped supplier once (same instance used throughout)
+ Mono wrappedSupplier = supplier.wrappedSupplier(originalSupplier);
+
+ // Step 2: First subscription should return original address (no rebind set yet)
+ StepVerifier.create(wrappedSupplier).expectNext(originalAddress).verifyComplete();
+
+ // Step 3: Invoke rebind with 30 seconds duration
+ supplier.rebind(Duration.ofSeconds(30), rebindAddress);
+
+ // Step 4: New subscription to same wrappedSupplier should return rebind address
+ StepVerifier.create(wrappedSupplier).expectNext(rebindAddress).verifyComplete();
+
+ // Step 5: Advance clock past expiration (31 seconds)
+ clock.tick(Duration.ofSeconds(31));
+
+ // Step 6: New subscription to same wrappedSupplier should return original address again
+ StepVerifier.create(wrappedSupplier).expectNext(originalAddress).verifyComplete();
+ }
+
+ @Test
+ void testRebindAwareAddressSupplierWithNullState() {
+ // Given
+ Clock fixedClock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
+ RebindAwareAddressSupplier supplier = new RebindAwareAddressSupplier(fixedClock);
+
+ SocketAddress originalAddress = new InetSocketAddress("localhost", 6379);
+ Mono originalSupplier = Mono.just(originalAddress);
+
+ // When - no rebind has been set
+ Mono wrappedSupplier = supplier.wrappedSupplier(originalSupplier);
+
+ // Then - should return original address
+ StepVerifier.create(wrappedSupplier).expectNext(originalAddress).verifyComplete();
+ }
+
}
diff --git a/src/test/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriterUnitTests.java b/src/test/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriterUnitTests.java
index 8194952d48..061378f2e8 100644
--- a/src/test/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriterUnitTests.java
+++ b/src/test/java/io/lettuce/core/protocol/MaintenanceAwareExpiryWriterUnitTests.java
@@ -34,6 +34,7 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -308,7 +309,7 @@ void testReset() {
@Test
void testOnRebindStarted() {
// When
- writer.onRebindStarted();
+ writer.onRebindStarted(Duration.ofSeconds(15), new InetSocketAddress("10.0.0.1", 6379));
// Then - relaxTimeouts should be enabled (tested indirectly through timeout behavior)
// Verify that the relaxed timeout is not negative
@@ -323,7 +324,7 @@ void testOnRebindStartedWithNegativeRelaxedTimeout() {
writer = new MaintenanceAwareExpiryWriter(delegate, clientOptions, clientResources);
// When
- writer.onRebindStarted();
+ writer.onRebindStarted(Duration.ofSeconds(15), new InetSocketAddress("10.0.0.1", 6379));
// Then - relaxTimeouts should remain disabled for negative timeout
assertThat(writer).isNotNull();
@@ -405,7 +406,7 @@ void testRelaxedTimeoutCancellation() {
setField(writer, "relaxTimeout", previousTimeout);
// When
- writer.onRebindStarted();
+ writer.onRebindStarted(Duration.ofSeconds(15), new InetSocketAddress("10.0.0.1", 6379));
// Then
verify(previousTimeout).cancel();
@@ -477,7 +478,7 @@ void testTimeoutOptionsConfiguration() {
@Test
void testMultipleMaintenanceEvents() {
// When
- writer.onRebindStarted();
+ writer.onRebindStarted(Duration.ofSeconds(15), new InetSocketAddress("10.0.0.1", 6379));
writer.onMigrateStarted("[\"1\",\"2\",\"3\"]");
writer.onFailoverStarted("[\"1\",\"2\",\"3\"]");
diff --git a/src/test/java/io/lettuce/test/MutableClock.java b/src/test/java/io/lettuce/test/MutableClock.java
new file mode 100644
index 0000000000..7d70840216
--- /dev/null
+++ b/src/test/java/io/lettuce/test/MutableClock.java
@@ -0,0 +1,40 @@
+package io.lettuce.test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class MutableClock extends Clock {
+
+ AtomicReference currentTime = new AtomicReference<>(Instant.now());
+
+ public MutableClock() {
+ this.currentTime.set(Instant.now());
+ }
+
+ public MutableClock(Instant startTime) {
+ this.currentTime.set(startTime);
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return ZoneId.systemDefault();
+ }
+
+ @Override
+ public Clock withZone(ZoneId zone) {
+ return this;
+ }
+
+ @Override
+ public Instant instant() {
+ return currentTime.get();
+ }
+
+ public void tick(Duration duration) {
+ currentTime.set(currentTime.get().plus(duration));
+ }
+
+};
From 6acbcf2ee0f0e62e470beb8485309e4b60c3c60f Mon Sep 17 00:00:00 2001
From: ggivo
Date: Wed, 6 Aug 2025 13:01:40 +0300
Subject: [PATCH 11/16] Update event's validation spec
- MIGRATING
- *
- * The options provided by this class are used by Lettuce to determine whether to listen for and process maintenance events, and
- * how to resolve address types when such events occur.
- *
+ *
+ * @see Redis Maintenance Events Specification
*/
public class MaintenanceEventsOptions {
@@ -51,29 +56,62 @@ public static MaintenanceEventsOptions.Builder builder() {
return new MaintenanceEventsOptions.Builder();
}
+ /**
+ * Create a new instance of {@link MaintenanceEventsOptions} with default settings.
+ *
+ * @return a new instance of {@link MaintenanceEventsOptions} with default settings.
+ */
public static MaintenanceEventsOptions create() {
return builder().build();
}
+ /**
+ * Creates maintenance events options with maintenance events disabled.
+ *
+ * @return disabled maintenance events options
+ */
public static MaintenanceEventsOptions disabled() {
return builder().supportMaintenanceEvents(false).build();
}
+ /**
+ * Creates maintenance events options with automatic address type resolution.
+ *
+ * The address type is automatically determined based on connection characteristics.
+ *
+ * @return enabled options with auto-resolution
+ */
public static MaintenanceEventsOptions enabled() {
return builder().supportMaintenanceEvents().autoResolveAddressType().build();
}
+ /**
+ * Creates maintenance events options with a fixed address type.
+ *
+ * Always requests the specified address type for maintenance notifications.
+ *
+ * @param addressType the fixed address type to request
+ * @return enabled options with fixed address type
+ */
public static MaintenanceEventsOptions enabled(AddressType addressType) {
return builder().supportMaintenanceEvents().fixedAddressType(addressType).build();
}
+ /**
+ * Check if maintenance events are supported.
+ *
+ * @return true if maintenance events are supported
+ */
public boolean supportsMaintenanceEvents() {
return supportMaintenanceEvents;
}
/**
- * @return the address type source to determine the requested address type when maintenance events are enabled . Can be
- * {@code null} if {@link #supportsMaintenanceEvents()} is {@code false}.
+ * Returns the address type source used to determine the requested address type.
+ *
+ * The address type source determines what address type to request for maintenance notifications.
+ *
+ * @return the address type source, or {@code null} if maintenance events are disabled
*/
public AddressTypeSource getAddressTypeSource() {
return addressTypeSource;
@@ -94,11 +132,40 @@ public MaintenanceEventsOptions.Builder supportMaintenanceEvents(boolean support
return this;
}
+ /**
+ * Configure a fixed address type for all maintenance notifications.
+ *
+ * Overrides automatic resolution and always requests the specified type.
+ *
+ * @param addressType the address type to request from server
+ * @return this builder
+ * @see AddressType
+ */
public Builder fixedAddressType(AddressType addressType) {
this.addressTypeSource = new FixedAddressTypeSource(addressType);
return this;
}
+ /**
+ * Configure automatic address type resolution based on connection characteristics.
+ *
+ * Resolution logic:
+ *
+ *
Network detection: If remote IP is private (see {@link NetUtils#isPrivateIp(SocketAddress)}),
+ * use INTERNAL_*, otherwise EXTERNAL_*
+ *
Format selection: If TLS is enabled, use *_FQDN, otherwise use *_IP
+ *
+ *
+ * Examples:
+ *
+ *
Private IP + no TLS → {@code INTERNAL_IP}
+ *
Private IP + TLS → {@code INTERNAL_FQDN}
+ *
Public IP + no TLS → {@code EXTERNAL_IP}
+ *
Public IP + TLS → {@code EXTERNAL_FQDN}
+ *
+ *
+ * @return this builder
+ */
public Builder autoResolveAddressType() {
this.addressTypeSource = new AutoresolveAddressTypeSource();
return this;
@@ -110,8 +177,22 @@ public MaintenanceEventsOptions build() {
}
+ /**
+ * Address types for maintenance event notifications.
+ *
+ * Determines the format of endpoint addresses returned in MOVING notifications.
+ *
+ * @since 7.0
+ */
public enum AddressType {
- INTERNAL_IP, INTERNAL_FQDN, EXTERNAL_IP, EXTERNAL_FQDN
+ /** Internal IP address (for private network connections) */
+ INTERNAL_IP,
+ /** Internal fully qualified domain name (for private network connections with TLS) */
+ INTERNAL_FQDN,
+ /** External IP address (for public network connections) */
+ EXTERNAL_IP,
+ /** External fully qualified domain name (for public network connections with TLS) */
+ EXTERNAL_FQDN
}
private static class FixedAddressTypeSource extends MaintenanceEventsOptions.AddressTypeSource {
@@ -156,8 +237,20 @@ public MaintenanceEventsOptions.AddressType getAddressType(SocketAddress socketA
}
+ /**
+ * Strategy interface for determining the address type to request in maintenance notifications.
+ *
+ * Implementations determine what address type to request for maintenance notifications.
+ */
public static abstract class AddressTypeSource {
+ /**
+ * Determines the address type based on connection characteristics.
+ *
+ * @param socketAddress the remote socket address of the connection
+ * @param sslEnabled whether TLS/SSL is enabled for the connection
+ * @return the address type to request, or null if no specific type is needed
+ */
public abstract AddressType getAddressType(SocketAddress socketAddress, boolean sslEnabled);
}
From 1062298bf2dd8ef0c377273ec60a1961e60efb4c Mon Sep 17 00:00:00 2001
From: Ivo Gaydazhiev
Date: Thu, 7 Aug 2025 10:19:26 +0300
Subject: [PATCH 16/16] Update
src/main/java/io/lettuce/core/internal/NetUtils.java
Co-authored-by: Tihomir Krasimirov Mateev
---
.../java/io/lettuce/core/internal/NetUtils.java | 13 -------------
1 file changed, 13 deletions(-)
diff --git a/src/main/java/io/lettuce/core/internal/NetUtils.java b/src/main/java/io/lettuce/core/internal/NetUtils.java
index 1fab8580dc..a5011e0cbb 100644
--- a/src/main/java/io/lettuce/core/internal/NetUtils.java
+++ b/src/main/java/io/lettuce/core/internal/NetUtils.java
@@ -3,19 +3,6 @@
* All rights reserved.
*
* Licensed under the MIT License.
- *
- * This file contains contributions from third-party contributors
- * licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package io.lettuce.core.internal;