Skip to content

Add metrics for tracking total disconnected time and reconnection attempts #3220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1938b57
v0.1
tishun Mar 12, 2025
f7723c8
Simple reconnect now working
tishun Mar 12, 2025
97761fe
Bind address from message is now considered
tishun Mar 12, 2025
73f3834
Self-register the handler
tishun Mar 13, 2025
cad3370
Format code
tishun Mar 13, 2025
e53dbf6
Filter push messages in a more stable way
tishun Mar 13, 2025
04112c2
(very hacky) Relax comand expire timers globbaly
tishun Mar 13, 2025
fc47155
Configure if timeout relaxing should be applied
tishun Mar 13, 2025
bd85245
Proper way to close channel
tishun Mar 18, 2025
c397419
Configure the timneout relaxing
tishun Mar 27, 2025
465e249
Sequential handover implemented
uglide Mar 26, 2024
b74a168
Did not address formatting
tishun Apr 14, 2025
bb1dd67
Prolong the rebind windwow for relaxed tiemouts
tishun Apr 16, 2025
b5b2118
PubSub no longer required; CommandExpiryWriter is now channel aware; …
tishun Apr 28, 2025
81f783f
Use the new MOVING push message from the RE server
tishun May 8, 2025
b86dfec
Unit test was not chaining delgates in the same way that the RedisCli…
tishun May 9, 2025
b09a297
Fix REBIND message validation
ggivo May 12, 2025
f94cff0
Fixed the expiry mechanism
tishun May 19, 2025
2bb04e8
Polishing
tishun May 19, 2025
27094c4
Fix NPE.
ggivo May 19, 2025
36b838f
Add support for MIGRATING/MIGRATED message handling in command expiry
ggivo Jun 12, 2025
2610739
formating
ggivo Jun 12, 2025
e0e0a60
Fix Disabling relaxTimeouts after upgrade can interfere with an ongoi…
ggivo Jun 13, 2025
e8efa02
Additional fix for timeout relaxing disabled
ggivo Jun 13, 2025
392c406
Fix push message listener registered multiple times after rebind.
ggivo Jun 16, 2025
d1869c5
Fix: Report correct command timeout when relaxTimeout is configured
ggivo Jun 17, 2025
c52a519
Disable relaxedTimeout after configured grace period
ggivo Jun 18, 2025
5de6071
Code clean up
ggivo Jun 18, 2025
2c5efa8
Add FAILING_OVER/FAILED_OVER
ggivo Jun 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class ClientOptions implements Serializable {

public static final boolean DEFAULT_AUTO_RECONNECT = true;

public static final boolean DEFAULT_PROACTIVE_REBIND = false;

public static final Predicate<RedisCommand<?, ?, ?>> DEFAULT_REPLAY_FILTER = (cmd) -> false;

public static final int DEFAULT_BUFFER_USAGE_RATIO = 3;
Expand Down Expand Up @@ -96,6 +98,8 @@ public class ClientOptions implements Serializable {

private final boolean autoReconnect;

private final boolean proactiveRebind;

private final Predicate<RedisCommand<?, ?, ?>> replayFilter;

private final boolean cancelCommandsOnReconnectFailure;
Expand Down Expand Up @@ -132,6 +136,7 @@ public class ClientOptions implements Serializable {

protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.proactiveRebind = builder.proactiveRebind;
this.replayFilter = builder.replayFilter;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
Expand All @@ -153,6 +158,7 @@ protected ClientOptions(Builder builder) {

protected ClientOptions(ClientOptions original) {
this.autoReconnect = original.isAutoReconnect();
this.proactiveRebind = original.isProactiveRebindEnabled();
this.replayFilter = original.getReplayFilter();
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
Expand Down Expand Up @@ -207,6 +213,8 @@ public static class Builder {

private boolean autoReconnect = DEFAULT_AUTO_RECONNECT;

private boolean proactiveRebind = DEFAULT_PROACTIVE_REBIND;

private Predicate<RedisCommand<?, ?, ?>> replayFilter = DEFAULT_REPLAY_FILTER;

private boolean cancelCommandsOnReconnectFailure = DEFAULT_CANCEL_CMD_RECONNECT_FAIL;
Expand Down Expand Up @@ -256,6 +264,20 @@ public Builder autoReconnect(boolean autoReconnect) {
return this;
}

/**
* Configure whether the driver should listen for server events that indicate the current endpoint is being re-bound.
* When enabled, the proactive re-bind will help with the connection handover and reduce the number of failed commands.
* This feature requires the server to support proactive re-binds. Defaults to {@code false}. See
* {@link #DEFAULT_PROACTIVE_REBIND}.
*
* @param proactiveRebind true/false
* @return {@code this}
*/
public Builder proactiveRebind(boolean proactiveRebind) {
this.proactiveRebind = proactiveRebind;
return this;
}

/**
* When {@link #autoReconnect(boolean)} is set to true, this {@link Predicate} is used to filter commands to replay when
* the connection is reestablished after a disconnect. Returning <code>false</code> means the command will not be
Expand Down Expand Up @@ -551,14 +573,15 @@ public ClientOptions build() {
public ClientOptions.Builder mutate() {
Builder builder = new Builder();

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.replayFilter(getReplayFilter()).decodeBufferPolicy(getDecodeBufferPolicy())
.disconnectedBehavior(getDisconnectedBehavior()).reauthenticateBehavior(getReauthenticateBehaviour())
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
builder.autoReconnect(isAutoReconnect()).proactiveRebind(isProactiveRebindEnabled())
.cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()).replayFilter(getReplayFilter())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
.timeoutOptions(getTimeoutOptions());

return builder;
}
Expand All @@ -576,6 +599,19 @@ public boolean isAutoReconnect() {
return autoReconnect;
}

/**
* Controls auto-reconnect behavior on connections. If auto-reconnect is {@code true} (default), it is enabled. As soon as a
* connection gets closed/reset without the intention to close it, the client will try to reconnect and re-issue any queued
* commands.
*
* This flag has also the effect that disconnected connections will refuse commands and cancel these with an exception.
*
* @return {@code true} if auto-reconnect is enabled.
*/
public boolean isProactiveRebindEnabled() {
return proactiveRebind;
}

/**
* Controls which {@link RedisCommand} will be replayed after a re-connect. The {@link Predicate} returns <code>true</code>
* if command should be filtered out and not replayed. Defaults to {@link #DEFAULT_REPLAY_FILTER}.
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import io.lettuce.core.protocol.RebindAwareConnectionWatchdog;
import jdk.net.ExtendedSocketOptions;
import reactor.core.publisher.Mono;
import io.lettuce.core.internal.LettuceAssert;
Expand Down Expand Up @@ -153,9 +154,16 @@ protected ConnectionWatchdog createConnectionWatchdog() {
LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true");
LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true");

ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint);
ConnectionWatchdog watchdog;
if (clientOptions.isProactiveRebindEnabled()) {
watchdog = new RebindAwareConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint);
} else {
watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint);
}

endpoint.registerConnectionWatchdog(watchdog);

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getOptions())) {
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -413,7 +413,7 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getOptions())) {
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -580,7 +580,7 @@ private <K, V> ConnectionFuture<StatefulRedisSentinelConnection<K, V>> doConnect
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getOptions())) {
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down
40 changes: 38 additions & 2 deletions src/main/java/io/lettuce/core/TimeoutOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@
@SuppressWarnings("serial")
public class TimeoutOptions implements Serializable {

public static final Duration DISABLED_TIMEOUT = Duration.ZERO.minusSeconds(1);

public static final boolean DEFAULT_TIMEOUT_COMMANDS = false;

public static final Duration DEFAULT_RELAXED_TIMEOUT = DISABLED_TIMEOUT;

private final boolean timeoutCommands;

private final boolean applyConnectionTimeout;

private final Duration relaxedTimeout;

private final TimeoutSource source;

private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source) {
private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source,
Duration relaxedTimeout) {

this.timeoutCommands = timeoutCommands;
this.applyConnectionTimeout = applyConnectionTimeout;
this.relaxedTimeout = relaxedTimeout;
this.source = source;
}

Expand Down Expand Up @@ -84,6 +92,8 @@ public static class Builder {

private boolean applyConnectionTimeout = false;

private Duration relaxedTimeout = DEFAULT_RELAXED_TIMEOUT;

private TimeoutSource source;

/**
Expand All @@ -107,6 +117,25 @@ public Builder timeoutCommands(boolean enabled) {
return this;
}

/**
* Enable proactive timeout relaxing. Disabled by default, see {@link #DEFAULT_RELAXED_TIMEOUT}.
* <p/>
* If the Redis server supports this, and the client is set up to use it by the
* {@link ClientOptions#isProactiveRebindEnabled()} option, the client would listen to notifications that the current
* endpoint is about to go down (as part of some maintenance activity, for example). In such cases, the driver could
* extend the existing timeout settings for newly issued commands, or such that are in flight, to make sure they do not
* time out during this process. These commands could be either a part of the offline buffer or waiting for a reply.
*
* @param duration {@link Duration} to relax timeouts proactively, must not be {@code null}.
* @return {@code this}
*/
public Builder proactiveTimeoutsRelaxing(Duration duration) {
LettuceAssert.notNull(duration, "Duration must not be null");

this.relaxedTimeout = duration;
return this;
}

/**
* Set a fixed timeout for all commands.
*
Expand Down Expand Up @@ -158,7 +187,7 @@ public TimeoutOptions build() {
}
}

return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source);
return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source, relaxedTimeout);
}

}
Expand All @@ -177,6 +206,13 @@ public boolean isApplyConnectionTimeout() {
return applyConnectionTimeout;
}

/**
* @return the {@link Duration} to relax timeouts proactively, {@link #DISABLED_TIMEOUT} if disabled.
*/
public Duration getRelaxedTimeout() {
return relaxedTimeout;
}

/**
* @return the timeout source to determine the timeout for a {@link RedisCommand}. Can be {@code null} if
* {@link #isTimeoutCommands()} is {@code false}.
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -634,7 +634,7 @@ <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubToNode
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -680,7 +680,7 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -798,7 +798,7 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,25 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti
this.timer = clientResources.timer();
}

/**
* Create a new {@link CommandExpiryWriter} or {@link RebindAwareExpiryWriter} depending on the {@link ClientOptions}
* configuration.
*
* @param delegate must not be {@code null}.
* @param clientOptions must not be {@code null}.
* @param clientResources must not be {@code null}.
* @return the {@link CommandExpiryWriter} or {@link RebindAwareExpiryWriter}.
* @since 6.7
*/
public static RedisChannelWriter buildCommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOptions,
ClientResources clientResources) {
if (clientOptions.isProactiveRebindEnabled()) {
return new RebindAwareExpiryWriter(delegate, clientOptions, clientResources);
} else {
return new CommandExpiryWriter(delegate, clientOptions, clientResources);
}
}

/**
* Check whether {@link ClientOptions} is configured to timeout commands.
*
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package io.lettuce.core.protocol;

import static io.lettuce.core.ConnectionEvents.*;
import static io.lettuce.core.protocol.RebindAwareConnectionWatchdog.REBIND_ATTRIBUTE;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -625,6 +627,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE)
&& ctx.channel().attr(REBIND_ATTRIBUTE).get() != null
&& ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED);
if (debugEnabled && rebindInProgress) {
logger.debug("{} Processing command while rebind is in progress, stack has {} more to process", logPrefix(),
stack.size());
}

if (pristine) {

Expand Down Expand Up @@ -711,6 +720,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
}
}

if (rebindInProgress && stack.isEmpty()) {
logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now());
ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
}

decodeBufferPolicy.afterDecoding(buffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {

private final EventExecutorGroup reconnectWorkers;

private final ReconnectionHandler reconnectionHandler;
protected final ReconnectionHandler reconnectionHandler;

private final ReconnectionListener reconnectionListener;

Expand Down
Loading