diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index 44108e4942..1cc6512e1c 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -114,6 +114,14 @@ public RedisFuture ssubscribe(K... channels) { @Override @SuppressWarnings("unchecked") public RedisFuture sunsubscribe(K... channels) { + // Mark these channels as intentionally unsubscribed to prevent auto-resubscription + StatefulRedisPubSubConnection connection = getStatefulConnection(); + if (connection instanceof StatefulRedisPubSubConnectionImpl) { + StatefulRedisPubSubConnectionImpl impl = (StatefulRedisPubSubConnectionImpl) connection; + for (K channel : channels) { + impl.markIntentionalUnsubscribe(channel); + } + } return (RedisFuture) dispatch(commandBuilder.sunsubscribe(channels)); } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index 54774b7269..a21d39015f 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -171,6 +171,14 @@ public Mono ssubscribe(K... shardChannels) { @Override public Mono sunsubscribe(K... shardChannels) { + // Mark these channels as intentionally unsubscribed to prevent auto-resubscription + StatefulRedisPubSubConnection connection = getStatefulConnection(); + if (connection instanceof StatefulRedisPubSubConnectionImpl) { + StatefulRedisPubSubConnectionImpl impl = (StatefulRedisPubSubConnectionImpl) connection; + for (K channel : shardChannels) { + impl.markIntentionalUnsubscribe(channel); + } + } return createFlux(() -> commandBuilder.sunsubscribe(shardChannels)).then(); } diff --git a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java index 91ef777f5a..66f1cbf170 100644 --- a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.RedisCommandExecutionException; @@ -53,6 +55,8 @@ public class StatefulRedisPubSubConnectionImpl extends StatefulRedisConnec private final PubSubEndpoint endpoint; + private final ShardedPubSubAutoResubscribeListener autoResubscribeListener; + /** * Initialize a new connection. * @@ -67,6 +71,10 @@ public StatefulRedisPubSubConnectionImpl(PubSubEndpoint endpoint, RedisCha super(writer, endpoint, codec, timeout, DEFAULT_JSON_PARSER); this.endpoint = endpoint; endpoint.setConnectionState(getConnectionState()); + + // Add internal listener for auto-resubscription on sunsubscribe events + this.autoResubscribeListener = new ShardedPubSubAutoResubscribeListener(); + endpoint.addListener(autoResubscribeListener); } /** @@ -163,4 +171,48 @@ public void activated() { } } + /** + * Internal listener that handles automatic resubscription for sharded pub/sub channels when they are unsubscribed due to + * slot rebalancing. + */ + private class ShardedPubSubAutoResubscribeListener extends RedisPubSubAdapter { + + private final Set intentionalUnsubscriptions = ConcurrentHashMap.newKeySet(); + + @Override + public void sunsubscribed(K shardChannel, long count) { + if (intentionalUnsubscriptions.remove(shardChannel)) { + return; + } + + if (shardChannel != null) { + InternalLoggerFactory.getInstance(getClass()).debug( + "Triggering auto-resubscribe to generate MovedRedirectionEvent for shard channel: {}", shardChannel); + RedisFuture resubscribeResult = async().ssubscribe(shardChannel); + resubscribeResult.exceptionally(throwable -> { + InternalLoggerFactory.getInstance(getClass()).debug( + "Auto-resubscribe triggered cluster redirection for shard channel {}: {}", shardChannel, + throwable.getMessage()); + return null; + }); + } + } + + /** + * Mark a channel as intentionally unsubscribed to prevent auto-resubscription + */ + public void markIntentionalUnsubscribe(K shardChannel) { + intentionalUnsubscriptions.add(shardChannel); + } + + } + + /** + * Mark a channel as intentionally unsubscribed to prevent auto-resubscription. This method is called by + * RedisPubSubAsyncCommandsImpl when sunsubscribe is explicitly called. + */ + public void markIntentionalUnsubscribe(K shardChannel) { + autoResubscribeListener.markIntentionalUnsubscribe(shardChannel); + } + } diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java index 3472660281..a657ad35a3 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java @@ -611,4 +611,68 @@ void echoAllowedInSubscriptionState() { pubsub.unsubscribe(channel); } + @Test + void autoResubscribeOnShardChannelUnsubscribed() throws Exception { + final BlockingQueue subscribedChannels = LettuceFactories.newBlockingQueue(); + final BlockingQueue unsubscribedChannels = LettuceFactories.newBlockingQueue(); + + RedisPubSubListener listener = new RedisPubSubAdapter() { + + @Override + public void ssubscribed(String channel, long count) { + subscribedChannels.add(channel); + } + + @Override + public void sunsubscribed(String channel, long count) { + unsubscribedChannels.add(channel); + } + + }; + + pubsub.getStatefulConnection().addListener(listener); + pubsub.ssubscribe(shardChannel); + + assertThat(subscribedChannels.take()).isEqualTo(shardChannel); + + pubsub.sunsubscribe(shardChannel); + + assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel); + assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull(); + + pubsub.getStatefulConnection().removeListener(listener); + } + + @Test + void noAutoResubscribeOnIntentionalUnsubscribe() throws Exception { + final BlockingQueue subscribedChannels = LettuceFactories.newBlockingQueue(); + final BlockingQueue unsubscribedChannels = LettuceFactories.newBlockingQueue(); + + RedisPubSubListener listener = new RedisPubSubAdapter() { + + @Override + public void ssubscribed(String channel, long count) { + subscribedChannels.add(channel); + } + + @Override + public void sunsubscribed(String channel, long count) { + unsubscribedChannels.add(channel); + } + + }; + + pubsub.getStatefulConnection().addListener(listener); + pubsub.ssubscribe(shardChannel); + + assertThat(subscribedChannels.take()).isEqualTo(shardChannel); + + pubsub.sunsubscribe(shardChannel); + assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel); + + assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull(); + + pubsub.getStatefulConnection().removeListener(listener); + } + } diff --git a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java index 9eb5528244..5228050f20 100644 --- a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java @@ -14,8 +14,10 @@ import static io.lettuce.TestTags.UNIT_TEST; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; +import java.lang.reflect.Field; import java.time.Duration; import java.util.Arrays; import java.util.HashSet; @@ -122,4 +124,36 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() { assertInstanceOf(AsyncCommand.class, subscriptions.get(1)); } + @Test + void autoResubscribeListenerIsRegistered() { + connection.markIntentionalUnsubscribe("test-channel"); + assertTrue(true); + } + + @Test + void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception { + connection.markIntentionalUnsubscribe("test-channel"); + + RedisPubSubListener autoResubscribeListener = getAutoResubscribeListener(connection); + + autoResubscribeListener.sunsubscribed("test-channel", 0); + verify(mockedWriter, never()).write(any(io.lettuce.core.protocol.RedisCommand.class)); + } + + @Test + void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception { + RedisPubSubListener autoResubscribeListener = getAutoResubscribeListener(connection); + + autoResubscribeListener.sunsubscribed("test-channel", 0); + + verify(mockedWriter, times(1)).write(any(io.lettuce.core.protocol.RedisCommand.class)); + } + + @SuppressWarnings("unchecked") + private RedisPubSubListener getAutoResubscribeListener( + StatefulRedisPubSubConnectionImpl connection) throws Exception { + Field listenerField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("autoResubscribeListener"); + listenerField.setAccessible(true); + return (RedisPubSubListener) listenerField.get(connection); + } }