From 1938b578209285c161c9fa8c28baea1404e28500 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 12 Mar 2025 15:50:45 +0200 Subject: [PATCH 01/29] v0.1 --- .../ProactiveWatchdogCommandHandler.java | 157 ++++++++++++++++++ .../extensibility/LettuceProactiveDemo.java | 91 ++++++++++ 2 files changed, 248 insertions(+) create mode 100644 src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java create mode 100644 src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java new file mode 100644 index 000000000..65c3c9c54 --- /dev/null +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -0,0 +1,157 @@ +package io.lettuce.core.proactive; + +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.output.StatusOutput; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandHandler; +import io.lettuce.core.protocol.ConnectionWatchdog; +import io.lettuce.core.protocol.DefaultEndpoint; +import io.lettuce.core.pubsub.PubSubOutput; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.RedisPubSubListener; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.VoidChannelPromise; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.StringTokenizer; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import static io.lettuce.core.protocol.CommandType.PING; +import static io.lettuce.core.protocol.CommandType.SUBSCRIBE; + +/** + * + * @param Key type. + * @param Value type. + * @author Will Glozer + */ +public class ProactiveWatchdogCommandHandler extends ChannelInboundHandlerAdapter implements PushListener { + + private static final Logger logger = Logger.getLogger(ProactiveWatchdogCommandHandler.class.getName()); + + private static final String REBIND_CHANNEL = "__rebind"; + + private ChannelHandlerContext context; + + private ConnectionWatchdog watchdog; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + logger.info("Channel {} active"); + + ChannelPipeline pipeline = ctx.channel().pipeline(); + CommandHandler command = pipeline.get(CommandHandler.class); + watchdog = pipeline.get(ConnectionWatchdog.class); + context = ctx; + +// Command rebind = +// new Command<>(SUBSCRIBE, +// new PubSubOutput<>(StringCodec.UTF8), +// new PubSubCommandArgs<>(StringCodec.UTF8).addKey(REBIND_CHANNEL)); +// +// if (command != null) { +// ctx.write(rebind); +// } + + super.channelActive(ctx); + } + + @Override + public void onPushMessage(PushMessage message) { + logger.info("Channel received message"); + + List content = message.getContent() + .stream() + .map( ez -> StringCodec.UTF8.decodeKey( (ByteBuffer) ez)) + .collect(Collectors.toList()); + + if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { + logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content)+"'"); + context.channel().disconnect().addListener(future -> { + Bootstrap bootstrap = watchdog.getBootstrap(); + bootstrap.connect(getRemoteAddress(content)).addListener(futur -> { + if (futur.isSuccess()) { + logger.info("Success?"); + } else { + logger.info("Failure?"); + } + }); + + }); + +// context.fireChannelInactive(); +// context.channel().connect(getRemoteAddress(content)); + + } + } + + private SocketAddress getRemoteAddress(List messageContents){ + + final String payload = messageContents.stream() + .filter(c -> c.contains("to_ep")) + .findFirst().orElseThrow(() -> new IllegalArgumentException("to_ep not found")); + + final String toEndpoint = Arrays.stream(payload.split(";")) + .filter( c->c.contains("to_ep")) + .findFirst().orElseThrow(() -> new IllegalArgumentException("to_ep not found")); + + final String addressAndPort = toEndpoint.split("=")[1]; + final String address = addressAndPort.split(":")[0]; + final int port = Integer.parseInt(addressAndPort.split(":")[1]); + + return new InetSocketAddress(address, port); + } + + + /** + * + * Command args for Pub/Sub connections. This implementation hides the first key as PubSub keys are not keys from the key-space. + * + * @author Mark Paluch + * @since 4.2 + */ + static class PubSubCommandArgs extends CommandArgs { + + /** + * @param codec Codec used to encode/decode keys and values, must not be {@code null}. + */ + public PubSubCommandArgs(RedisCodec codec) { + super(codec); + } + + /** + * + * @return always {@code null}. + */ + @Override + public ByteBuffer getFirstEncodedKey() { + return null; + } + + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + logger.info("Channel inactive"); + + super.channelInactive(ctx); + } +} diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java new file mode 100644 index 000000000..ea9b43ed8 --- /dev/null +++ b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java @@ -0,0 +1,91 @@ +package biz.paluch.redis.extensibility; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.event.EventBus; +import io.lettuce.core.proactive.ProactiveWatchdogCommandHandler; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.NettyCustomizer; +import io.netty.channel.Channel; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +public class LettuceProactiveDemo { + public static final Logger logger = Logger.getLogger(LettuceProactiveDemo.class.getName()); + + public static void main(String[] args) { + ProactiveWatchdogCommandHandler proactiveHandler = + new ProactiveWatchdogCommandHandler<>(); + + ClientResources resources = ClientResources.builder() + .nettyCustomizer(new NettyCustomizer() { + @Override + public void afterChannelInitialized(Channel channel) { + channel.pipeline().addFirst(proactiveHandler); + } + }).build(); + + RedisClient redisClient = RedisClient.create(resources, RedisURI.Builder.redis("localhost", 6379).build()); + + // Monitor connection events + EventBus eventBus = redisClient.getResources().eventBus(); + eventBus.get().subscribe(e -> { + logger.info(">>> Connection event: " + e); + }); + + // Subscribe to __rebind channel + StatefulRedisPubSubConnection redis = redisClient.connectPubSub(); + RedisPubSubCommands commands = redis.sync(); + commands.subscribe("__rebind"); + + // Used to stop the demo by sending the following command: + // publish __rebind "type=stop_demo" + Control control = new Control(); + redis.addListener(control); + + // Used to initiate the proactive rebind by sending the following command + // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10" + redis.addListener(proactiveHandler); + + while (control.shouldContinue) { + try { + logger.info("Sending PING"); + logger.info(commands.ping()); + Thread.sleep(2000); + } catch (InterruptedException e) { + logger.severe("InterruptedException: " + e.getMessage()); + } + } + + + redis.close(); + redisClient.shutdown(); + } + + static class Control implements PushListener { + + public boolean shouldContinue = true; + + @Override + public void onPushMessage(PushMessage message) { + List content = message.getContent() + .stream() + .map( ez -> StringCodec.UTF8.decodeKey( (ByteBuffer) ez)) + .collect(Collectors.toList()); + + if (content.stream().anyMatch(c -> c.equals("type=stop_demo"))) { + logger.info("Control received message to stop the demo"); + shouldContinue = false; + } + } + } + +} From f7723c8b6a41467fe5ee9d00c49d254c1aea7a2c Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 12 Mar 2025 17:39:19 +0200 Subject: [PATCH 02/29] Simple reconnect now working --- .../ProactiveWatchdogCommandHandler.java | 36 ++----------------- 1 file changed, 2 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java index 65c3c9c54..611359b35 100644 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -4,45 +4,29 @@ import io.lettuce.core.api.push.PushMessage; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.output.StatusOutput; -import io.lettuce.core.protocol.Command; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandHandler; import io.lettuce.core.protocol.ConnectionWatchdog; -import io.lettuce.core.protocol.DefaultEndpoint; -import io.lettuce.core.pubsub.PubSubOutput; -import io.lettuce.core.pubsub.RedisPubSubAdapter; -import io.lettuce.core.pubsub.RedisPubSubListener; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.VoidChannelPromise; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -import java.util.Optional; -import java.util.StringTokenizer; import java.util.logging.Logger; import java.util.stream.Collectors; -import static io.lettuce.core.protocol.CommandType.PING; -import static io.lettuce.core.protocol.CommandType.SUBSCRIBE; - /** * * @param Key type. * @param Value type. * @author Will Glozer */ +@ChannelHandler.Sharable public class ProactiveWatchdogCommandHandler extends ChannelInboundHandlerAdapter implements PushListener { private static final Logger logger = Logger.getLogger(ProactiveWatchdogCommandHandler.class.getName()); @@ -76,8 +60,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void onPushMessage(PushMessage message) { - logger.info("Channel received message"); - List content = message.getContent() .stream() .map( ez -> StringCodec.UTF8.decodeKey( (ByteBuffer) ez)) @@ -85,21 +67,7 @@ public void onPushMessage(PushMessage message) { if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content)+"'"); - context.channel().disconnect().addListener(future -> { - Bootstrap bootstrap = watchdog.getBootstrap(); - bootstrap.connect(getRemoteAddress(content)).addListener(futur -> { - if (futur.isSuccess()) { - logger.info("Success?"); - } else { - logger.info("Failure?"); - } - }); - - }); - -// context.fireChannelInactive(); -// context.channel().connect(getRemoteAddress(content)); - + context.fireChannelInactive(); } } From 97761fe06edf4a0c67b1405e7d9617129ebc57a2 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 12 Mar 2025 17:58:29 +0200 Subject: [PATCH 03/29] Bind address from message is now considered --- .../core/proactive/ProactiveRebindEvent.java | 37 +++++++++++++++++++ .../ProactiveWatchdogCommandHandler.java | 1 + .../core/protocol/ConnectionWatchdog.java | 17 ++++++++- .../core/protocol/ReconnectionHandler.java | 10 +++++ 4 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/lettuce/core/proactive/ProactiveRebindEvent.java diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveRebindEvent.java b/src/main/java/io/lettuce/core/proactive/ProactiveRebindEvent.java new file mode 100644 index 000000000..cf77d5105 --- /dev/null +++ b/src/main/java/io/lettuce/core/proactive/ProactiveRebindEvent.java @@ -0,0 +1,37 @@ +/* + * Copyright 2025, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.proactive; + +import java.net.SocketAddress; + +public class ProactiveRebindEvent { + + private final SocketAddress remoteAddress; + + public ProactiveRebindEvent(SocketAddress remoteAddress) { + this.remoteAddress = remoteAddress; + } + + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + +} diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java index 611359b35..d3e7280f3 100644 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -67,6 +67,7 @@ public void onPushMessage(PushMessage message) { if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content)+"'"); + context.fireUserEventTriggered(new ProactiveRebindEvent(getRemoteAddress(content))); context.fireChannelInactive(); } } diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 84bcb41f1..7ea9b56ea 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import io.lettuce.core.proactive.ProactiveRebindEvent; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import io.lettuce.core.ClientOptions; @@ -101,6 +102,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private volatile Timeout reconnectScheduleTimeout; + private SocketAddress rebindAddress; + /** * Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new * {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address. @@ -213,6 +216,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if(evt instanceof ProactiveRebindEvent) { + ProactiveRebindEvent event = (ProactiveRebindEvent) evt; + logger.info("Proactive rebind to {}", event.getRemoteAddress()); + rebindAddress = event.getRemoteAddress(); + } + + super.userEventTriggered(ctx, evt); + } + /** * Enable {@link ConnectionWatchdog} to listen for disconnected events. */ @@ -330,7 +344,8 @@ private void run(int attempt, Duration delay) throws Exception { eventBus.publish(new ReconnectAttemptEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, attempt, delay)); logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress); - Tuple2, CompletableFuture> tuple = reconnectionHandler.reconnect(); + Tuple2, CompletableFuture> tuple = + rebindAddress == null ? reconnectionHandler.reconnect() : reconnectionHandler.reconnect(rebindAddress); CompletableFuture future = tuple.getT1(); future.whenComplete((c, t) -> { diff --git a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java index 8bf183b42..04ab026c4 100644 --- a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java +++ b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java @@ -114,6 +114,16 @@ protected Tuple2, CompletableFuture> r return Tuples.of(future, address); } + protected Tuple2, CompletableFuture> reconnect(SocketAddress remoteAddress) { + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture address = new CompletableFuture<>(); + + reconnect0(future, remoteAddress); + + this.currentFuture = future; + return Tuples.of(future, address); + } + private void reconnect0(CompletableFuture result, SocketAddress remoteAddress) { ChannelHandler handler = bootstrap.config().handler(); From 73f3834c96c65863ef8ab99d17535190287f138c Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 13 Mar 2025 13:08:51 +0200 Subject: [PATCH 04/29] Self-register the handler --- .../proactive/ProactiveWatchdogCommandHandler.java | 10 ++++++++++ .../redis/extensibility/LettuceProactiveDemo.java | 4 +++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java index d3e7280f3..6a4b305a8 100644 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -1,5 +1,6 @@ package io.lettuce.core.proactive; +import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.push.PushListener; import io.lettuce.core.api.push.PushMessage; import io.lettuce.core.codec.RedisCodec; @@ -7,6 +8,7 @@ import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandHandler; import io.lettuce.core.protocol.ConnectionWatchdog; +import io.lettuce.core.pubsub.PubSubCommandHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -46,6 +48,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { watchdog = pipeline.get(ConnectionWatchdog.class); context = ctx; + PubSubCommandHandler cmdhndlr = pipeline.get(PubSubCommandHandler.class); + cmdhndlr.getEndpoint().addListener(this); // Command rebind = // new Command<>(SUBSCRIBE, // new PubSubOutput<>(StringCodec.UTF8), @@ -69,6 +73,12 @@ public void onPushMessage(PushMessage message) { logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content)+"'"); context.fireUserEventTriggered(new ProactiveRebindEvent(getRemoteAddress(content))); context.fireChannelInactive(); + +// context.channel().pipeline().get() +// +// +// StatefulRedisConnection connection = watchdog.getConnection(); +// connection.setTimeout(); } } diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java index ea9b43ed8..970fbc93a 100644 --- a/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java +++ b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java @@ -53,7 +53,9 @@ public void afterChannelInitialized(Channel channel) { // Used to initiate the proactive rebind by sending the following command // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10" - redis.addListener(proactiveHandler); + + // NO LONGER NEEDED, HANDLER REGISTERES ITSELF + // redis.addListener(proactiveHandler); while (control.shouldContinue) { try { From cad3370931e51e2bb13f672ad6e6b7a73f916bb6 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 13 Mar 2025 13:12:51 +0200 Subject: [PATCH 05/29] Format code --- .../ProactiveWatchdogCommandHandler.java | 51 +++++++++---------- .../core/protocol/ConnectionWatchdog.java | 7 +-- .../extensibility/LettuceProactiveDemo.java | 29 +++++------ 3 files changed, 42 insertions(+), 45 deletions(-) diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java index 6a4b305a8..48d845547 100644 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -48,49 +48,45 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { watchdog = pipeline.get(ConnectionWatchdog.class); context = ctx; - PubSubCommandHandler cmdhndlr = pipeline.get(PubSubCommandHandler.class); + PubSubCommandHandler cmdhndlr = pipeline.get(PubSubCommandHandler.class); cmdhndlr.getEndpoint().addListener(this); -// Command rebind = -// new Command<>(SUBSCRIBE, -// new PubSubOutput<>(StringCodec.UTF8), -// new PubSubCommandArgs<>(StringCodec.UTF8).addKey(REBIND_CHANNEL)); -// -// if (command != null) { -// ctx.write(rebind); -// } + // Command rebind = + // new Command<>(SUBSCRIBE, + // new PubSubOutput<>(StringCodec.UTF8), + // new PubSubCommandArgs<>(StringCodec.UTF8).addKey(REBIND_CHANNEL)); + // + // if (command != null) { + // ctx.write(rebind); + // } super.channelActive(ctx); } @Override public void onPushMessage(PushMessage message) { - List content = message.getContent() - .stream() - .map( ez -> StringCodec.UTF8.decodeKey( (ByteBuffer) ez)) + List content = message.getContent().stream().map(ez -> StringCodec.UTF8.decodeKey((ByteBuffer) ez)) .collect(Collectors.toList()); if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { - logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content)+"'"); + logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content) + "'"); context.fireUserEventTriggered(new ProactiveRebindEvent(getRemoteAddress(content))); context.fireChannelInactive(); -// context.channel().pipeline().get() -// -// -// StatefulRedisConnection connection = watchdog.getConnection(); -// connection.setTimeout(); + // context.channel().pipeline().get() + // + // + // StatefulRedisConnection connection = watchdog.getConnection(); + // connection.setTimeout(); } } - private SocketAddress getRemoteAddress(List messageContents){ + private SocketAddress getRemoteAddress(List messageContents) { - final String payload = messageContents.stream() - .filter(c -> c.contains("to_ep")) - .findFirst().orElseThrow(() -> new IllegalArgumentException("to_ep not found")); + final String payload = messageContents.stream().filter(c -> c.contains("to_ep")).findFirst() + .orElseThrow(() -> new IllegalArgumentException("to_ep not found")); - final String toEndpoint = Arrays.stream(payload.split(";")) - .filter( c->c.contains("to_ep")) - .findFirst().orElseThrow(() -> new IllegalArgumentException("to_ep not found")); + final String toEndpoint = Arrays.stream(payload.split(";")).filter(c -> c.contains("to_ep")).findFirst() + .orElseThrow(() -> new IllegalArgumentException("to_ep not found")); final String addressAndPort = toEndpoint.split("=")[1]; final String address = addressAndPort.split(":")[0]; @@ -99,10 +95,10 @@ private SocketAddress getRemoteAddress(List messageContents){ return new InetSocketAddress(address, port); } - /** * - * Command args for Pub/Sub connections. This implementation hides the first key as PubSub keys are not keys from the key-space. + * Command args for Pub/Sub connections. This implementation hides the first key as PubSub keys are not keys from the + * key-space. * * @author Mark Paluch * @since 4.2 @@ -133,4 +129,5 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } + } diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 7ea9b56ea..06794cb40 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -218,7 +218,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if(evt instanceof ProactiveRebindEvent) { + if (evt instanceof ProactiveRebindEvent) { ProactiveRebindEvent event = (ProactiveRebindEvent) evt; logger.info("Proactive rebind to {}", event.getRemoteAddress()); rebindAddress = event.getRemoteAddress(); @@ -344,8 +344,9 @@ private void run(int attempt, Duration delay) throws Exception { eventBus.publish(new ReconnectAttemptEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, attempt, delay)); logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress); - Tuple2, CompletableFuture> tuple = - rebindAddress == null ? reconnectionHandler.reconnect() : reconnectionHandler.reconnect(rebindAddress); + Tuple2, CompletableFuture> tuple = rebindAddress == null + ? reconnectionHandler.reconnect() + : reconnectionHandler.reconnect(rebindAddress); CompletableFuture future = tuple.getT1(); future.whenComplete((c, t) -> { diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java index 970fbc93a..4b7d200df 100644 --- a/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java +++ b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java @@ -19,19 +19,20 @@ import java.util.stream.Collectors; public class LettuceProactiveDemo { + public static final Logger logger = Logger.getLogger(LettuceProactiveDemo.class.getName()); public static void main(String[] args) { - ProactiveWatchdogCommandHandler proactiveHandler = - new ProactiveWatchdogCommandHandler<>(); + ProactiveWatchdogCommandHandler proactiveHandler = new ProactiveWatchdogCommandHandler<>(); + + ClientResources resources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() { + + @Override + public void afterChannelInitialized(Channel channel) { + channel.pipeline().addFirst(proactiveHandler); + } - ClientResources resources = ClientResources.builder() - .nettyCustomizer(new NettyCustomizer() { - @Override - public void afterChannelInitialized(Channel channel) { - channel.pipeline().addFirst(proactiveHandler); - } - }).build(); + }).build(); RedisClient redisClient = RedisClient.create(resources, RedisURI.Builder.redis("localhost", 6379).build()); @@ -53,9 +54,9 @@ public void afterChannelInitialized(Channel channel) { // Used to initiate the proactive rebind by sending the following command // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10" - + // NO LONGER NEEDED, HANDLER REGISTERES ITSELF - // redis.addListener(proactiveHandler); + // redis.addListener(proactiveHandler); while (control.shouldContinue) { try { @@ -67,7 +68,6 @@ public void afterChannelInitialized(Channel channel) { } } - redis.close(); redisClient.shutdown(); } @@ -78,9 +78,7 @@ static class Control implements PushListener { @Override public void onPushMessage(PushMessage message) { - List content = message.getContent() - .stream() - .map( ez -> StringCodec.UTF8.decodeKey( (ByteBuffer) ez)) + List content = message.getContent().stream().map(ez -> StringCodec.UTF8.decodeKey((ByteBuffer) ez)) .collect(Collectors.toList()); if (content.stream().anyMatch(c -> c.equals("type=stop_demo"))) { @@ -88,6 +86,7 @@ public void onPushMessage(PushMessage message) { shouldContinue = false; } } + } } From e53dbf66c4680a01fca1b45196556a4cc9fb0904 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 13 Mar 2025 13:25:17 +0200 Subject: [PATCH 06/29] Filter push messages in a more stable way --- .../core/proactive/ProactiveWatchdogCommandHandler.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java index 48d845547..bb1ca7ef9 100644 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -1,6 +1,5 @@ package io.lettuce.core.proactive; -import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.push.PushListener; import io.lettuce.core.api.push.PushMessage; import io.lettuce.core.codec.RedisCodec; @@ -64,7 +63,12 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void onPushMessage(PushMessage message) { - List content = message.getContent().stream().map(ez -> StringCodec.UTF8.decodeKey((ByteBuffer) ez)) + if (!message.getType().equals("message")) { + return; + } + + List content = message.getContent().stream() + .map(ez -> ez instanceof ByteBuffer ? StringCodec.UTF8.decodeKey((ByteBuffer) ez) : ez.toString()) .collect(Collectors.toList()); if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { From 04112c2392231ed6ce4930985b04b19e4f17ef32 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 13 Mar 2025 16:04:54 +0200 Subject: [PATCH 07/29] (very hacky) Relax comand expire timers globbaly --- .../ProactiveWatchdogCommandHandler.java | 17 +++++------ .../core/protocol/CommandExpiryWriter.java | 28 +++++++++++++++++-- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java index bb1ca7ef9..c59375afe 100644 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -5,6 +5,7 @@ import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandExpiryWriter; import io.lettuce.core.protocol.CommandHandler; import io.lettuce.core.protocol.ConnectionWatchdog; import io.lettuce.core.pubsub.PubSubCommandHandler; @@ -47,8 +48,9 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { watchdog = pipeline.get(ConnectionWatchdog.class); context = ctx; - PubSubCommandHandler cmdhndlr = pipeline.get(PubSubCommandHandler.class); - cmdhndlr.getEndpoint().addListener(this); + PubSubCommandHandler commandHandler = pipeline.get(PubSubCommandHandler.class); + commandHandler.getEndpoint().addListener(this); + // Command rebind = // new Command<>(SUBSCRIBE, // new PubSubOutput<>(StringCodec.UTF8), @@ -73,14 +75,13 @@ public void onPushMessage(PushMessage message) { if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content) + "'"); + + // relax the command timeouts of the existing commands + CommandExpiryWriter.relaxTimeoutsGlobally = true; + + // disconnect the current channel and fire a re-bind event with the new address context.fireUserEventTriggered(new ProactiveRebindEvent(getRemoteAddress(content))); context.fireChannelInactive(); - - // context.channel().pipeline().get() - // - // - // StatefulRedisConnection connection = watchdog.getConnection(); - // connection.setTimeout(); } } diff --git a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java index 160deed18..823b5b6da 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java +++ b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java @@ -61,6 +61,8 @@ public class CommandExpiryWriter implements RedisChannelWriter { private volatile long timeout = -1; + public static boolean relaxTimeoutsGlobally = false; + /** * Create a new {@link CommandExpiryWriter}. * @@ -179,8 +181,14 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS Timeout commandTimeout = timer.newTimeout(t -> { if (!command.isDone()) { - executors.submit(() -> command.completeExceptionally(ExceptionFactory - .createTimeoutException(command.getType().toString(), Duration.ofNanos(timeUnit.toNanos(timeout))))); + executors.submit(() -> { + if (!relaxTimeoutsGlobally) { + command.completeExceptionally( + ExceptionFactory.createTimeoutException(command.getType().toString(), Duration.ofNanos(timeUnit.toNanos(timeout)))); + } else { + relaxedAttempt(command, executors); + } + }); } }, timeout, timeUnit); @@ -191,4 +199,20 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS } + // when relaxing the timeouts - instead of expiring immediately, we will start a new timer with 10 seconds + private void relaxedAttempt(RedisCommand command, ScheduledExecutorService executors) { + + Duration timeout = Duration.ofSeconds(10); + + Timeout commandTimeout = timer.newTimeout(t -> { + if (!command.isDone()) { + executors.submit(() -> command.completeExceptionally(ExceptionFactory.createTimeoutException(timeout))); + } + }, timeout.getSeconds(), TimeUnit.SECONDS); + + if (command instanceof CompleteableCommand) { + ((CompleteableCommand) command).onComplete((o, o2) -> commandTimeout.cancel()); + } + } + } From fc471556fb5ced60bfb33c8d78f14fade9db02af Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 13 Mar 2025 18:25:33 +0200 Subject: [PATCH 08/29] Configure if timeout relaxing should be applied --- .../core/protocol/CommandExpiryWriter.java | 8 ++++++- .../extensibility/LettuceProactiveDemo.java | 23 ++++++++++++++----- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java index 823b5b6da..646ab97c0 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java +++ b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java @@ -63,6 +63,8 @@ public class CommandExpiryWriter implements RedisChannelWriter { public static boolean relaxTimeoutsGlobally = false; + public static boolean enableProactive = false; + /** * Create a new {@link CommandExpiryWriter}. * @@ -182,7 +184,7 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS Timeout commandTimeout = timer.newTimeout(t -> { if (!command.isDone()) { executors.submit(() -> { - if (!relaxTimeoutsGlobally) { + if (shouldRelaxTimeoutsGlobally() ) { command.completeExceptionally( ExceptionFactory.createTimeoutException(command.getType().toString(), Duration.ofNanos(timeUnit.toNanos(timeout)))); } else { @@ -199,6 +201,10 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS } + public static boolean shouldRelaxTimeoutsGlobally() { + return relaxTimeoutsGlobally && enableProactive; + } + // when relaxing the timeouts - instead of expiring immediately, we will start a new timer with 10 seconds private void relaxedAttempt(RedisCommand command, ScheduledExecutorService executors) { diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java index 4b7d200df..72e154589 100644 --- a/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java +++ b/src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java @@ -1,20 +1,25 @@ package biz.paluch.redis.extensibility; +import io.lettuce.core.ClientOptions; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; +import io.lettuce.core.TimeoutOptions; import io.lettuce.core.api.push.PushListener; import io.lettuce.core.api.push.PushMessage; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.event.EventBus; import io.lettuce.core.proactive.ProactiveWatchdogCommandHandler; +import io.lettuce.core.protocol.CommandExpiryWriter; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; -import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.NettyCustomizer; import io.netty.channel.Channel; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -22,7 +27,7 @@ public class LettuceProactiveDemo { public static final Logger logger = Logger.getLogger(LettuceProactiveDemo.class.getName()); - public static void main(String[] args) { + public static void main(String[] args) throws ExecutionException, InterruptedException { ProactiveWatchdogCommandHandler proactiveHandler = new ProactiveWatchdogCommandHandler<>(); ClientResources resources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() { @@ -34,7 +39,13 @@ public void afterChannelInitialized(Channel channel) { }).build(); + TimeoutOptions timeoutOpts = TimeoutOptions.builder().timeoutCommands().fixedTimeout(Duration.ofMillis(1)).build(); + ClientOptions options = ClientOptions.builder().timeoutOptions(timeoutOpts).build(); + + CommandExpiryWriter.enableProactive = true; + RedisClient redisClient = RedisClient.create(resources, RedisURI.Builder.redis("localhost", 6379).build()); + redisClient.setOptions(options); // Monitor connection events EventBus eventBus = redisClient.getResources().eventBus(); @@ -44,8 +55,8 @@ public void afterChannelInitialized(Channel channel) { // Subscribe to __rebind channel StatefulRedisPubSubConnection redis = redisClient.connectPubSub(); - RedisPubSubCommands commands = redis.sync(); - commands.subscribe("__rebind"); + RedisPubSubAsyncCommands commands = redis.async(); + commands.subscribe("__rebind").get(); // Used to stop the demo by sending the following command: // publish __rebind "type=stop_demo" @@ -55,13 +66,13 @@ public void afterChannelInitialized(Channel channel) { // Used to initiate the proactive rebind by sending the following command // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10" - // NO LONGER NEEDED, HANDLER REGISTERES ITSELF + // NO LONGER NEEDED, HANDLER REGISTERS ITSELF // redis.addListener(proactiveHandler); while (control.shouldContinue) { try { logger.info("Sending PING"); - logger.info(commands.ping()); + logger.info(commands.ping().get()); Thread.sleep(2000); } catch (InterruptedException e) { logger.severe("InterruptedException: " + e.getMessage()); From bd85245541cb758cda0e2ac2201ad734b0b0270c Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Tue, 18 Mar 2025 16:57:22 +0200 Subject: [PATCH 09/29] Proper way to close channel --- .../lettuce/core/proactive/ProactiveWatchdogCommandHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java index c59375afe..d7f324da2 100644 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java @@ -81,7 +81,7 @@ public void onPushMessage(PushMessage message) { // disconnect the current channel and fire a re-bind event with the new address context.fireUserEventTriggered(new ProactiveRebindEvent(getRemoteAddress(content))); - context.fireChannelInactive(); + context.channel().close().awaitUninterruptibly(); } } From c3974196402803162337a37c81b1a168ddfeab95 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 27 Mar 2025 16:56:01 +0200 Subject: [PATCH 10/29] Configure the timneout relaxing --- .../java/io/lettuce/core/TimeoutOptions.java | 39 ++++++++++++++++++- .../core/protocol/CommandExpiryWriter.java | 15 +++---- .../core/protocol/DefaultEndpoint.java | 16 +------- 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/lettuce/core/TimeoutOptions.java b/src/main/java/io/lettuce/core/TimeoutOptions.java index a5754f181..1ee029fe7 100644 --- a/src/main/java/io/lettuce/core/TimeoutOptions.java +++ b/src/main/java/io/lettuce/core/TimeoutOptions.java @@ -24,18 +24,26 @@ @SuppressWarnings("serial") public class TimeoutOptions implements Serializable { + public static final Duration DISABLED_TIMEOUT = Duration.ZERO.minusSeconds(1); + public static final boolean DEFAULT_TIMEOUT_COMMANDS = false; + public static final Duration DEFAULT_RELAXED_TIMEOUT = DISABLED_TIMEOUT; + private final boolean timeoutCommands; private final boolean applyConnectionTimeout; + private final Duration relaxedTimeout; + private final TimeoutSource source; - private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source) { + private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source, + Duration relaxedTimeout) { this.timeoutCommands = timeoutCommands; this.applyConnectionTimeout = applyConnectionTimeout; + this.relaxedTimeout = relaxedTimeout; this.source = source; } @@ -84,6 +92,8 @@ public static class Builder { private boolean applyConnectionTimeout = false; + private Duration relaxedTimeout = DEFAULT_RELAXED_TIMEOUT; + private TimeoutSource source; /** @@ -107,6 +117,24 @@ public Builder timeoutCommands(boolean enabled) { return this; } + /** + * Enable proactive timeout relaxing. Disabled by default, see {@link #DEFAULT_RELAXED_TIMEOUT}. + *

+ * If the Redis server supports this, the client could listen to notifications that the current endpoint is about to go + * down as part of some maintenance or failover activity. In such cases the driver could extend the existing timeout + * settings for existing commands to make sure they do not time out during this process either as part of the offline + * buffer or while waiting for a reply. + * + * @param duration {@link Duration} to relax timeouts proactively, must not be {@code null}. + * @return {@code this} + */ + public Builder proactiveTimeoutsRelaxing(Duration duration) { + LettuceAssert.notNull(duration, "Duration must not be null"); + + this.relaxedTimeout = duration; + return this; + } + /** * Set a fixed timeout for all commands. * @@ -158,7 +186,7 @@ public TimeoutOptions build() { } } - return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source); + return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source, relaxedTimeout); } } @@ -177,6 +205,13 @@ public boolean isApplyConnectionTimeout() { return applyConnectionTimeout; } + /** + * @return the {@link Duration} to relax timeouts proactively, {@link #DISABLED_TIMEOUT} if disabled. + */ + public Duration getRelaxedTimeout() { + return relaxedTimeout; + } + /** * @return the timeout source to determine the timeout for a {@link RedisCommand}. Can be {@code null} if * {@link #isTimeoutCommands()} is {@code false}. diff --git a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java index 646ab97c0..0070b5ada 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java +++ b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java @@ -59,6 +59,8 @@ public class CommandExpiryWriter implements RedisChannelWriter { private final boolean applyConnectionTimeout; + private final Duration relaxedTimeout; + private volatile long timeout = -1; public static boolean relaxTimeoutsGlobally = false; @@ -82,6 +84,7 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti this.delegate = delegate; this.source = timeoutOptions.getSource(); this.applyConnectionTimeout = timeoutOptions.isApplyConnectionTimeout(); + this.relaxedTimeout = timeoutOptions.getRelaxedTimeout(); this.timeUnit = source.getTimeUnit(); this.executorService = clientResources.eventExecutorGroup(); this.timer = clientResources.timer(); @@ -184,7 +187,7 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS Timeout commandTimeout = timer.newTimeout(t -> { if (!command.isDone()) { executors.submit(() -> { - if (shouldRelaxTimeoutsGlobally() ) { + if (shouldRelaxTimeoutsGlobally()) { command.completeExceptionally( ExceptionFactory.createTimeoutException(command.getType().toString(), Duration.ofNanos(timeUnit.toNanos(timeout)))); } else { @@ -201,20 +204,18 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS } - public static boolean shouldRelaxTimeoutsGlobally() { - return relaxTimeoutsGlobally && enableProactive; + public boolean shouldRelaxTimeoutsGlobally() { + return relaxTimeoutsGlobally && relaxedTimeout.isNegative(); } // when relaxing the timeouts - instead of expiring immediately, we will start a new timer with 10 seconds private void relaxedAttempt(RedisCommand command, ScheduledExecutorService executors) { - Duration timeout = Duration.ofSeconds(10); - Timeout commandTimeout = timer.newTimeout(t -> { if (!command.isDone()) { - executors.submit(() -> command.completeExceptionally(ExceptionFactory.createTimeoutException(timeout))); + executors.submit(() -> command.completeExceptionally(ExceptionFactory.createTimeoutException(relaxedTimeout))); } - }, timeout.getSeconds(), TimeUnit.SECONDS); + }, relaxedTimeout.toMillis(), TimeUnit.MILLISECONDS); if (command instanceof CompleteableCommand) { ((CompleteableCommand) command).onComplete((o, o2) -> commandTimeout.cancel()); diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 79f2f05f1..55d0be7d5 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -1054,20 +1054,8 @@ private void potentiallyRequeueCommands(Channel channel, RedisCommand s return; } - if (sentCommands != null) { - - boolean foundToSend = false; - - for (RedisCommand command : sentCommands) { - if (!command.isDone()) { - foundToSend = true; - break; - } - } - - if (!foundToSend) { - return; - } + if (sentCommands != null && sentCommands.stream().allMatch(RedisCommand::isDone)) { + return; } if (channel != null) { From 465e249d4fe57567cb15f583d8a3cf1b3a59de10 Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Tue, 26 Mar 2024 17:08:42 +0200 Subject: [PATCH 11/29] Sequential handover implemented --- .../java/io/lettuce/core/TimeoutOptions.java | 2 +- .../ProactiveWatchdogCommandHandler.java | 138 ------------------ .../core/protocol/CommandExpiryWriter.java | 32 +++- .../lettuce/core/protocol/CommandHandler.java | 17 +++ .../core/protocol/ConnectionWatchdog.java | 103 ++++++++++--- .../core/protocol/DefaultEndpoint.java | 21 ++- .../io/lettuce/core/protocol/Endpoint.java | 1 + .../core/rebind/RebindCompletedEvent.java | 27 ++++ .../RebindInitiatedEvent.java} | 8 +- .../io/lettuce/core/rebind/RebindState.java | 12 ++ .../extensibility/LettuceProactiveDemo.java | 103 ------------- .../extensibility/LettuceRebindDemo.java | 130 +++++++++++++++++ src/test/resources/log4j2-test.xml | 2 +- 13 files changed, 318 insertions(+), 278 deletions(-) delete mode 100644 src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java create mode 100644 src/main/java/io/lettuce/core/rebind/RebindCompletedEvent.java rename src/main/java/io/lettuce/core/{proactive/ProactiveRebindEvent.java => rebind/RebindInitiatedEvent.java} (83%) create mode 100644 src/main/java/io/lettuce/core/rebind/RebindState.java delete mode 100644 src/test/java/biz/paluch/redis/extensibility/LettuceProactiveDemo.java create mode 100644 src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java diff --git a/src/main/java/io/lettuce/core/TimeoutOptions.java b/src/main/java/io/lettuce/core/TimeoutOptions.java index 1ee029fe7..9502de5b3 100644 --- a/src/main/java/io/lettuce/core/TimeoutOptions.java +++ b/src/main/java/io/lettuce/core/TimeoutOptions.java @@ -121,7 +121,7 @@ public Builder timeoutCommands(boolean enabled) { * Enable proactive timeout relaxing. Disabled by default, see {@link #DEFAULT_RELAXED_TIMEOUT}. *

* If the Redis server supports this, the client could listen to notifications that the current endpoint is about to go - * down as part of some maintenance or failover activity. In such cases the driver could extend the existing timeout + * down as part of some maintenance activity, for example. In such cases, the driver could extend the existing timeout * settings for existing commands to make sure they do not time out during this process either as part of the offline * buffer or while waiting for a reply. * diff --git a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java b/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java deleted file mode 100644 index d7f324da2..000000000 --- a/src/main/java/io/lettuce/core/proactive/ProactiveWatchdogCommandHandler.java +++ /dev/null @@ -1,138 +0,0 @@ -package io.lettuce.core.proactive; - -import io.lettuce.core.api.push.PushListener; -import io.lettuce.core.api.push.PushMessage; -import io.lettuce.core.codec.RedisCodec; -import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.protocol.CommandArgs; -import io.lettuce.core.protocol.CommandExpiryWriter; -import io.lettuce.core.protocol.CommandHandler; -import io.lettuce.core.protocol.ConnectionWatchdog; -import io.lettuce.core.pubsub.PubSubCommandHandler; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelPipeline; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -/** - * - * @param Key type. - * @param Value type. - * @author Will Glozer - */ -@ChannelHandler.Sharable -public class ProactiveWatchdogCommandHandler extends ChannelInboundHandlerAdapter implements PushListener { - - private static final Logger logger = Logger.getLogger(ProactiveWatchdogCommandHandler.class.getName()); - - private static final String REBIND_CHANNEL = "__rebind"; - - private ChannelHandlerContext context; - - private ConnectionWatchdog watchdog; - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - logger.info("Channel {} active"); - - ChannelPipeline pipeline = ctx.channel().pipeline(); - CommandHandler command = pipeline.get(CommandHandler.class); - watchdog = pipeline.get(ConnectionWatchdog.class); - context = ctx; - - PubSubCommandHandler commandHandler = pipeline.get(PubSubCommandHandler.class); - commandHandler.getEndpoint().addListener(this); - - // Command rebind = - // new Command<>(SUBSCRIBE, - // new PubSubOutput<>(StringCodec.UTF8), - // new PubSubCommandArgs<>(StringCodec.UTF8).addKey(REBIND_CHANNEL)); - // - // if (command != null) { - // ctx.write(rebind); - // } - - super.channelActive(ctx); - } - - @Override - public void onPushMessage(PushMessage message) { - if (!message.getType().equals("message")) { - return; - } - - List content = message.getContent().stream() - .map(ez -> ez instanceof ByteBuffer ? StringCodec.UTF8.decodeKey((ByteBuffer) ez) : ez.toString()) - .collect(Collectors.toList()); - - if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { - logger.info("Attempt to rebind to new endpoint '" + getRemoteAddress(content) + "'"); - - // relax the command timeouts of the existing commands - CommandExpiryWriter.relaxTimeoutsGlobally = true; - - // disconnect the current channel and fire a re-bind event with the new address - context.fireUserEventTriggered(new ProactiveRebindEvent(getRemoteAddress(content))); - context.channel().close().awaitUninterruptibly(); - } - } - - private SocketAddress getRemoteAddress(List messageContents) { - - final String payload = messageContents.stream().filter(c -> c.contains("to_ep")).findFirst() - .orElseThrow(() -> new IllegalArgumentException("to_ep not found")); - - final String toEndpoint = Arrays.stream(payload.split(";")).filter(c -> c.contains("to_ep")).findFirst() - .orElseThrow(() -> new IllegalArgumentException("to_ep not found")); - - final String addressAndPort = toEndpoint.split("=")[1]; - final String address = addressAndPort.split(":")[0]; - final int port = Integer.parseInt(addressAndPort.split(":")[1]); - - return new InetSocketAddress(address, port); - } - - /** - * - * Command args for Pub/Sub connections. This implementation hides the first key as PubSub keys are not keys from the - * key-space. - * - * @author Mark Paluch - * @since 4.2 - */ - static class PubSubCommandArgs extends CommandArgs { - - /** - * @param codec Codec used to encode/decode keys and values, must not be {@code null}. - */ - public PubSubCommandArgs(RedisCodec codec) { - super(codec); - } - - /** - * - * @return always {@code null}. - */ - @Override - public ByteBuffer getFirstEncodedKey() { - return null; - } - - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - logger.info("Channel inactive"); - - super.channelInactive(ctx); - } - -} diff --git a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java index 0070b5ada..36a5dfeb7 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java +++ b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java @@ -32,9 +32,12 @@ import io.lettuce.core.TimeoutOptions; import io.lettuce.core.internal.ExceptionFactory; import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.rebind.RebindCompletedEvent; +import io.lettuce.core.rebind.RebindInitiatedEvent; import io.lettuce.core.resource.ClientResources; import io.netty.util.Timeout; import io.netty.util.Timer; +import reactor.core.Disposable; /** * Extension to {@link RedisChannelWriter} that expires commands. Command timeout starts at the time the command is written @@ -61,11 +64,13 @@ public class CommandExpiryWriter implements RedisChannelWriter { private final Duration relaxedTimeout; - private volatile long timeout = -1; + private final Disposable rebindStatedListener; + + private final Disposable rebindEndedListener; - public static boolean relaxTimeoutsGlobally = false; + private volatile long timeout = -1; - public static boolean enableProactive = false; + private volatile boolean relaxTimeoutsGlobally = false; /** * Create a new {@link CommandExpiryWriter}. @@ -88,6 +93,18 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti this.timeUnit = source.getTimeUnit(); this.executorService = clientResources.eventExecutorGroup(); this.timer = clientResources.timer(); + + this.rebindStatedListener = clientResources.eventBus() + .get().filter(e -> e instanceof RebindInitiatedEvent) + .subscribe(e -> { + this.relaxTimeoutsGlobally = true; + }); + + this.rebindEndedListener = clientResources.eventBus() + .get().filter(e -> e instanceof RebindCompletedEvent) + .subscribe(e -> { + this.relaxTimeoutsGlobally = false; + }); } /** @@ -151,6 +168,8 @@ public void flushCommands() { @Override public void close() { + this.rebindStatedListener.dispose(); + this.rebindEndedListener.dispose(); delegate.close(); } @@ -161,6 +180,7 @@ public CompletableFuture closeAsync() { @Override public void reset() { + this.relaxTimeoutsGlobally = false; delegate.reset(); } @@ -188,10 +208,10 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS if (!command.isDone()) { executors.submit(() -> { if (shouldRelaxTimeoutsGlobally()) { + relaxedAttempt(command, executors); + } else { command.completeExceptionally( ExceptionFactory.createTimeoutException(command.getType().toString(), Duration.ofNanos(timeUnit.toNanos(timeout)))); - } else { - relaxedAttempt(command, executors); } }); @@ -205,7 +225,7 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS } public boolean shouldRelaxTimeoutsGlobally() { - return relaxTimeoutsGlobally && relaxedTimeout.isNegative(); + return relaxTimeoutsGlobally && !relaxedTimeout.isNegative(); } // when relaxing the timeouts - instead of expiring immediately, we will start a new timer with 10 seconds diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 59aee61e0..2ddcc3271 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -20,11 +20,13 @@ package io.lettuce.core.protocol; import static io.lettuce.core.ConnectionEvents.*; +import static io.lettuce.core.protocol.ConnectionWatchdog.REBIND_ATTRIBUTE; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.time.LocalTime; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -49,6 +51,8 @@ import io.lettuce.core.metrics.CommandLatencyRecorder; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.PushOutput; +import io.lettuce.core.rebind.RebindCompletedEvent; +import io.lettuce.core.rebind.RebindState; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.tracing.TraceContext; import io.lettuce.core.tracing.TraceContextProvider; @@ -625,6 +629,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException { + final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE) + && ctx.channel().attr(REBIND_ATTRIBUTE).get() != null + && ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED); + if (debugEnabled && rebindInProgress) { + logger.debug("{} Processing command while rebind is in progress, stack has {} more to process", + logPrefix(), stack.size()); + } if (pristine) { @@ -711,6 +722,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup } } + if (rebindInProgress && stack.isEmpty()){ + logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now()); + ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED); + clientResources.eventBus().publish(new RebindCompletedEvent()); + } + decodeBufferPolicy.afterDecoding(buffer); } diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 06794cb40..45968bece 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -19,13 +19,25 @@ */ package io.lettuce.core.protocol; +import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.time.Duration; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - -import io.lettuce.core.proactive.ProactiveRebindEvent; +import java.util.stream.Collectors; + +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.pubsub.PubSubCommandHandler; +import io.lettuce.core.rebind.RebindInitiatedEvent; +import io.lettuce.core.rebind.RebindState; +import io.netty.util.AttributeKey; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import io.lettuce.core.ClientOptions; @@ -60,7 +72,9 @@ * @author Koji Lin */ @ChannelHandler.Sharable -public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { +public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements PushListener { + + public static final AttributeKey REBIND_ATTRIBUTE = AttributeKey.newInstance("rebindAddress"); private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS); @@ -181,8 +195,6 @@ void prepareClose() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - CommandHandler commandHandler = ctx.pipeline().get(CommandHandler.class); - reconnectSchedulerSync.set(false); channel = ctx.channel(); reconnectScheduleTimeout = null; @@ -193,6 +205,12 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { logPrefix = null; logger.debug("{} channelActive()", logPrefix()); + // FIXME (start) To be removed once the server starts sending the rebind message without needing a subscription. + ChannelPipeline pipeline = ctx.channel().pipeline(); + PubSubCommandHandler commandHandler = pipeline.get(PubSubCommandHandler.class); + commandHandler.getEndpoint().addListener(this); + // FIXME (end) + super.channelActive(ctx); } @@ -216,17 +234,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof ProactiveRebindEvent) { - ProactiveRebindEvent event = (ProactiveRebindEvent) evt; - logger.info("Proactive rebind to {}", event.getRemoteAddress()); - rebindAddress = event.getRemoteAddress(); - } - - super.userEventTriggered(ctx, evt); - } - /** * Enable {@link ConnectionWatchdog} to listen for disconnected events. */ @@ -289,7 +296,6 @@ public void scheduleReconnect() { * the same handler instances contained in the old channel's pipeline. * * @param attempt attempt counter - * * @throws Exception when reconnection fails. */ public void run(int attempt) throws Exception { @@ -302,7 +308,6 @@ public void run(int attempt) throws Exception { * * @param attempt attempt counter. * @param delay retry delay. - * * @throws Exception when reconnection fails. */ private void run(int attempt, Duration delay) throws Exception { @@ -344,14 +349,14 @@ private void run(int attempt, Duration delay) throws Exception { eventBus.publish(new ReconnectAttemptEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, attempt, delay)); logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress); - Tuple2, CompletableFuture> tuple = rebindAddress == null - ? reconnectionHandler.reconnect() - : reconnectionHandler.reconnect(rebindAddress); + Tuple2, CompletableFuture> tuple = + rebindAddress == null ? reconnectionHandler.reconnect() : reconnectionHandler.reconnect(rebindAddress); CompletableFuture future = tuple.getT1(); future.whenComplete((c, t) -> { if (c != null && t == null) { + this.channel.attr(REBIND_ATTRIBUTE).set(null); return; } @@ -452,4 +457,60 @@ private String logPrefix() { return logPrefix = buffer; } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + + if (ctx.channel() != null && ctx.channel().isActive() && ctx.channel().hasAttr(REBIND_ATTRIBUTE) + && ctx.channel().attr(REBIND_ATTRIBUTE).get() == RebindState.COMPLETED) { + logger.debug("Disconnecting at {}", LocalTime.now()); + ctx.channel().close().awaitUninterruptibly(); + } + + super.channelReadComplete(ctx); + } + + @Override + public void onPushMessage(PushMessage message) { + if (!message.getType().equals("message")) { + return; + } + + List content = message.getContent().stream() + .map(ez -> ez instanceof ByteBuffer ? StringCodec.UTF8.decodeKey((ByteBuffer) ez) : ez.toString()) + .collect(Collectors.toList()); + + if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { + logger.info("Attempting to rebind to new endpoint '{}'", getRemoteAddress(content)); + + channel.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED); + this.rebindAddress = getRemoteAddress(content); + + ChannelPipeline pipeline = channel.pipeline(); + PubSubCommandHandler commandHandler = pipeline.get(PubSubCommandHandler.class); + if (commandHandler.getStack().isEmpty()) { + channel.close().awaitUninterruptibly(); + channel.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED); + } else { + // FIXME this is currently only used to notify in an awkward way the ChannelExpiryWriter + RebindInitiatedEvent event = new RebindInitiatedEvent(getRemoteAddress(content)); + eventBus.publish(event); + } + } + } + + private SocketAddress getRemoteAddress(List messageContents) { + + final String payload = messageContents.stream().filter(c -> c.contains("to_ep")).findFirst() + .orElse("type=rebind;from_ep=localhost:6479;to_ep=localhost:6379;until_s=10"); + + final String toEndpoint = Arrays.stream(payload.split(";")).filter(c -> c.contains("to_ep")).findFirst() + .orElse("to_ep=localhost:6479"); + + final String addressAndPort = toEndpoint.split("=")[1]; + final String address = addressAndPort.split(":")[0]; + final int port = Integer.parseInt(addressAndPort.split(":")[1]); + + return new InetSocketAddress(address, port); + } + } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 55d0be7d5..7e059eec2 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -20,6 +20,7 @@ package io.lettuce.core.protocol; import static io.lettuce.core.protocol.CommandHandler.*; +import static io.lettuce.core.protocol.ConnectionWatchdog.REBIND_ATTRIBUTE; import java.io.IOException; import java.nio.channels.ClosedChannelException; @@ -45,6 +46,7 @@ import io.lettuce.core.internal.Futures; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceFactories; +import io.lettuce.core.rebind.RebindState; import io.lettuce.core.resource.ClientResources; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -68,11 +70,11 @@ public class DefaultEndpoint implements RedisChannelWriter, Endpoint, PushHandle private static final AtomicLong ENDPOINT_COUNTER = new AtomicLong(); - private static final AtomicIntegerFieldUpdater QUEUE_SIZE = AtomicIntegerFieldUpdater - .newUpdater(DefaultEndpoint.class, "queueSize"); + private static final AtomicIntegerFieldUpdater QUEUE_SIZE = AtomicIntegerFieldUpdater.newUpdater( + DefaultEndpoint.class, "queueSize"); - private static final AtomicIntegerFieldUpdater STATUS = AtomicIntegerFieldUpdater - .newUpdater(DefaultEndpoint.class, "status"); + private static final AtomicIntegerFieldUpdater STATUS = AtomicIntegerFieldUpdater.newUpdater( + DefaultEndpoint.class, "status"); private static final int ST_OPEN = 0; @@ -812,7 +814,16 @@ private void cancelCommands(String message, Iterable proactiveHandler = new ProactiveWatchdogCommandHandler<>(); - - ClientResources resources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() { - - @Override - public void afterChannelInitialized(Channel channel) { - channel.pipeline().addFirst(proactiveHandler); - } - - }).build(); - - TimeoutOptions timeoutOpts = TimeoutOptions.builder().timeoutCommands().fixedTimeout(Duration.ofMillis(1)).build(); - ClientOptions options = ClientOptions.builder().timeoutOptions(timeoutOpts).build(); - - CommandExpiryWriter.enableProactive = true; - - RedisClient redisClient = RedisClient.create(resources, RedisURI.Builder.redis("localhost", 6379).build()); - redisClient.setOptions(options); - - // Monitor connection events - EventBus eventBus = redisClient.getResources().eventBus(); - eventBus.get().subscribe(e -> { - logger.info(">>> Connection event: " + e); - }); - - // Subscribe to __rebind channel - StatefulRedisPubSubConnection redis = redisClient.connectPubSub(); - RedisPubSubAsyncCommands commands = redis.async(); - commands.subscribe("__rebind").get(); - - // Used to stop the demo by sending the following command: - // publish __rebind "type=stop_demo" - Control control = new Control(); - redis.addListener(control); - - // Used to initiate the proactive rebind by sending the following command - // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10" - - // NO LONGER NEEDED, HANDLER REGISTERS ITSELF - // redis.addListener(proactiveHandler); - - while (control.shouldContinue) { - try { - logger.info("Sending PING"); - logger.info(commands.ping().get()); - Thread.sleep(2000); - } catch (InterruptedException e) { - logger.severe("InterruptedException: " + e.getMessage()); - } - } - - redis.close(); - redisClient.shutdown(); - } - - static class Control implements PushListener { - - public boolean shouldContinue = true; - - @Override - public void onPushMessage(PushMessage message) { - List content = message.getContent().stream().map(ez -> StringCodec.UTF8.decodeKey((ByteBuffer) ez)) - .collect(Collectors.toList()); - - if (content.stream().anyMatch(c -> c.equals("type=stop_demo"))) { - logger.info("Control received message to stop the demo"); - shouldContinue = false; - } - } - - } - -} diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java new file mode 100644 index 000000000..0042207d7 --- /dev/null +++ b/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java @@ -0,0 +1,130 @@ +package biz.paluch.redis.extensibility; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.event.EventBus; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +public class LettuceRebindDemo { + + public static final Logger logger = Logger.getLogger(LettuceRebindDemo.class.getName()); + + public static final String KEY = "rebind:" + UUID.randomUUID().getLeastSignificantBits(); + + public static void main(String[] args) throws ExecutionException, InterruptedException { + + // NEW! No need for a custom handler + + TimeoutOptions timeoutOpts = TimeoutOptions.builder() + .timeoutCommands() + .fixedTimeout(Duration.ofMillis(50)) + // NEW! control that during timeouts we need to relax the timeouts + .proactiveTimeoutsRelaxing(Duration.ofMillis(500)) + .build(); + ClientOptions options = ClientOptions.builder().timeoutOptions(timeoutOpts).build(); + + RedisClient redisClient = RedisClient.create(RedisURI.Builder.redis("localhost", 6379).build()); + redisClient.setOptions(options); + + // Monitor connection events + EventBus eventBus = redisClient.getResources().eventBus(); + eventBus.get().subscribe(e -> { + logger.info(">>> Event bus received: {} " + e); + }); + + // Subscribe to __rebind channel (REMOVE ONCE WE START RECEIVING THESE WITHOUT SUBSCRIPTION) + StatefulRedisPubSubConnection redis = redisClient.connectPubSub(); + RedisPubSubAsyncCommands commands = redis.async(); + commands.subscribe("__rebind").get(); + + // Used to stop the demo by sending the following command: + // publish __rebind "type=stop_demo" + Control control = new Control(); + redis.addListener(control); + + // Used to initiate the proactive rebind by sending the following command + // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10" + + ExecutorService executorService = new ThreadPoolExecutor( + 5, // core pool size + 10, // maximum pool size + 60, TimeUnit.SECONDS, // idle thread keep-alive time + new ArrayBlockingQueue<>(20), // work queue size + new ThreadPoolExecutor.DiscardPolicy()); // rejection policy + + try { + while (control.shouldContinue) { + executorService.execute(new DemoWorker(commands)); + Thread.sleep(1); + } + + if(executorService.awaitTermination(5, TimeUnit.SECONDS)){ + logger.info("Executor service terminated"); + } else { + logger.warning("Executor service did not terminate in the specified time"); + } + + } finally { + executorService.shutdownNow(); + } + + redis.close(); + redisClient.shutdown(); + } + + static class DemoWorker implements Runnable { + private final RedisPubSubAsyncCommands commands; + + public DemoWorker(RedisPubSubAsyncCommands commands) { + this.commands = commands; + } + + @Override + public void run() { + try { + commands.incr(KEY).get(); + } catch (InterruptedException | ExecutionException e) { + logger.severe("ExecutionException: " + e.getMessage()); + } + } + } + + static class Control implements PushListener { + + public boolean shouldContinue = true; + + @Override + public void onPushMessage(PushMessage message) { + List content = message.getContent().stream() + .filter(ez -> ez instanceof ByteBuffer) + .map(ez -> StringCodec.UTF8.decodeKey((ByteBuffer) ez)) + .collect(Collectors.toList()); + + if (content.stream().anyMatch(c -> c.contains("type=stop_demo"))) { + logger.info("Control received message to stop the demo"); + shouldContinue = false; + } + } + + } + +} diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml index 981918b55..0276ab944 100644 --- a/src/test/resources/log4j2-test.xml +++ b/src/test/resources/log4j2-test.xml @@ -11,7 +11,7 @@ - +