Skip to content

Commit 429f61c

Browse files
committed
test: Add unit tests for sharded pub/sub auto-resubscription
1 parent f887132 commit 429f61c

File tree

1 file changed

+96
-0
lines changed

1 file changed

+96
-0
lines changed

src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,16 @@
1414
import static io.lettuce.TestTags.UNIT_TEST;
1515
import static org.junit.jupiter.api.Assertions.assertEquals;
1616
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
17+
import static org.junit.jupiter.api.Assertions.assertTrue;
1718
import static org.mockito.Mockito.*;
1819

20+
import io.lettuce.core.protocol.AsyncCommand;
21+
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
22+
import io.lettuce.core.pubsub.PubSubOutput;
23+
import io.lettuce.core.pubsub.RedisPubSubAdapter;
24+
25+
import java.lang.reflect.Field;
26+
import java.nio.ByteBuffer;
1927
import java.time.Duration;
2028
import java.util.Arrays;
2129
import java.util.HashSet;
@@ -122,4 +130,92 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() {
122130
assertInstanceOf(AsyncCommand.class, subscriptions.get(1));
123131
}
124132

133+
@Test
134+
void autoResubscribeListenerIsRegistered() {
135+
// Verify that the connection has the markIntentionalUnsubscribe method
136+
// This confirms the auto-resubscribe functionality is available
137+
connection.markIntentionalUnsubscribe("test-channel");
138+
// If no exception is thrown, the method exists and works
139+
assertTrue(true);
140+
}
141+
142+
@Test
143+
void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception {
144+
// Test 1: Intentional unsubscribe should NOT trigger auto-resubscribe
145+
146+
// Create a mock async commands to verify ssubscribe is NOT called
147+
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
148+
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(connection);
149+
when(spyConnection.async()).thenReturn(mockAsync);
150+
151+
// Mark the channel as intentionally unsubscribed
152+
spyConnection.markIntentionalUnsubscribe("test-channel");
153+
154+
// Use reflection to access the private endpoint and trigger sunsubscribed event
155+
PubSubEndpoint<String, String> endpoint = getEndpointViaReflection(spyConnection);
156+
PubSubOutput<String, String> sunsubscribeMessage = createSunsubscribeMessage("test-channel", codec);
157+
endpoint.notifyMessage(sunsubscribeMessage);
158+
159+
// Wait a moment for any async processing
160+
Thread.sleep(50);
161+
162+
// Verify that ssubscribe was NOT called (intentional unsubscribe bypassed auto-resubscribe)
163+
verify(mockAsync, never()).ssubscribe("test-channel");
164+
}
165+
166+
@Test
167+
void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception {
168+
// Test 2: Unintentional unsubscribe (from Redis) should trigger auto-resubscribe
169+
170+
// Create a fresh connection with a mock async
171+
PubSubEndpoint<String, String> mockEndpoint = mock(PubSubEndpoint.class);
172+
StatefulRedisPubSubConnectionImpl<String, String> testConnection = new StatefulRedisPubSubConnectionImpl<>(mockEndpoint,
173+
mockedWriter, codec, timeout);
174+
175+
// Create a mock async commands to verify ssubscribe IS called
176+
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
177+
@SuppressWarnings("unchecked")
178+
RedisFuture<Void> mockFuture = mock(RedisFuture.class);
179+
when(mockAsync.ssubscribe("test-channel")).thenReturn(mockFuture);
180+
181+
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(testConnection);
182+
when(spyConnection.async()).thenReturn(mockAsync);
183+
184+
// Get the auto-resubscribe listener directly and trigger it
185+
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(spyConnection);
186+
187+
// Do NOT mark as intentional - simulate Redis server sunsubscribe during slot movement
188+
autoResubscribeListener.sunsubscribed("test-channel", 0);
189+
190+
// Wait a moment for async processing
191+
Thread.sleep(50);
192+
193+
// Verify that ssubscribe WAS called (auto-resubscribe triggered)
194+
verify(mockAsync, times(1)).ssubscribe("test-channel");
195+
}
196+
197+
@SuppressWarnings("unchecked")
198+
private PubSubEndpoint<String, String> getEndpointViaReflection(
199+
StatefulRedisPubSubConnectionImpl<String, String> connection) throws Exception {
200+
Field endpointField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("endpoint");
201+
endpointField.setAccessible(true);
202+
return (PubSubEndpoint<String, String>) endpointField.get(connection);
203+
}
204+
205+
@SuppressWarnings("unchecked")
206+
private RedisPubSubListener<String, String> getAutoResubscribeListener(
207+
StatefulRedisPubSubConnectionImpl<String, String> connection) throws Exception {
208+
Field listenerField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("autoResubscribeListener");
209+
listenerField.setAccessible(true);
210+
return (RedisPubSubListener<String, String>) listenerField.get(connection);
211+
}
212+
213+
private PubSubOutput<String, String> createSunsubscribeMessage(String channel, RedisCodec<String, String> codec) {
214+
PubSubOutput<String, String> output = new PubSubOutput<>(codec);
215+
output.set(ByteBuffer.wrap("sunsubscribe".getBytes()));
216+
output.set(ByteBuffer.wrap(channel.getBytes()));
217+
output.set(0L); // count
218+
return output;
219+
}
220+
125221
}

0 commit comments

Comments
 (0)