6
6
using System . Collections . Generic ;
7
7
using System . IO ;
8
8
using System . Net ;
9
+ using System . Reflection ;
10
+ using System . Threading ;
9
11
using System . Threading . Tasks ;
10
12
using StackExchange . Redis ;
11
13
using StackExchange . Redis . Profiling ;
@@ -73,11 +75,11 @@ public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved
73
75
remove { }
74
76
}
75
77
76
- private readonly ISubscriber _subscriber ;
78
+ private readonly TestRedisServer _server ;
77
79
78
80
public TestConnectionMultiplexer ( TestRedisServer server )
79
81
{
80
- _subscriber = new TestSubscriber ( server ) ;
82
+ _server = server ;
81
83
}
82
84
83
85
public void BeginProfiling ( object forContext )
@@ -167,7 +169,7 @@ public string GetStormLog()
167
169
168
170
public ISubscriber GetSubscriber ( object asyncState = null )
169
171
{
170
- return _subscriber ;
172
+ return new TestSubscriber ( _server ) ;
171
173
}
172
174
173
175
public int HashSlot ( RedisKey key )
@@ -223,14 +225,14 @@ public void ExportConfiguration(Stream destination, ExportOptions options = (Exp
223
225
224
226
public class TestRedisServer
225
227
{
226
- private readonly ConcurrentDictionary < RedisChannel , List < Action < RedisChannel , RedisValue > > > _subscriptions =
227
- new ConcurrentDictionary < RedisChannel , List < Action < RedisChannel , RedisValue > > > ( ) ;
228
+ private readonly ConcurrentDictionary < RedisChannel , List < ( int , Action < RedisChannel , RedisValue > ) > > _subscriptions =
229
+ new ConcurrentDictionary < RedisChannel , List < ( int , Action < RedisChannel , RedisValue > ) > > ( ) ;
228
230
229
231
public long Publish ( RedisChannel channel , RedisValue message , CommandFlags flags = CommandFlags . None )
230
232
{
231
233
if ( _subscriptions . TryGetValue ( channel , out var handlers ) )
232
234
{
233
- foreach ( var handler in handlers )
235
+ foreach ( var ( _ , handler ) in handlers )
234
236
{
235
237
handler ( channel , message ) ;
236
238
}
@@ -239,26 +241,37 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
239
241
return handlers != null ? handlers . Count : 0 ;
240
242
}
241
243
242
- public void Subscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler , CommandFlags flags = CommandFlags . None )
244
+ public void Subscribe ( ChannelMessageQueue messageQueue , int subscriberId , CommandFlags flags = CommandFlags . None )
243
245
{
244
- _subscriptions . AddOrUpdate ( channel , _ => new List < Action < RedisChannel , RedisValue >> { handler } , ( _ , list ) =>
246
+ Action < RedisChannel , RedisValue > handler = ( channel , value ) =>
247
+ {
248
+ // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
249
+ // ChannelMessageQueue isn't mockable currently, this works around that by using private reflection
250
+ typeof ( ChannelMessageQueue ) . GetMethod ( "Write" , BindingFlags . NonPublic | BindingFlags . Instance )
251
+ . Invoke ( messageQueue , new object [ ] { channel , value } ) ;
252
+ } ;
253
+
254
+ _subscriptions . AddOrUpdate ( messageQueue . Channel , _ => new List < ( int , Action < RedisChannel , RedisValue > ) > { ( subscriberId , handler ) } , ( _ , list ) =>
245
255
{
246
- list . Add ( handler ) ;
256
+ list . Add ( ( subscriberId , handler ) ) ;
247
257
return list ;
248
258
} ) ;
249
259
}
250
260
251
- public void Unsubscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler = null , CommandFlags flags = CommandFlags . None )
261
+ public void Unsubscribe ( RedisChannel channel , int subscriberId , CommandFlags flags = CommandFlags . None )
252
262
{
253
263
if ( _subscriptions . TryGetValue ( channel , out var list ) )
254
264
{
255
- list . Remove ( handler ) ;
265
+ list . RemoveAll ( ( item ) => item . Item1 == subscriberId ) ;
256
266
}
257
267
}
258
268
}
259
269
260
270
public class TestSubscriber : ISubscriber
261
271
{
272
+ private static int StaticId ;
273
+
274
+ private readonly int _id ;
262
275
private readonly TestRedisServer _server ;
263
276
public ConnectionMultiplexer Multiplexer => throw new NotImplementedException ( ) ;
264
277
@@ -267,6 +280,7 @@ public class TestSubscriber : ISubscriber
267
280
public TestSubscriber ( TestRedisServer server )
268
281
{
269
282
_server = server ;
283
+ _id = Interlocked . Increment ( ref StaticId ) ;
270
284
}
271
285
272
286
public EndPoint IdentifyEndpoint ( RedisChannel channel , CommandFlags flags = CommandFlags . None )
@@ -307,7 +321,7 @@ public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, C
307
321
308
322
public void Subscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler , CommandFlags flags = CommandFlags . None )
309
323
{
310
- _server . Subscribe ( channel , handler , flags ) ;
324
+ throw new NotImplementedException ( ) ;
311
325
}
312
326
313
327
public Task SubscribeAsync ( RedisChannel channel , Action < RedisChannel , RedisValue > handler , CommandFlags flags = CommandFlags . None )
@@ -328,7 +342,7 @@ public bool TryWait(Task task)
328
342
329
343
public void Unsubscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler = null , CommandFlags flags = CommandFlags . None )
330
344
{
331
- _server . Unsubscribe ( channel , handler , flags ) ;
345
+ _server . Unsubscribe ( channel , _id , flags ) ;
332
346
}
333
347
334
348
public void UnsubscribeAll ( CommandFlags flags = CommandFlags . None )
@@ -364,7 +378,15 @@ public void WaitAll(params Task[] tasks)
364
378
365
379
public ChannelMessageQueue Subscribe ( RedisChannel channel , CommandFlags flags = CommandFlags . None )
366
380
{
367
- throw new NotImplementedException ( ) ;
381
+ // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
382
+ var redisSubscriberType = typeof ( RedisChannel ) . Assembly . GetType ( "StackExchange.Redis.RedisSubscriber" ) ;
383
+ var ctor = typeof ( ChannelMessageQueue ) . GetConstructor ( BindingFlags . Instance | BindingFlags . NonPublic ,
384
+ binder : null ,
385
+ new Type [ ] { typeof ( RedisChannel ) . MakeByRefType ( ) , redisSubscriberType } , modifiers : null ) ;
386
+
387
+ var queue = ( ChannelMessageQueue ) ctor . Invoke ( new object [ ] { channel , null } ) ;
388
+ _server . Subscribe ( queue , _id ) ;
389
+ return queue ;
368
390
}
369
391
370
392
public Task < ChannelMessageQueue > SubscribeAsync ( RedisChannel channel , CommandFlags flags = CommandFlags . None )
0 commit comments