Skip to content

Commit f2fc1ee

Browse files
committed
test: Add integration tests for sharded pub/sub auto-resubscription
1 parent 429f61c commit f2fc1ee

File tree

2 files changed

+71
-48
lines changed

2 files changed

+71
-48
lines changed

src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,4 +611,68 @@ void echoAllowedInSubscriptionState() {
611611
pubsub.unsubscribe(channel);
612612
}
613613

614+
@Test
615+
void autoResubscribeOnShardChannelUnsubscribed() throws Exception {
616+
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
617+
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();
618+
619+
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
620+
621+
@Override
622+
public void ssubscribed(String channel, long count) {
623+
subscribedChannels.add(channel);
624+
}
625+
626+
@Override
627+
public void sunsubscribed(String channel, long count) {
628+
unsubscribedChannels.add(channel);
629+
}
630+
631+
};
632+
633+
pubsub.getStatefulConnection().addListener(listener);
634+
pubsub.ssubscribe(shardChannel);
635+
636+
assertThat(subscribedChannels.take()).isEqualTo(shardChannel);
637+
638+
pubsub.sunsubscribe(shardChannel);
639+
640+
assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);
641+
assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();
642+
643+
pubsub.getStatefulConnection().removeListener(listener);
644+
}
645+
646+
@Test
647+
void noAutoResubscribeOnIntentionalUnsubscribe() throws Exception {
648+
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
649+
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();
650+
651+
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
652+
653+
@Override
654+
public void ssubscribed(String channel, long count) {
655+
subscribedChannels.add(channel);
656+
}
657+
658+
@Override
659+
public void sunsubscribed(String channel, long count) {
660+
unsubscribedChannels.add(channel);
661+
}
662+
663+
};
664+
665+
pubsub.getStatefulConnection().addListener(listener);
666+
pubsub.ssubscribe(shardChannel);
667+
668+
assertThat(subscribedChannels.take()).isEqualTo(shardChannel);
669+
670+
pubsub.sunsubscribe(shardChannel);
671+
assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);
672+
673+
assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();
674+
675+
pubsub.getStatefulConnection().removeListener(listener);
676+
}
677+
614678
}

src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java

Lines changed: 7 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@
1717
import static org.junit.jupiter.api.Assertions.assertTrue;
1818
import static org.mockito.Mockito.*;
1919

20-
import io.lettuce.core.protocol.AsyncCommand;
2120
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
22-
import io.lettuce.core.pubsub.PubSubOutput;
23-
import io.lettuce.core.pubsub.RedisPubSubAdapter;
2421

2522
import java.lang.reflect.Field;
2623
import java.nio.ByteBuffer;
2724
import java.time.Duration;
2825
import java.util.Arrays;
2926
import java.util.HashSet;
3027
import java.util.List;
28+
import java.util.Set;
3129

3230
@Tag(UNIT_TEST)
3331
class StatefulRedisPubSubConnectionImplUnitTests {
@@ -132,66 +130,27 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() {
132130

133131
@Test
134132
void autoResubscribeListenerIsRegistered() {
135-
// Verify that the connection has the markIntentionalUnsubscribe method
136-
// This confirms the auto-resubscribe functionality is available
137133
connection.markIntentionalUnsubscribe("test-channel");
138-
// If no exception is thrown, the method exists and works
139134
assertTrue(true);
140135
}
141136

142137
@Test
143138
void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception {
144-
// Test 1: Intentional unsubscribe should NOT trigger auto-resubscribe
145-
146-
// Create a mock async commands to verify ssubscribe is NOT called
147-
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
148-
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(connection);
149-
when(spyConnection.async()).thenReturn(mockAsync);
150-
151-
// Mark the channel as intentionally unsubscribed
152-
spyConnection.markIntentionalUnsubscribe("test-channel");
153-
154-
// Use reflection to access the private endpoint and trigger sunsubscribed event
155-
PubSubEndpoint<String, String> endpoint = getEndpointViaReflection(spyConnection);
156-
PubSubOutput<String, String> sunsubscribeMessage = createSunsubscribeMessage("test-channel", codec);
157-
endpoint.notifyMessage(sunsubscribeMessage);
139+
connection.markIntentionalUnsubscribe("test-channel");
158140

159-
// Wait a moment for any async processing
160-
Thread.sleep(50);
141+
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(connection);
161142

162-
// Verify that ssubscribe was NOT called (intentional unsubscribe bypassed auto-resubscribe)
163-
verify(mockAsync, never()).ssubscribe("test-channel");
143+
autoResubscribeListener.sunsubscribed("test-channel", 0);
144+
verify(mockedWriter, never()).write(any(io.lettuce.core.protocol.RedisCommand.class));
164145
}
165146

166147
@Test
167148
void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception {
168-
// Test 2: Unintentional unsubscribe (from Redis) should trigger auto-resubscribe
149+
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(connection);
169150

170-
// Create a fresh connection with a mock async
171-
PubSubEndpoint<String, String> mockEndpoint = mock(PubSubEndpoint.class);
172-
StatefulRedisPubSubConnectionImpl<String, String> testConnection = new StatefulRedisPubSubConnectionImpl<>(mockEndpoint,
173-
mockedWriter, codec, timeout);
174-
175-
// Create a mock async commands to verify ssubscribe IS called
176-
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
177-
@SuppressWarnings("unchecked")
178-
RedisFuture<Void> mockFuture = mock(RedisFuture.class);
179-
when(mockAsync.ssubscribe("test-channel")).thenReturn(mockFuture);
180-
181-
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(testConnection);
182-
when(spyConnection.async()).thenReturn(mockAsync);
183-
184-
// Get the auto-resubscribe listener directly and trigger it
185-
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(spyConnection);
186-
187-
// Do NOT mark as intentional - simulate Redis server sunsubscribe during slot movement
188151
autoResubscribeListener.sunsubscribed("test-channel", 0);
189152

190-
// Wait a moment for async processing
191-
Thread.sleep(50);
192-
193-
// Verify that ssubscribe WAS called (auto-resubscribe triggered)
194-
verify(mockAsync, times(1)).ssubscribe("test-channel");
153+
verify(mockedWriter, times(1)).write(any(io.lettuce.core.protocol.RedisCommand.class));
195154
}
196155

197156
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)