diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 1bb9bf808..d4cd15b5d 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -11,6 +11,7 @@ Current package versions: - Adds: `last-in` and `cur-in` (bytes) to timeout exceptions to help identify timeouts that were just-behind another large payload off the wire ([#2276 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2276)) - Adds: general-purpose tunnel support, with HTTP proxy "connect" support included ([#2274 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2274)) - Removes: Package dependency (`System.Diagnostics.PerformanceCounter`) ([#2285 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2285)) +- Adds: `.WithChannelPrefix()` to `ISubscriber` for key-preifixing channels like the existing `.WithKeyPreifx()` on `IDatabase` ([#896 by pecanw](https://github.com/StackExchange/StackExchange.Redis/pull/896)) ## 2.6.70 diff --git a/src/StackExchange.Redis/KeyspaceIsolation/DatabaseExtension.cs b/src/StackExchange.Redis/KeyspaceIsolation/DatabaseExtensions.cs similarity index 100% rename from src/StackExchange.Redis/KeyspaceIsolation/DatabaseExtension.cs rename to src/StackExchange.Redis/KeyspaceIsolation/DatabaseExtensions.cs diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs new file mode 100644 index 000000000..b4e1b97b9 --- /dev/null +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs @@ -0,0 +1,122 @@ +using System; +using System.Net; +using System.Threading.Tasks; + +namespace StackExchange.Redis.KeyspaceIsolation +{ + internal class KeyPrefixedSubscriber : ISubscriber + { + public ISubscriber Inner { get; } + + internal RedisChannel Prefix { get; } + + public KeyPrefixedSubscriber(ISubscriber inner, byte[] prefix) + { + Inner = inner; + Prefix = new RedisChannel(prefix, RedisChannel.PatternMode.Literal); + } + + public IConnectionMultiplexer Multiplexer => Inner.Multiplexer; + + public EndPoint? IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpoint(ToInner(channel), flags); + + public Task IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpointAsync(ToInner(channel), flags); + + public bool IsConnected(RedisChannel channel = default(RedisChannel)) => Inner.IsConnected(ToInner(channel)); + + public TimeSpan Ping(CommandFlags flags = CommandFlags.None) => Inner.Ping(flags); + + public Task PingAsync(CommandFlags flags = CommandFlags.None) => Inner.PingAsync(flags); + + public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.Publish(ToInner(channel), message, flags); + + public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.PublishAsync(ToInner(channel), message, flags); + + public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None) => + Inner.Subscribe(ToInner(channel), flags); + + public void Subscribe(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) => + Inner.Subscribe(ToInner(channel), ToInner(handler), flags); + + public Task SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => + Inner.SubscribeAsync(ToInner(channel), flags); + + public Task SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) => + Inner.SubscribeAsync(ToInner(channel), ToInner(handler), flags); + + public EndPoint? SubscribedEndpoint(RedisChannel channel) => Inner.SubscribedEndpoint(ToInner(channel)); + + public bool TryWait(Task task) => Inner.TryWait(task); + + public void Unsubscribe(RedisChannel channel, Action? handler = null, CommandFlags flags = CommandFlags.None) => + Inner.Unsubscribe(ToInner(channel), ToInner(handler), flags); + + public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) + { + if (Prefix.IsNullOrEmpty) + { + Inner.UnsubscribeAll(flags); + } + else + { + Inner.Unsubscribe(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); + } + } + + public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) => Prefix.IsNullOrEmpty + ? Inner.UnsubscribeAllAsync(flags) + : Inner.UnsubscribeAsync(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); + + public Task UnsubscribeAsync(RedisChannel channel, Action? handler = null, CommandFlags flags = CommandFlags.None) => + Inner.UnsubscribeAsync(ToInner(channel), ToInner(handler), flags); + + public void Wait(Task task) => Inner.Wait(task); + + public T Wait(Task task) => Inner.Wait(task); + + public void WaitAll(params Task[] tasks) => Inner.WaitAll(tasks); + + protected Action ToInner(Action? handler) => (channel, value) => handler?.Invoke(ToOuter(channel), value); + + public RedisChannel ToInner(RedisChannel outer) + { + if (Prefix.IsNullOrEmpty) return outer; + + if (outer.IsNullOrEmpty) return Prefix; + + byte[] outerArr = outer!; + byte[] prefixArr = Prefix!; + + var innerArr = new byte[prefixArr.Length + outerArr.Length]; + Buffer.BlockCopy(prefixArr, 0, innerArr, 0, prefixArr.Length); + Buffer.BlockCopy(outerArr, 0, innerArr, prefixArr.Length, outerArr.Length); + + var patternMode = outer.IsPatternBased ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal; + + return new RedisChannel(innerArr, patternMode); + } + + public RedisChannel ToOuter(RedisChannel inner) + { + if (Prefix.IsNullOrEmpty || inner.IsNullOrEmpty) return inner; + + byte[] innerArr = inner!; + byte[] prefixArr = Prefix!; + + if (innerArr.Length <= prefixArr.Length) return inner; + + for (var i = 0; i < prefixArr.Length; i++) + { + if (prefixArr[i] != innerArr[i]) return inner; + } + + var outerLength = innerArr.Length - prefixArr.Length; + var outerArr = new byte[outerLength]; + Buffer.BlockCopy(innerArr, prefixArr.Length, outerArr, 0, outerLength); + + var patternMode = inner.IsPatternBased ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal; + + return new RedisChannel(outerArr, patternMode); + } + } +} diff --git a/src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs b/src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs new file mode 100644 index 000000000..727ed4535 --- /dev/null +++ b/src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs @@ -0,0 +1,47 @@ +using System; + +namespace StackExchange.Redis.KeyspaceIsolation +{ + /// + /// Provides the extension method to . + /// + public static class SubscriberExtensions + { + /// + /// Creates a new instance that provides an isolated channel namespace + /// of the specified underlying subscriber instance. + /// + /// + /// The underlying subscriber instance that the returned instance shall use. + /// + /// + /// The prefix that defines a channel namespace isolation for the returned subscriber instance. + /// + /// + /// A new instance that invokes the specified underlying + /// but prepends the specified + /// to all channel names and thus forms a logical channel namespace isolation. + /// + public static ISubscriber WithChannelPrefix(this ISubscriber subscriber, RedisChannel channelPrefix) + { + if (subscriber == null) + { + throw new ArgumentNullException(nameof(subscriber)); + } + + if (channelPrefix.IsNullOrEmpty) + { + throw new ArgumentNullException(nameof(channelPrefix)); + } + + if (subscriber is KeyPrefixedSubscriber wrapper) + { + // combine the channel prefix in advance to minimize indirection + channelPrefix = wrapper.ToInner(channelPrefix); + subscriber = wrapper.Inner; + } + + return new KeyPrefixedSubscriber(subscriber, channelPrefix!); + } + } +} diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index f1201d35f..1fe21b556 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1126,6 +1126,7 @@ StackExchange.Redis.ITransaction.AddCondition(StackExchange.Redis.Condition! con StackExchange.Redis.ITransaction.Execute(StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool StackExchange.Redis.ITransaction.ExecuteAsync(StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions +StackExchange.Redis.KeyspaceIsolation.SubscriberExtensions StackExchange.Redis.LatencyHistoryEntry StackExchange.Redis.LatencyHistoryEntry.DurationMilliseconds.get -> int StackExchange.Redis.LatencyHistoryEntry.LatencyHistoryEntry() -> void @@ -1629,6 +1630,7 @@ static StackExchange.Redis.HashEntry.implicit operator System.Collections.Generi static StackExchange.Redis.HashEntry.operator !=(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool static StackExchange.Redis.HashEntry.operator ==(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool static StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions.WithKeyPrefix(this StackExchange.Redis.IDatabase! database, StackExchange.Redis.RedisKey keyPrefix) -> StackExchange.Redis.IDatabase! +static StackExchange.Redis.KeyspaceIsolation.SubscriberExtensions.WithChannelPrefix(this StackExchange.Redis.ISubscriber! subscriber, StackExchange.Redis.RedisChannel channelPrefix) -> StackExchange.Redis.ISubscriber! static StackExchange.Redis.Lease.Create(int length, bool clear = true) -> StackExchange.Redis.Lease! static StackExchange.Redis.Lease.Empty.get -> StackExchange.Redis.Lease! static StackExchange.Redis.ListPopResult.Null.get -> StackExchange.Redis.ListPopResult diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs new file mode 100644 index 000000000..8ce7494c8 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs @@ -0,0 +1,76 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using StackExchange.Redis.KeyspaceIsolation; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests +{ + public class KeyPrefixedSubscriberTests : TestBase + { + public KeyPrefixedSubscriberTests(ITestOutputHelper output) : base(output) + {} + + [Fact] + public async Task UsePrefixForChannel() + { + using (var client = Create(allowAdmin: true)) + { + const string prefix1 = "(p1)-"; + const string prefix2 = "(p2)-"; + + var s1 = client.GetSubscriber().WithChannelPrefix(prefix1); + var s12 = client.GetSubscriber().WithChannelPrefix(prefix1); + var s2 = client.GetSubscriber().WithChannelPrefix(prefix2); + var s = client.GetSubscriber(); + + var l1 = new List(); + var l12 = new List(); + var l2 = new List(); + var l = new List(); + var lAll = new List(); + var lT1 = new List(); + var c1 = new List(); + + const string channelName = "test-channel"; + s1.Subscribe(channelName, (channel, value) => + { + c1.Add(channel!); + l1.Add(value!); + }); + s12.Subscribe(channelName, (_channel, value) => l12.Add(value!)); + s2.Subscribe(channelName, (_channel, value) => l2.Add(value!)); + s.Subscribe(channelName, (_channel, value) => l.Add(value!)); + s.Subscribe("*" + channelName, (_channel, value) => lAll.Add(value!)); + s.Subscribe(prefix1 + channelName, (_channel, value) => lT1.Add(value!)); + + s1.Publish(channelName, "value1"); + s.Publish(channelName, "value"); + + // Give some time to pub-sub + await Task.Delay(500); + + Assert.Single(l1); + Assert.Equal("value1",l1[0]); + + Assert.Single(l12); + Assert.Equal("value1",l12[0]); + + Assert.Empty(l2); + + Assert.Single(l); + Assert.Equal("value",l[0]); + + Assert.Equal(2, lAll.Count); + Assert.Contains("value", lAll); + Assert.Contains("value1", lAll); + + Assert.Single(lT1); + Assert.Equal("value1",lT1[0]); + + Assert.Single(c1); + Assert.Equal(channelName,c1[0]); + } + } + } +}