From 9c39409e57e77e48ba33349fbd4d9d6c2ddde770 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Thu, 19 Nov 2020 09:24:41 -0800 Subject: [PATCH 1/4] Enable Redis unit tests --- .../src/HubLifetimeManagerTestBase.cs | 12 ++--- .../test/RedisHubLifetimeManagerTests.cs | 23 +++++++-- .../test/TestConnectionMultiplexer.cs | 47 +++++++++++++------ 3 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs b/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs index 10699dd8f7ec..ba1908c9a35d 100644 --- a/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs +++ b/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs @@ -47,12 +47,12 @@ public async Task SendAllAsyncWritesToAllConnectionsOutput() await manager.SendAllAsync("Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(client1.TryRead()); + var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); - message = Assert.IsType(client2.TryRead()); + message = Assert.IsType(await client2.ReadAsync().OrTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -80,7 +80,7 @@ public async Task SendAllAsyncDoesNotWriteToDisconnectedConnectionsOutput() await manager.SendAllAsync("Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(client1.TryRead()); + var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -110,7 +110,7 @@ public async Task SendGroupAsyncWritesToAllConnectionsInGroupOutput() await manager.SendGroupAsync("group", "Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(client1.TryRead()); + var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -141,7 +141,7 @@ public async Task SendGroupExceptAsyncDoesNotWriteToExcludedConnections() await manager.SendGroupExceptAsync("group1", "Hello", new object[] { "World" }, new[] { connection2.ConnectionId }).DefaultTimeout(); - var message = Assert.IsType(client1.TryRead()); + var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -166,7 +166,7 @@ public async Task SendConnectionAsyncWritesToConnectionOutput() await manager.SendConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(client.TryRead()); + var message = Assert.IsType(await client.ReadAsync().OrTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); diff --git a/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs b/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs index 02ce85234ecf..9d85edc8fba8 100644 --- a/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs +++ b/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs @@ -14,9 +14,10 @@ namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests { - // Add ScaleoutHubLifetimeManagerTests back after https://github.com/aspnet/SignalR/issues/3088 - public class RedisHubLifetimeManagerTests + public class RedisHubLifetimeManagerTests : ScaleoutHubLifetimeManagerTests { + private TestRedisServer _server; + public class TestObject { public string TestProperty { get; set; } @@ -37,7 +38,7 @@ private RedisHubLifetimeManager CreateLifetimeManager(TestRedisServer serve }, NullLogger.Instance)); } - [Fact(Skip = "https://github.com/aspnet/SignalR/issues/3088")] + [Fact] public async Task CamelCasedJsonIsPreservedAcrossRedisBoundary() { var server = new TestRedisServer(); @@ -80,5 +81,21 @@ public async Task CamelCasedJsonIsPreservedAcrossRedisBoundary() }); } } + + public override TestRedisServer CreateBackplane() + { + return new TestRedisServer(); + } + + public override HubLifetimeManager CreateNewHubLifetimeManager() + { + _server = new TestRedisServer(); + return CreateLifetimeManager(_server); + } + + public override HubLifetimeManager CreateNewHubLifetimeManager(TestRedisServer backplane) + { + return CreateLifetimeManager(backplane); + } } } diff --git a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs index 664316c469a8..e1d2c3f90cef 100644 --- a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs +++ b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs @@ -6,6 +6,8 @@ using System.Collections.Generic; using System.IO; using System.Net; +using System.Reflection; +using System.Threading; using System.Threading.Tasks; using StackExchange.Redis; using StackExchange.Redis.Profiling; @@ -73,11 +75,11 @@ public event EventHandler HashSlotMoved remove { } } - private readonly ISubscriber _subscriber; + private readonly TestRedisServer _server; public TestConnectionMultiplexer(TestRedisServer server) { - _subscriber = new TestSubscriber(server); + _server = server; } public void BeginProfiling(object forContext) @@ -167,7 +169,7 @@ public string GetStormLog() public ISubscriber GetSubscriber(object asyncState = null) { - return _subscriber; + return new TestSubscriber(_server); } public int HashSlot(RedisKey key) @@ -223,14 +225,14 @@ public void ExportConfiguration(Stream destination, ExportOptions options = (Exp public class TestRedisServer { - private readonly ConcurrentDictionary>> _subscriptions = - new ConcurrentDictionary>>(); + private readonly ConcurrentDictionary)>> _subscriptions = + new ConcurrentDictionary)>>(); public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (_subscriptions.TryGetValue(channel, out var handlers)) { - foreach (var handler in handlers) + foreach (var (_, handler) in handlers) { handler(channel, message); } @@ -239,26 +241,35 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags return handlers != null ? handlers.Count : 0; } - public void Subscribe(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) + public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, CommandFlags flags = CommandFlags.None) { - _subscriptions.AddOrUpdate(channel, _ => new List> { handler }, (_, list) => + Action handler = (channel, value) => + { + typeof(ChannelMessageQueue).GetMethod("HandleMessage", BindingFlags.NonPublic | BindingFlags.Instance) + .Invoke(messageQueue, new object[] { channel, value }); + }; + + _subscriptions.AddOrUpdate(messageQueue.Channel, _ => new List<(int, Action)> { (subscriberId, handler) }, (_, list) => { - list.Add(handler); + list.Add((subscriberId, handler)); return list; }); } - public void Unsubscribe(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) + public void Unsubscribe(RedisChannel channel, int subscriberId, CommandFlags flags = CommandFlags.None) { if (_subscriptions.TryGetValue(channel, out var list)) { - list.Remove(handler); + list.RemoveAll((item) => item.Item1 == subscriberId); } } } public class TestSubscriber : ISubscriber { + private static int StaticId; + + private readonly int _id; private readonly TestRedisServer _server; public ConnectionMultiplexer Multiplexer => throw new NotImplementedException(); @@ -267,6 +278,7 @@ public class TestSubscriber : ISubscriber public TestSubscriber(TestRedisServer server) { _server = server; + _id = Interlocked.Increment(ref StaticId); } public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) @@ -307,7 +319,7 @@ public async Task PublishAsync(RedisChannel channel, RedisValue message, C public void Subscribe(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) { - _server.Subscribe(channel, handler, flags); + throw new NotImplementedException(); } public Task SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) @@ -328,7 +340,7 @@ public bool TryWait(Task task) public void Unsubscribe(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) { - _server.Unsubscribe(channel, handler, flags); + _server.Unsubscribe(channel, _id, flags); } public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) @@ -364,7 +376,14 @@ public void WaitAll(params Task[] tasks) public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None) { - throw new NotImplementedException(); + var redisSubscriberType = typeof(RedisChannel).Assembly.GetType("StackExchange.Redis.RedisSubscriber"); + var ctor = typeof(ChannelMessageQueue).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic, + binder: null, + new Type[] { typeof(RedisChannel).MakeByRefType(), redisSubscriberType }, modifiers: null); + + var queue = (ChannelMessageQueue)ctor.Invoke(new object[] { channel, null }); + _server.Subscribe(queue, _id); + return queue; } public Task SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) From c4467f4f08c5340df91c4e688dcc54925c91b4c6 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Fri, 30 Apr 2021 15:41:46 -0700 Subject: [PATCH 2/4] fixups --- .../src/HubLifetimeManagerTestBase.cs | 12 ++++++------ .../test/RedisHubLifetimeManagerTests.cs | 5 +++-- .../test/TestConnectionMultiplexer.cs | 2 ++ 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs b/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs index ba1908c9a35d..26bf3cad6b93 100644 --- a/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs +++ b/src/SignalR/server/Specification.Tests/src/HubLifetimeManagerTestBase.cs @@ -47,12 +47,12 @@ public async Task SendAllAsyncWritesToAllConnectionsOutput() await manager.SendAllAsync("Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); + var message = Assert.IsType(await client1.ReadAsync().DefaultTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); - message = Assert.IsType(await client2.ReadAsync().OrTimeout()); + message = Assert.IsType(await client2.ReadAsync().DefaultTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -80,7 +80,7 @@ public async Task SendAllAsyncDoesNotWriteToDisconnectedConnectionsOutput() await manager.SendAllAsync("Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); + var message = Assert.IsType(await client1.ReadAsync().DefaultTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -110,7 +110,7 @@ public async Task SendGroupAsyncWritesToAllConnectionsInGroupOutput() await manager.SendGroupAsync("group", "Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); + var message = Assert.IsType(await client1.ReadAsync().DefaultTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -141,7 +141,7 @@ public async Task SendGroupExceptAsyncDoesNotWriteToExcludedConnections() await manager.SendGroupExceptAsync("group1", "Hello", new object[] { "World" }, new[] { connection2.ConnectionId }).DefaultTimeout(); - var message = Assert.IsType(await client1.ReadAsync().OrTimeout()); + var message = Assert.IsType(await client1.ReadAsync().DefaultTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); @@ -166,7 +166,7 @@ public async Task SendConnectionAsyncWritesToConnectionOutput() await manager.SendConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).DefaultTimeout(); - var message = Assert.IsType(await client.ReadAsync().OrTimeout()); + var message = Assert.IsType(await client.ReadAsync().DefaultTimeout()); Assert.Equal("Hello", message.Target); Assert.Single(message.Arguments); Assert.Equal("World", (string)message.Arguments[0]); diff --git a/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs b/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs index 9d85edc8fba8..646fbb9c2ac9 100644 --- a/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs +++ b/src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR.Internal; using Microsoft.AspNetCore.SignalR.Protocol; +using Microsoft.AspNetCore.SignalR.Specification.Tests; using Microsoft.AspNetCore.SignalR.Tests; using Microsoft.AspNetCore.Testing; using Microsoft.Extensions.Logging.Abstractions; @@ -87,13 +88,13 @@ public override TestRedisServer CreateBackplane() return new TestRedisServer(); } - public override HubLifetimeManager CreateNewHubLifetimeManager() + public override HubLifetimeManager CreateNewHubLifetimeManager() { _server = new TestRedisServer(); return CreateLifetimeManager(_server); } - public override HubLifetimeManager CreateNewHubLifetimeManager(TestRedisServer backplane) + public override HubLifetimeManager CreateNewHubLifetimeManager(TestRedisServer backplane) { return CreateLifetimeManager(backplane); } diff --git a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs index e1d2c3f90cef..d39ede22a6b9 100644 --- a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs +++ b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs @@ -245,6 +245,8 @@ public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, Comman { Action handler = (channel, value) => { + // ChannelMessageQueue isn't mockable currently, this works around that by using private reflection + // Change to `Write` when Redis is updated, the private method got renamed typeof(ChannelMessageQueue).GetMethod("HandleMessage", BindingFlags.NonPublic | BindingFlags.Instance) .Invoke(messageQueue, new object[] { channel, value }); }; From e64251fea4dc166608a854d970ee311d93601f95 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 11 May 2021 16:24:26 -0700 Subject: [PATCH 3/4] update --- .../StackExchangeRedis/test/TestConnectionMultiplexer.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs index d39ede22a6b9..19e82996b27c 100644 --- a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs +++ b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs @@ -246,8 +246,7 @@ public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, Comman Action handler = (channel, value) => { // ChannelMessageQueue isn't mockable currently, this works around that by using private reflection - // Change to `Write` when Redis is updated, the private method got renamed - typeof(ChannelMessageQueue).GetMethod("HandleMessage", BindingFlags.NonPublic | BindingFlags.Instance) + typeof(ChannelMessageQueue).GetMethod("Write", BindingFlags.NonPublic | BindingFlags.Instance) .Invoke(messageQueue, new object[] { channel, value }); }; From 47694e3eb54fe5790e57161c02506567a2d4c407 Mon Sep 17 00:00:00 2001 From: Brennan Date: Wed, 12 May 2021 18:20:29 -0700 Subject: [PATCH 4/4] Apply suggestions from code review --- .../server/StackExchangeRedis/test/TestConnectionMultiplexer.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs index 19e82996b27c..1cfeb6237a77 100644 --- a/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs +++ b/src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs @@ -245,6 +245,7 @@ public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, Comman { Action handler = (channel, value) => { + // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969 // ChannelMessageQueue isn't mockable currently, this works around that by using private reflection typeof(ChannelMessageQueue).GetMethod("Write", BindingFlags.NonPublic | BindingFlags.Instance) .Invoke(messageQueue, new object[] { channel, value }); @@ -377,6 +378,7 @@ public void WaitAll(params Task[] tasks) public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None) { + // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969 var redisSubscriberType = typeof(RedisChannel).Assembly.GetType("StackExchange.Redis.RedisSubscriber"); var ctor = typeof(ChannelMessageQueue).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic, binder: null,