From d3e19b3dfbcdacb872de983ef082b5908d8460e4 Mon Sep 17 00:00:00 2001 From: Todd Tingen Date: Thu, 14 Apr 2022 00:29:16 -0400 Subject: [PATCH 01/15] Implement XAUTOCLAIM. --- src/StackExchange.Redis/Enums/RedisCommand.cs | 1 + .../Interfaces/IDatabase.cs | 16 +++++ .../Interfaces/IDatabaseAsync.cs | 16 +++++ .../KeyspaceIsolation/DatabaseWrapper.cs | 3 + .../KeyspaceIsolation/WrapperBase.cs | 3 + src/StackExchange.Redis/PublicAPI.Shipped.txt | 6 ++ src/StackExchange.Redis/RedisDatabase.cs | 58 +++++++++++++++++++ src/StackExchange.Redis/ResultProcessor.cs | 51 ++++++++++++++++ .../StreamAutoClaimResult.cs | 26 +++++++++ 9 files changed, 180 insertions(+) create mode 100644 src/StackExchange.Redis/StreamAutoClaimResult.cs diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs index f40d3ff49..9f26a48b2 100644 --- a/src/StackExchange.Redis/Enums/RedisCommand.cs +++ b/src/StackExchange.Redis/Enums/RedisCommand.cs @@ -182,6 +182,7 @@ internal enum RedisCommand XACK, XADD, + XAUTOCLAIM, XCLAIM, XDEL, XGROUP, diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index d40721650..0582fd9fc 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -1914,6 +1914,22 @@ IEnumerable SortedSetScan(RedisKey key, /// https://redis.io/commands/xadd RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that + /// have been idle for more than will be claimed. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages that are currently pending and have an idle time greater than . + /// The minimum idle time for pending messages. + /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// Only return the for the claimed messages. If , the array will be empty. + /// The flags to use for this operation. + /// An instance of . + /// https://redis.io/commands/xautoclaim + StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None); + /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. /// This method returns the complete message for the claimed message(s). diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index d4b0dac74..10bdb85b0 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -1866,6 +1866,22 @@ IAsyncEnumerable SortedSetScanAsync(RedisKey key, /// https://redis.io/commands/xadd Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that + /// have been idle for more than will be claimed. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages that are currently pending and have an idle time greater than . + /// The minimum idle time for pending messages. + /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// Only return the for the claimed messages. If , the array will be empty. + /// The flags to use for this operation. + /// An instance of . + /// https://redis.io/commands/xautoclaim + Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None); + /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. /// This method returns the complete message for the claimed message(s). diff --git a/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs b/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs index 959ecaa70..4348f906b 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs @@ -460,6 +460,9 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly, flags); + public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs b/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs index b26a66622..9f850b8d9 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs @@ -476,6 +476,9 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly, flags); + public Task StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); diff --git a/src/StackExchange.Redis/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI.Shipped.txt index 91d53c8ad..8d6ebb30b 100644 --- a/src/StackExchange.Redis/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI.Shipped.txt @@ -629,6 +629,7 @@ StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAutoClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, bool idsOnly = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimResult StackExchange.Redis.IDatabase.StreamClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue[]! StackExchange.Redis.IDatabase.StreamConsumerGroupSetPosition(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue position, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool @@ -828,6 +829,7 @@ StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.Re StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAutoClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, bool idsOnly = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamConsumerGroupSetPositionAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue position, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! @@ -1352,6 +1354,10 @@ StackExchange.Redis.SortedSetOrder.ByScore = 1 -> StackExchange.Redis.SortedSetO StackExchange.Redis.SortType StackExchange.Redis.SortType.Alphabetic = 1 -> StackExchange.Redis.SortType StackExchange.Redis.SortType.Numeric = 0 -> StackExchange.Redis.SortType +StackExchange.Redis.StreamAutoClaimResult +StackExchange.Redis.StreamAutoClaimResult.ClaimedEntries.get -> StackExchange.Redis.StreamEntry[]! +StackExchange.Redis.StreamAutoClaimResult.NextStartId.get -> StackExchange.Redis.RedisValue +StackExchange.Redis.StreamAutoClaimResult.StreamAutoClaimResult() -> void StackExchange.Redis.StreamConsumer StackExchange.Redis.StreamConsumer.Name.get -> StackExchange.Redis.RedisValue StackExchange.Redis.StreamConsumer.PendingMessageCount.get -> int diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 2bfc4cbe1..e61b2d9f9 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2055,6 +2055,38 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair return ExecuteAsync(msg, ResultProcessor.RedisValue); } + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + startAtId, + count, + idsOnly, + flags); + + return idsOnly + ? ExecuteSync(msg, ResultProcessor.StreamAutoClaimIdsOnly) + : ExecuteSync(msg, ResultProcessor.StreamAutoClaim); + } + + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + startAtId, + count, + idsOnly, + flags); + + return idsOnly + ? ExecuteAsync(msg, ResultProcessor.StreamAutoClaimIdsOnly) + : ExecuteAsync(msg, ResultProcessor.StreamAutoClaim); + } + public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) { var msg = GetStreamClaimMessage(key, @@ -3543,6 +3575,32 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe return Message.Create(Database, flags, RedisCommand.XADD, key, values); } + private Message GetStreamAutoClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count, bool returnJustIds, CommandFlags flags) + { + // XAUTOCLAIM [COUNT count] [JUSTID] + var values = new RedisValue[4 + (count is null ? 0 : 2) + (returnJustIds ? 1 : 0)]; + + var offset = 0; + + values[offset++] = consumerGroup; + values[offset++] = assignToConsumer; + values[offset++] = minIdleTimeInMs; + values[offset++] = startAtId; + + if (count is not null) + { + values[offset++] = StreamConstants.Count; + values[offset++] = count.Value; + } + + if (returnJustIds) + { + values[offset++] = StreamConstants.JustId; + } + + return Message.Create(Database, flags, RedisCommand.XAUTOCLAIM, key, values); + } + private Message GetStreamClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue[] messageIds, bool returnJustIds, CommandFlags flags) { if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 9db002a0c..8a16e6dda 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -114,6 +114,12 @@ public static readonly SingleStreamProcessor public static readonly SingleStreamProcessor SingleStreamWithNameSkip = new SingleStreamProcessor(skipStreamName: true); + public static readonly StreamAutoClaimProcessor + StreamAutoClaim = new StreamAutoClaimProcessor(); + + public static readonly StreamAutoClaimProcessor + StreamAutoClaimIdsOnly = new StreamAutoClaimProcessor(processIdsOnly: true); + public static readonly StreamConsumerInfoProcessor StreamConsumerInfo = new StreamConsumerInfoProcessor(); @@ -1725,6 +1731,51 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + internal sealed class StreamAutoClaimProcessor : StreamProcessorBase + { + private readonly bool processIdsOnly; + + public StreamAutoClaimProcessor(bool processIdsOnly = false) + { + this.processIdsOnly = processIdsOnly; + } + + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + // See https://redis.io/commands/xautoclaim for command documentation. + + // NOTE: Starting in 7.0, a third array will be returned in the result. + // It will contain IDs of deleted messages from the pending entry list. + + if (!result.IsNull) + { + var items = result.GetItems(); + + // [0] will be the next start ID. + var nextStartId = items[0].AsRedisValue(); + + // [1] will be the array of either StreamEntry's or message IDs. + var entriesOrIds = items[1].GetItems(); + + var arr = new StreamEntry[entriesOrIds.Length]; + + for (var i = 0; i < arr.Length; i++) + { + var item = entriesOrIds[i]; + + arr[i] = processIdsOnly + ? new StreamEntry(item.AsRedisValue(), Array.Empty()) + : ParseRedisStreamEntry(entriesOrIds[i]); + } + + SetResult(message, new StreamAutoClaimResult(nextStartId, arr)); + return true; + } + + return false; + } + } + internal sealed class StreamConsumerInfoProcessor : InterleavedStreamInfoProcessorBase { protected override StreamConsumerInfo ParseItem(in RawResult result) diff --git a/src/StackExchange.Redis/StreamAutoClaimResult.cs b/src/StackExchange.Redis/StreamAutoClaimResult.cs new file mode 100644 index 000000000..16cb89fdf --- /dev/null +++ b/src/StackExchange.Redis/StreamAutoClaimResult.cs @@ -0,0 +1,26 @@ +using System; + +namespace StackExchange.Redis +{ + /// + /// The result of the XAUTOCLAIM command. + /// + public readonly struct StreamAutoClaimResult + { + internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries) + { + NextStartId = nextStartId; + ClaimedEntries = claimedEntries ?? Array.Empty(); + } + + /// + /// The stream ID to be used in the next call to StreamAutoClaim. + /// + public RedisValue NextStartId { get; } + + /// + /// An array of for the successfully claimed entries. + /// + public StreamEntry[] ClaimedEntries { get; } + } +} From 10f6a6ba9df7acd075750716fc23c7ab655bacc8 Mon Sep 17 00:00:00 2001 From: Todd Tingen Date: Thu, 14 Apr 2022 00:52:10 -0400 Subject: [PATCH 02/15] Add deleted message IDs to the result (available in 7.0). --- src/StackExchange.Redis/PublicAPI.Shipped.txt | 1 + src/StackExchange.Redis/ResultProcessor.cs | 15 +++++++++------ src/StackExchange.Redis/StreamAutoClaimResult.cs | 8 +++++++- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/StackExchange.Redis/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI.Shipped.txt index 8d6ebb30b..6e514ad9e 100644 --- a/src/StackExchange.Redis/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI.Shipped.txt @@ -1356,6 +1356,7 @@ StackExchange.Redis.SortType.Alphabetic = 1 -> StackExchange.Redis.SortType StackExchange.Redis.SortType.Numeric = 0 -> StackExchange.Redis.SortType StackExchange.Redis.StreamAutoClaimResult StackExchange.Redis.StreamAutoClaimResult.ClaimedEntries.get -> StackExchange.Redis.StreamEntry[]! +StackExchange.Redis.StreamAutoClaimResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! StackExchange.Redis.StreamAutoClaimResult.NextStartId.get -> StackExchange.Redis.RedisValue StackExchange.Redis.StreamAutoClaimResult.StreamAutoClaimResult() -> void StackExchange.Redis.StreamConsumer diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 8a16e6dda..997b8f953 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -1744,19 +1744,22 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { // See https://redis.io/commands/xautoclaim for command documentation. - // NOTE: Starting in 7.0, a third array will be returned in the result. - // It will contain IDs of deleted messages from the pending entry list. - if (!result.IsNull) { var items = result.GetItems(); - // [0] will be the next start ID. + // [0] The next start ID. var nextStartId = items[0].AsRedisValue(); - // [1] will be the array of either StreamEntry's or message IDs. + // [1] The array of either StreamEntry's or message IDs. var entriesOrIds = items[1].GetItems(); + // [2] Contains the list of message IDs deleted from the stream. + // This is available starting in 7.0. + var deletedIds = items.Length == 3 + ? items[2].GetItemsAsValues() ?? Array.Empty() + : Array.Empty(); + var arr = new StreamEntry[entriesOrIds.Length]; for (var i = 0; i < arr.Length; i++) @@ -1768,7 +1771,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes : ParseRedisStreamEntry(entriesOrIds[i]); } - SetResult(message, new StreamAutoClaimResult(nextStartId, arr)); + SetResult(message, new StreamAutoClaimResult(nextStartId, arr, deletedIds)); return true; } diff --git a/src/StackExchange.Redis/StreamAutoClaimResult.cs b/src/StackExchange.Redis/StreamAutoClaimResult.cs index 16cb89fdf..4dea00ad4 100644 --- a/src/StackExchange.Redis/StreamAutoClaimResult.cs +++ b/src/StackExchange.Redis/StreamAutoClaimResult.cs @@ -7,10 +7,11 @@ namespace StackExchange.Redis /// public readonly struct StreamAutoClaimResult { - internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries) + internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries, RedisValue[] deletedIds) { NextStartId = nextStartId; ClaimedEntries = claimedEntries ?? Array.Empty(); + DeletedIds = deletedIds ?? Array.Empty(); } /// @@ -22,5 +23,10 @@ internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntr /// An array of for the successfully claimed entries. /// public StreamEntry[] ClaimedEntries { get; } + + /// + /// An array of message IDs deleted from the stream. + /// + public RedisValue[] DeletedIds { get; } } } From de1fd67e8ffe7e72751a4c155d5b8db307b41fc2 Mon Sep 17 00:00:00 2001 From: Todd Tingen Date: Thu, 14 Apr 2022 22:54:24 -0400 Subject: [PATCH 03/15] Added tests. --- src/StackExchange.Redis/RedisDatabase.cs | 8 +- src/StackExchange.Redis/ResultProcessor.cs | 19 +- .../DatabaseWrapperTests.cs | 7 + tests/StackExchange.Redis.Tests/Streams.cs | 344 ++++++++++++++++++ .../WrapperBaseTests.cs | 7 + 5 files changed, 372 insertions(+), 13 deletions(-) diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index e61b2d9f9..847744791 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2066,9 +2066,7 @@ public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGr idsOnly, flags); - return idsOnly - ? ExecuteSync(msg, ResultProcessor.StreamAutoClaimIdsOnly) - : ExecuteSync(msg, ResultProcessor.StreamAutoClaim); + return ExecuteSync(msg, idsOnly ? ResultProcessor.StreamAutoClaimIdsOnly : ResultProcessor.StreamAutoClaim); } public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) @@ -2082,9 +2080,7 @@ public Task StreamAutoClaimAsync(RedisKey key, RedisValue idsOnly, flags); - return idsOnly - ? ExecuteAsync(msg, ResultProcessor.StreamAutoClaimIdsOnly) - : ExecuteAsync(msg, ResultProcessor.StreamAutoClaim); + return ExecuteAsync(msg, idsOnly ? ResultProcessor.StreamAutoClaimIdsOnly : ResultProcessor.StreamAutoClaim); } public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 997b8f953..a881f593b 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -1744,7 +1744,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { // See https://redis.io/commands/xautoclaim for command documentation. - if (!result.IsNull) + if (!result.IsNull && result.Type == ResultType.MultiBulk) { var items = result.GetItems(); @@ -1754,11 +1754,15 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes // [1] The array of either StreamEntry's or message IDs. var entriesOrIds = items[1].GetItems(); - // [2] Contains the list of message IDs deleted from the stream. - // This is available starting in 7.0. - var deletedIds = items.Length == 3 - ? items[2].GetItemsAsValues() ?? Array.Empty() - : Array.Empty(); + // [2] The array of message IDs deleted from the stream that were in the PEL. + // This is not available in 6.2 so we need to be defensive when reading + // this part of the response. + RedisValue[] deletedIds = null!; + + if (items.Length == 3) + { + deletedIds = items[2].GetItemsAsValues()!; + } var arr = new StreamEntry[entriesOrIds.Length]; @@ -1766,12 +1770,13 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { var item = entriesOrIds[i]; + // If JUSTID was sent with the request, only populate the ID of the StreamEntry. arr[i] = processIdsOnly ? new StreamEntry(item.AsRedisValue(), Array.Empty()) : ParseRedisStreamEntry(entriesOrIds[i]); } - SetResult(message, new StreamAutoClaimResult(nextStartId, arr, deletedIds)); + SetResult(message, new StreamAutoClaimResult(nextStartId, arr, deletedIds ?? Array.Empty())); return true; } diff --git a/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs b/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs index aaefbe2e2..0c37fc5b7 100644 --- a/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs +++ b/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs @@ -973,6 +973,13 @@ public void StreamAdd_2() mock.Verify(_ => _.StreamAdd("prefix:key", fields, "*", 1000, true, CommandFlags.None)); } + [Fact] + public void StreamAutoClaim() + { + wrapper.StreamAutoClaim("key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaim("prefix:key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None)); + } + [Fact] public void StreamClaimMessages() { diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index 0008291c1..a14c70ceb 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -120,6 +120,350 @@ public void StreamAddMultipleValuePairsWithManualId() } } + [Fact] + public void StreamAutoClaim_ClaimsPendingMessages() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + Assert.Equal("value2", result.ClaimedEntries[1].Values[0].Value); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsPendingMessagesAsync() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + Assert.Equal("value2", result.ClaimedEntries[1].Values[0].Value); + } + + [Fact] + public void StreamAutoClaim_ClaimsSingleMessageWithCountOption() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionAsync() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + } + + [Fact] + public void StreamAutoClaim_IncludesDeletedMessageId() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v7_0_0_rc1); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Delete one of the messages, it should be included in the deleted message ID array. + db.StreamDelete(key, new RedisValue[] { messageIds[0] }); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", count: 1); + + // Should be id2 from above. + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.True(result.DeletedIds.Length == 1); + Assert.Equal(messageIds[0], result.DeletedIds[0]); + } + + [Fact] + public async Task StreamAutoClaim_IncludesDeletedMessageIdAsync() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v7_0_0_rc1); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Delete one of the messages, it should be included in the deleted message ID array. + db.StreamDelete(key, new RedisValue[] { messageIds[0] }); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", count: 1); + + // Should be id2 from above. + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.True(result.DeletedIds.Length == 1); + Assert.Equal(messageIds[0], result.DeletedIds[0]); + } + + [Fact] + public void StreamAutoClaim_NoMessagesToClaim() + { + var key = Me(); + var group = "consumerGroup"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // **Don't add any messages to the stream** + + // Claim any pending messages (there aren't any) and reassign them to consumer2. + var result = db.StreamAutoClaim(key, group, "consumer1", 0, "0-0"); + + // Claimed entries should be empty + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_NoMessagesToClaimAsync() + { + var key = Me(); + var group = "consumerGroup"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // **Don't add any messages to the stream** + + // Claim any pending messages (there aren't any) and reassign them to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, "consumer1", 0, "0-0"); + + // Claimed entries should be empty + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_NoMessageMeetsMinIdleTime() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim messages idle for more than 5 minutes, should return an empty array. + var result = db.StreamAutoClaim(key, group, consumer2, 300000, "0-0", idsOnly: true); + + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_NoMessageMeetsMinIdleTimeAsync() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim messages idle for more than 5 minutes, should return an empty array. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 300000, "0-0", idsOnly: true); + + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_ReturnsStreamIdOnly() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", idsOnly: true); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedEntries[0].Id); + Assert.Equal(messageIds[1], result.ClaimedEntries[1].Id); + Assert.Empty(result.ClaimedEntries[0].Values); + Assert.Empty(result.ClaimedEntries[1].Values); + } + + [Fact] + public async Task StreamAutoClaim_ReturnsStreamIdOnlyAsync() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", idsOnly: true); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedEntries[0].Id); + Assert.Equal(messageIds[1], result.ClaimedEntries[1].Id); + Assert.Empty(result.ClaimedEntries[0].Values); + Assert.Empty(result.ClaimedEntries[1].Values); + } + + private RedisValue[] StreamAutoClaim_PrepareTestData(IDatabase db, RedisKey key, RedisValue group, RedisValue consumer) + { + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // Add some messages + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + + // Read the messages into the "c1" + db.StreamReadGroup(key, group, consumer); + + return new RedisValue[2] { id1, id2 }; + } + [Fact] public void StreamConsumerGroupSetId() { diff --git a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs index 4017fc7cf..804149fb0 100644 --- a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs +++ b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs @@ -905,6 +905,13 @@ public void StreamAddAsync_2() mock.Verify(_ => _.StreamAddAsync("prefix:key", fields, "*", 1000, true, CommandFlags.None)); } + [Fact] + public void StreamAutoClaim() + { + wrapper.StreamAutoClaimAsync("key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaimAsync("prefix:key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None)); + } + [Fact] public void StreamClaimMessagesAsync() { From 7e18e198d90a9aa38a21b4074de60fbae460490e Mon Sep 17 00:00:00 2001 From: Todd Tingen Date: Thu, 14 Apr 2022 23:28:35 -0400 Subject: [PATCH 04/15] Add release notes. --- docs/ReleaseNotes.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 1ddfeab3a..46373f7d7 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -19,6 +19,7 @@ - Adds: Support for `OBJECT REFCOUNT` with `.KeyRefCount()`/`.KeyRefCountAsync()` ([#2087 by Avital-Fine](https://github.com/StackExchange/StackExchange.Redis/pull/2087)) - Adds: Support for `OBJECT ENCODING` with `.KeyEncoding()`/`.KeyEncodingAsync()` ([#2088 by Avital-Fine](https://github.com/StackExchange/StackExchange.Redis/pull/2088)) - Adds: Support for `HRANDFIELD` with `.HashRandomField()`/`.HashRandomFieldAsync()`, `.HashRandomFields()`/`.HashRandomFieldsAsync()`, and `.HashRandomFieldsWithValues()`/`.HashRandomFieldsWithValuesAsync()` ([#2090 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2090)) +- Adds: Support for `XAUTOCLAIM` with `.StreamAutoClaim()`/.`StreamAutoClaimAsync()` ([#2095 by ttingen](https://github.com/StackExchange/StackExchange.Redis/pull/2095)) ## 2.5.61 From 52ef6e74cea360dab7b779a6a388c91f1e6ee162 Mon Sep 17 00:00:00 2001 From: Todd Tingen Date: Fri, 15 Apr 2022 22:41:47 -0400 Subject: [PATCH 05/15] Split the JUSTID option into a separate method and result. --- .../Interfaces/IDatabase.cs | 19 +++- .../Interfaces/IDatabaseAsync.cs | 19 +++- .../KeyspaceIsolation/DatabaseWrapper.cs | 7 +- .../KeyspaceIsolation/WrapperBase.cs | 7 +- src/StackExchange.Redis/PublicAPI.Shipped.txt | 11 ++- src/StackExchange.Redis/RedisDatabase.cs | 40 +++++++-- src/StackExchange.Redis/ResultProcessor.cs | 67 ++++++++++----- .../StreamAutoClaimIdsOnlyResult.cs | 30 +++++++ .../StreamAutoClaimResult.cs | 8 +- .../DatabaseWrapperTests.cs | 11 ++- tests/StackExchange.Redis.Tests/Streams.cs | 86 +++++++++++++++---- .../WrapperBaseTests.cs | 11 ++- 12 files changed, 253 insertions(+), 63 deletions(-) create mode 100644 src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 32fd5bb2e..ae21e9715 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -1976,11 +1976,26 @@ IEnumerable SortedSetScan(RedisKey key, /// The minimum idle time for pending messages. /// The starting ID to scan for pending messsages that have an idle time greater than . /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. - /// Only return the for the claimed messages. If , the array will be empty. /// The flags to use for this operation. /// An instance of . /// https://redis.io/commands/xautoclaim - StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None); + StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that + /// have been idle for more than will be claimed. The result will contain + /// the claimed message IDs instead of a instance. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages that are currently pending and have an idle time greater than . + /// The minimum idle time for pending messages. + /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// The flags to use for this operation. + /// An instance of . + /// https://redis.io/commands/xautoclaim + StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 82c2bb98e..df2e03169 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -1928,11 +1928,26 @@ IAsyncEnumerable SortedSetScanAsync(RedisKey key, /// The minimum idle time for pending messages. /// The starting ID to scan for pending messsages that have an idle time greater than . /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. - /// Only return the for the claimed messages. If , the array will be empty. /// The flags to use for this operation. /// An instance of . /// https://redis.io/commands/xautoclaim - Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None); + Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that + /// have been idle for more than will be claimed. The result will contain + /// the claimed message IDs instead of a instance. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages that are currently pending and have an idle time greater than . + /// The minimum idle time for pending messages. + /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// The flags to use for this operation. + /// An instance of . + /// https://redis.io/commands/xautoclaim + Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. diff --git a/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs b/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs index 6c7fc6d2f..7363219a8 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs @@ -475,8 +475,11 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); - public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) => - Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly, flags); + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); + + public StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs b/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs index 6a13d3702..6bcc6bf49 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs @@ -492,8 +492,11 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); - public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) => - Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly, flags); + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); + + public Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); public Task StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); diff --git a/src/StackExchange.Redis/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI.Shipped.txt index 8a72397cb..a6f84ddb0 100644 --- a/src/StackExchange.Redis/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI.Shipped.txt @@ -634,7 +634,8 @@ StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue -StackExchange.Redis.IDatabase.StreamAutoClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, bool idsOnly = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimResult +StackExchange.Redis.IDatabase.StreamAutoClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimResult +StackExchange.Redis.IDatabase.StreamAutoClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimIdsOnlyResult StackExchange.Redis.IDatabase.StreamClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue[]! StackExchange.Redis.IDatabase.StreamConsumerGroupSetPosition(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue position, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool @@ -839,7 +840,8 @@ StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.Re StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamAutoClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, bool idsOnly = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAutoClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAutoClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamConsumerGroupSetPositionAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue position, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! @@ -1364,6 +1366,11 @@ StackExchange.Redis.SortedSetOrder.ByScore = 1 -> StackExchange.Redis.SortedSetO StackExchange.Redis.SortType StackExchange.Redis.SortType.Alphabetic = 1 -> StackExchange.Redis.SortType StackExchange.Redis.SortType.Numeric = 0 -> StackExchange.Redis.SortType +StackExchange.Redis.StreamAutoClaimIdsOnlyResult +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.ClaimedEntryIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.NextStartId.get -> StackExchange.Redis.RedisValue +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.StreamAutoClaimIdsOnlyResult() -> void StackExchange.Redis.StreamAutoClaimResult StackExchange.Redis.StreamAutoClaimResult.ClaimedEntries.get -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.StreamAutoClaimResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 1ef337d6f..3f22d23e8 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2115,7 +2115,7 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair return ExecuteAsync(msg, ResultProcessor.RedisValue); } - public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAutoClaimMessage(key, consumerGroup, @@ -2123,13 +2123,27 @@ public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGr minIdleTimeInMs, startAtId, count, - idsOnly, + returnJustIds: false, + flags); + + return ExecuteSync(msg, ResultProcessor.StreamAutoClaim); + } + + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + startAtId, + count, + returnJustIds: false, flags); - return ExecuteSync(msg, idsOnly ? ResultProcessor.StreamAutoClaimIdsOnly : ResultProcessor.StreamAutoClaim); + return ExecuteAsync(msg, ResultProcessor.StreamAutoClaim); } - public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, bool idsOnly = false, CommandFlags flags = CommandFlags.None) + public StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAutoClaimMessage(key, consumerGroup, @@ -2137,10 +2151,24 @@ public Task StreamAutoClaimAsync(RedisKey key, RedisValue minIdleTimeInMs, startAtId, count, - idsOnly, + returnJustIds: true, + flags); + + return ExecuteSync(msg, ResultProcessor.StreamAutoClaimIdsOnly); + } + + public Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + startAtId, + count, + returnJustIds: true, flags); - return ExecuteAsync(msg, idsOnly ? ResultProcessor.StreamAutoClaimIdsOnly : ResultProcessor.StreamAutoClaim); + return ExecuteAsync(msg, ResultProcessor.StreamAutoClaimIdsOnly); } public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index bd65ebeab..9f8594977 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -121,8 +121,8 @@ public static readonly SingleStreamProcessor public static readonly StreamAutoClaimProcessor StreamAutoClaim = new StreamAutoClaimProcessor(); - public static readonly StreamAutoClaimProcessor - StreamAutoClaimIdsOnly = new StreamAutoClaimProcessor(processIdsOnly: true); + public static readonly StreamAutoClaimIdsOnlyProcessor + StreamAutoClaimIdsOnly = new StreamAutoClaimIdsOnlyProcessor(); public static readonly StreamConsumerInfoProcessor StreamConsumerInfo = new StreamConsumerInfoProcessor(); @@ -1751,13 +1751,6 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes internal sealed class StreamAutoClaimProcessor : StreamProcessorBase { - private readonly bool processIdsOnly; - - public StreamAutoClaimProcessor(bool processIdsOnly = false) - { - this.processIdsOnly = processIdsOnly; - } - protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) { // See https://redis.io/commands/xautoclaim for command documentation. @@ -1769,29 +1762,24 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes // [0] The next start ID. var nextStartId = items[0].AsRedisValue(); - // [1] The array of either StreamEntry's or message IDs. - var entriesOrIds = items[1].GetItems(); + // [1] The array of StreamEntry's. + var entries = items[1].GetItems(); // [2] The array of message IDs deleted from the stream that were in the PEL. // This is not available in 6.2 so we need to be defensive when reading // this part of the response. - RedisValue[] deletedIds = null!; + RedisValue[]? deletedIds = null; if (items.Length == 3) { - deletedIds = items[2].GetItemsAsValues()!; + deletedIds = items[2].GetItemsAsValues(); } - var arr = new StreamEntry[entriesOrIds.Length]; + var arr = new StreamEntry[entries.Length]; for (var i = 0; i < arr.Length; i++) { - var item = entriesOrIds[i]; - - // If JUSTID was sent with the request, only populate the ID of the StreamEntry. - arr[i] = processIdsOnly - ? new StreamEntry(item.AsRedisValue(), Array.Empty()) - : ParseRedisStreamEntry(entriesOrIds[i]); + arr[i] = ParseRedisStreamEntry(entries[i]); } SetResult(message, new StreamAutoClaimResult(nextStartId, arr, deletedIds ?? Array.Empty())); @@ -1802,6 +1790,45 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + internal sealed class StreamAutoClaimIdsOnlyProcessor : ResultProcessor + { + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + // Process the result when the command was sent the JUSTID option. + + // See https://redis.io/commands/xautoclaim for command documentation. + + if (!result.IsNull && result.Type == ResultType.MultiBulk) + { + var items = result.GetItems(); + + // [0] The next start ID. + var nextStartId = items[0].AsRedisValue(); + + // [1] The array of claimed message IDs. + var claimedIds = items[1].GetItemsAsValues(); + + // [2] The array of message IDs deleted from the stream that were in the PEL. + // This is not available in 6.2 so we need to be defensive when reading + // this part of the response. + RedisValue[]? deletedIds = null; + + if (items.Length == 3) + { + deletedIds = items[2].GetItemsAsValues(); + } + + SetResult(message, new StreamAutoClaimIdsOnlyResult(nextStartId, + claimedIds ?? Array.Empty(), + deletedIds ?? Array.Empty())); + + return true; + } + + return false; + } + } + internal sealed class StreamConsumerInfoProcessor : InterleavedStreamInfoProcessorBase { protected override StreamConsumerInfo ParseItem(in RawResult result) diff --git a/src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs new file mode 100644 index 000000000..1d72993bb --- /dev/null +++ b/src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs @@ -0,0 +1,30 @@ +namespace StackExchange.Redis +{ + /// + /// The result of the XAUTOCLAIM command with the JUSTID option. + /// + public readonly struct StreamAutoClaimIdsOnlyResult + { + internal StreamAutoClaimIdsOnlyResult(RedisValue nextStartId, RedisValue[] claimedEntryIds, RedisValue[] deletedIds) + { + NextStartId = nextStartId; + ClaimedEntryIds = claimedEntryIds; + DeletedIds = deletedIds; + } + + /// + /// The stream ID to be used in the next call to StreamAutoClaim. + /// + public RedisValue NextStartId { get; } + + /// + /// Array of IDs claimed by the command. + /// + public RedisValue[] ClaimedEntryIds { get; } + + /// + /// Array of message IDs deleted from the stream. + /// + public RedisValue[] DeletedIds { get; } + } +} diff --git a/src/StackExchange.Redis/StreamAutoClaimResult.cs b/src/StackExchange.Redis/StreamAutoClaimResult.cs index 4dea00ad4..3b50c4e8b 100644 --- a/src/StackExchange.Redis/StreamAutoClaimResult.cs +++ b/src/StackExchange.Redis/StreamAutoClaimResult.cs @@ -1,6 +1,4 @@ -using System; - -namespace StackExchange.Redis +namespace StackExchange.Redis { /// /// The result of the XAUTOCLAIM command. @@ -10,8 +8,8 @@ public readonly struct StreamAutoClaimResult internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries, RedisValue[] deletedIds) { NextStartId = nextStartId; - ClaimedEntries = claimedEntries ?? Array.Empty(); - DeletedIds = deletedIds ?? Array.Empty(); + ClaimedEntries = claimedEntries; + DeletedIds = deletedIds; } /// diff --git a/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs b/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs index fa867c910..debc6b245 100644 --- a/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs +++ b/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs @@ -990,8 +990,15 @@ public void StreamAdd_2() [Fact] public void StreamAutoClaim() { - wrapper.StreamAutoClaim("key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None); - mock.Verify(_ => _.StreamAutoClaim("prefix:key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None)); + wrapper.StreamAutoClaim("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaim("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); + } + + [Fact] + public void StreamAutoClaimIdsOnly() + { + wrapper.StreamAutoClaimIdsOnly("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaimIdsOnly("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index a14c70ceb..77db4f15c 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -201,6 +201,33 @@ public void StreamAutoClaim_ClaimsSingleMessageWithCountOption() Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); } + [Fact] + public void StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnly() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaimIdsOnly(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntryIds); + Assert.True(result.ClaimedEntryIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); + Assert.Empty(result.DeletedIds); + } + [Fact] public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionAsync() { @@ -228,6 +255,33 @@ public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionAsync() Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); } + [Fact] + public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnlyAsync() + { + var key = Me(); + var group = "consumerGroup"; + var consumer1 = "c1"; + var consumer2 = "c2"; + + using var conn = Create(); + Skip.IfBelow(conn, RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimIdsOnlyAsync(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntryIds); + Assert.True(result.ClaimedEntryIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); + Assert.Empty(result.DeletedIds); + } + [Fact] public void StreamAutoClaim_IncludesDeletedMessageId() { @@ -359,7 +413,7 @@ public void StreamAutoClaim_NoMessageMeetsMinIdleTime() _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); // Claim messages idle for more than 5 minutes, should return an empty array. - var result = db.StreamAutoClaim(key, group, consumer2, 300000, "0-0", idsOnly: true); + var result = db.StreamAutoClaim(key, group, consumer2, 300000, "0-0"); Assert.Equal("0-0", result.NextStartId); Assert.Empty(result.ClaimedEntries); @@ -383,7 +437,7 @@ public async Task StreamAutoClaim_NoMessageMeetsMinIdleTimeAsync() _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); // Claim messages idle for more than 5 minutes, should return an empty array. - var result = await db.StreamAutoClaimAsync(key, group, consumer2, 300000, "0-0", idsOnly: true); + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 300000, "0-0"); Assert.Equal("0-0", result.NextStartId); Assert.Empty(result.ClaimedEntries); @@ -391,7 +445,7 @@ public async Task StreamAutoClaim_NoMessageMeetsMinIdleTimeAsync() } [Fact] - public void StreamAutoClaim_ReturnsStreamIdOnly() + public void StreamAutoClaim_ReturnsMessageIdOnly() { var key = Me(); var group = "consumerGroup"; @@ -407,20 +461,18 @@ public void StreamAutoClaim_ReturnsStreamIdOnly() var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); // Claim any pending messages and reassign them to consumer2. - var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", idsOnly: true); + var result = db.StreamAutoClaimIdsOnly(key, group, consumer2, 0, "0-0"); Assert.Equal("0-0", result.NextStartId); - Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.ClaimedEntryIds); Assert.Empty(result.DeletedIds); - Assert.True(result.ClaimedEntries.Length == 2); - Assert.Equal(messageIds[0], result.ClaimedEntries[0].Id); - Assert.Equal(messageIds[1], result.ClaimedEntries[1].Id); - Assert.Empty(result.ClaimedEntries[0].Values); - Assert.Empty(result.ClaimedEntries[1].Values); + Assert.True(result.ClaimedEntryIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); + Assert.Equal(messageIds[1], result.ClaimedEntryIds[1]); } [Fact] - public async Task StreamAutoClaim_ReturnsStreamIdOnlyAsync() + public async Task StreamAutoClaim_ReturnsMessageIdOnlyAsync() { var key = Me(); var group = "consumerGroup"; @@ -436,16 +488,14 @@ public async Task StreamAutoClaim_ReturnsStreamIdOnlyAsync() var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); // Claim any pending messages and reassign them to consumer2. - var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", idsOnly: true); + var result = await db.StreamAutoClaimIdsOnlyAsync(key, group, consumer2, 0, "0-0"); Assert.Equal("0-0", result.NextStartId); - Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.ClaimedEntryIds); Assert.Empty(result.DeletedIds); - Assert.True(result.ClaimedEntries.Length == 2); - Assert.Equal(messageIds[0], result.ClaimedEntries[0].Id); - Assert.Equal(messageIds[1], result.ClaimedEntries[1].Id); - Assert.Empty(result.ClaimedEntries[0].Values); - Assert.Empty(result.ClaimedEntries[1].Values); + Assert.True(result.ClaimedEntryIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); + Assert.Equal(messageIds[1], result.ClaimedEntryIds[1]); } private RedisValue[] StreamAutoClaim_PrepareTestData(IDatabase db, RedisKey key, RedisValue group, RedisValue consumer) diff --git a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs index 1e134a475..87a373d27 100644 --- a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs +++ b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs @@ -923,8 +923,15 @@ public void StreamAddAsync_2() [Fact] public void StreamAutoClaim() { - wrapper.StreamAutoClaimAsync("key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None); - mock.Verify(_ => _.StreamAutoClaimAsync("prefix:key", "group", "consumer", 0, "0-0", 100, true, CommandFlags.None)); + wrapper.StreamAutoClaimAsync("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaimAsync("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); + } + + [Fact] + public void StreamAutoClaimIdsOnly() + { + wrapper.StreamAutoClaimIdsOnlyAsync("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaimIdsOnlyAsync("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); } [Fact] From 1f2f070076893434537f6cd87d02278c24ee1f01 Mon Sep 17 00:00:00 2001 From: Todd Tingen Date: Fri, 15 Apr 2022 23:04:02 -0400 Subject: [PATCH 06/15] Updated release notes with separate "IdsOnly" methods. --- docs/ReleaseNotes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 46373f7d7..47609a47e 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -19,7 +19,7 @@ - Adds: Support for `OBJECT REFCOUNT` with `.KeyRefCount()`/`.KeyRefCountAsync()` ([#2087 by Avital-Fine](https://github.com/StackExchange/StackExchange.Redis/pull/2087)) - Adds: Support for `OBJECT ENCODING` with `.KeyEncoding()`/`.KeyEncodingAsync()` ([#2088 by Avital-Fine](https://github.com/StackExchange/StackExchange.Redis/pull/2088)) - Adds: Support for `HRANDFIELD` with `.HashRandomField()`/`.HashRandomFieldAsync()`, `.HashRandomFields()`/`.HashRandomFieldsAsync()`, and `.HashRandomFieldsWithValues()`/`.HashRandomFieldsWithValuesAsync()` ([#2090 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2090)) -- Adds: Support for `XAUTOCLAIM` with `.StreamAutoClaim()`/.`StreamAutoClaimAsync()` ([#2095 by ttingen](https://github.com/StackExchange/StackExchange.Redis/pull/2095)) +- Adds: Support for `XAUTOCLAIM` with `.StreamAutoClaim()`/.`StreamAutoClaimAsync()` and `.StreamAutoClaimIdsOnly()`/.`StreamAutoClaimIdsOnlyAsync()` ([#2095 by ttingen](https://github.com/StackExchange/StackExchange.Redis/pull/2095)) ## 2.5.61 From 44f62a12a7fc2e2b7935bd418699dba287927a96 Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 16 Apr 2022 10:43:02 -0400 Subject: [PATCH 07/15] Renames, API moves, and simplify result processor --- .../Interfaces/IDatabase.cs | 22 ++++----- .../Interfaces/IDatabaseAsync.cs | 22 ++++----- src/StackExchange.Redis/PublicAPI.Shipped.txt | 2 +- src/StackExchange.Redis/ResultProcessor.cs | 45 ++++--------------- .../Results/StreamAutoClaimIdsOnlyResult.cs | 29 ++++++++++++ .../Results/StreamAutoClaimResult.cs | 29 ++++++++++++ .../StreamAutoClaimIdsOnlyResult.cs | 30 ------------- .../StreamAutoClaimResult.cs | 30 ------------- tests/StackExchange.Redis.Tests/Streams.cs | 28 ++++++------ 9 files changed, 103 insertions(+), 134 deletions(-) create mode 100644 src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs create mode 100644 src/StackExchange.Redis/Results/StreamAutoClaimResult.cs delete mode 100644 src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs delete mode 100644 src/StackExchange.Redis/StreamAutoClaimResult.cs diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 813613a1d..b69ed151c 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2075,14 +2075,14 @@ IEnumerable SortedSetScan(RedisKey key, RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); /// - /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that - /// have been idle for more than will be claimed. + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. /// /// The key of the stream. /// The consumer group. - /// The consumer claiming the messages that are currently pending and have an idle time greater than . - /// The minimum idle time for pending messages. - /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . @@ -2090,15 +2090,15 @@ IEnumerable SortedSetScan(RedisKey key, StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); /// - /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that - /// have been idle for more than will be claimed. The result will contain - /// the claimed message IDs instead of a instance. + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. + /// The result will contain the claimed message IDs instead of a instance. /// /// The key of the stream. /// The consumer group. - /// The consumer claiming the messages that are currently pending and have an idle time greater than . - /// The minimum idle time for pending messages. - /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index d8b48ba43..8fbe8fef3 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -2029,14 +2029,14 @@ IAsyncEnumerable SortedSetScanAsync(RedisKey key, Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); /// - /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that - /// have been idle for more than will be claimed. + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. /// /// The key of the stream. /// The consumer group. - /// The consumer claiming the messages that are currently pending and have an idle time greater than . - /// The minimum idle time for pending messages. - /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . @@ -2044,15 +2044,15 @@ IAsyncEnumerable SortedSetScanAsync(RedisKey key, Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); /// - /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. Messages that - /// have been idle for more than will be claimed. The result will contain - /// the claimed message IDs instead of a instance. + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. + /// The result will contain the claimed message IDs instead of a instance. /// /// The key of the stream. /// The consumer group. - /// The consumer claiming the messages that are currently pending and have an idle time greater than . - /// The minimum idle time for pending messages. - /// The starting ID to scan for pending messsages that have an idle time greater than . + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . diff --git a/src/StackExchange.Redis/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI.Shipped.txt index dc1c55920..77ed0512d 100644 --- a/src/StackExchange.Redis/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI.Shipped.txt @@ -1394,7 +1394,7 @@ StackExchange.Redis.SortType StackExchange.Redis.SortType.Alphabetic = 1 -> StackExchange.Redis.SortType StackExchange.Redis.SortType.Numeric = 0 -> StackExchange.Redis.SortType StackExchange.Redis.StreamAutoClaimIdsOnlyResult -StackExchange.Redis.StreamAutoClaimIdsOnlyResult.ClaimedEntryIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.ClaimedIds.get -> StackExchange.Redis.RedisValue[]! StackExchange.Redis.StreamAutoClaimIdsOnlyResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! StackExchange.Redis.StreamAutoClaimIdsOnlyResult.NextStartId.get -> StackExchange.Redis.RedisValue StackExchange.Redis.StreamAutoClaimIdsOnlyResult.StreamAutoClaimIdsOnlyResult() -> void diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index c62751b65..b36539d0f 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -1786,35 +1786,19 @@ internal sealed class StreamAutoClaimProcessor : StreamProcessorBase(); - SetResult(message, new StreamAutoClaimResult(nextStartId, arr, deletedIds ?? Array.Empty())); + SetResult(message, new StreamAutoClaimResult(nextStartId, entries, deletedIds)); return true; } @@ -1827,33 +1811,20 @@ internal sealed class StreamAutoClaimIdsOnlyProcessor : ResultProcessor(); // [2] The array of message IDs deleted from the stream that were in the PEL. - // This is not available in 6.2 so we need to be defensive when reading - // this part of the response. - RedisValue[]? deletedIds = null; - - if (items.Length == 3) - { - deletedIds = items[2].GetItemsAsValues(); - } - - SetResult(message, new StreamAutoClaimIdsOnlyResult(nextStartId, - claimedIds ?? Array.Empty(), - deletedIds ?? Array.Empty())); + // This is not available in 6.2 so we need to be defensive when reading this part of the response. + var deletedIds = (items.Length == 3 ? items[2].GetItemsAsValues() : null) ?? Array.Empty(); + SetResult(message, new StreamAutoClaimIdsOnlyResult(nextStartId, claimedIds, deletedIds)); return true; } diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs new file mode 100644 index 000000000..09acd91e2 --- /dev/null +++ b/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs @@ -0,0 +1,29 @@ +namespace StackExchange.Redis; + +/// +/// The result of the XAUTOCLAIM command with the JUSTID option. +/// +public readonly struct StreamAutoClaimIdsOnlyResult +{ + internal StreamAutoClaimIdsOnlyResult(RedisValue nextStartId, RedisValue[] claimedIds, RedisValue[] deletedIds) + { + NextStartId = nextStartId; + ClaimedIds = claimedIds; + DeletedIds = deletedIds; + } + + /// + /// The stream ID to be used in the next call to StreamAutoClaim. + /// + public RedisValue NextStartId { get; } + + /// + /// Array of IDs claimed by the command. + /// + public RedisValue[] ClaimedIds { get; } + + /// + /// Array of message IDs deleted from the stream. + /// + public RedisValue[] DeletedIds { get; } +} diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs b/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs new file mode 100644 index 000000000..ba00205b2 --- /dev/null +++ b/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs @@ -0,0 +1,29 @@ +namespace StackExchange.Redis; + +/// +/// The result of the XAUTOCLAIM command. +/// +public readonly struct StreamAutoClaimResult +{ + internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries, RedisValue[] deletedIds) + { + NextStartId = nextStartId; + ClaimedEntries = claimedEntries; + DeletedIds = deletedIds; + } + + /// + /// The stream ID to be used in the next call to StreamAutoClaim. + /// + public RedisValue NextStartId { get; } + + /// + /// An array of for the successfully claimed entries. + /// + public StreamEntry[] ClaimedEntries { get; } + + /// + /// An array of message IDs deleted from the stream. + /// + public RedisValue[] DeletedIds { get; } +} diff --git a/src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs deleted file mode 100644 index 1d72993bb..000000000 --- a/src/StackExchange.Redis/StreamAutoClaimIdsOnlyResult.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace StackExchange.Redis -{ - /// - /// The result of the XAUTOCLAIM command with the JUSTID option. - /// - public readonly struct StreamAutoClaimIdsOnlyResult - { - internal StreamAutoClaimIdsOnlyResult(RedisValue nextStartId, RedisValue[] claimedEntryIds, RedisValue[] deletedIds) - { - NextStartId = nextStartId; - ClaimedEntryIds = claimedEntryIds; - DeletedIds = deletedIds; - } - - /// - /// The stream ID to be used in the next call to StreamAutoClaim. - /// - public RedisValue NextStartId { get; } - - /// - /// Array of IDs claimed by the command. - /// - public RedisValue[] ClaimedEntryIds { get; } - - /// - /// Array of message IDs deleted from the stream. - /// - public RedisValue[] DeletedIds { get; } - } -} diff --git a/src/StackExchange.Redis/StreamAutoClaimResult.cs b/src/StackExchange.Redis/StreamAutoClaimResult.cs deleted file mode 100644 index 3b50c4e8b..000000000 --- a/src/StackExchange.Redis/StreamAutoClaimResult.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace StackExchange.Redis -{ - /// - /// The result of the XAUTOCLAIM command. - /// - public readonly struct StreamAutoClaimResult - { - internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries, RedisValue[] deletedIds) - { - NextStartId = nextStartId; - ClaimedEntries = claimedEntries; - DeletedIds = deletedIds; - } - - /// - /// The stream ID to be used in the next call to StreamAutoClaim. - /// - public RedisValue NextStartId { get; } - - /// - /// An array of for the successfully claimed entries. - /// - public StreamEntry[] ClaimedEntries { get; } - - /// - /// An array of message IDs deleted from the stream. - /// - public RedisValue[] DeletedIds { get; } - } -} diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index 77db4f15c..d0df152f6 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -222,9 +222,9 @@ public void StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnly() // Should be the second message ID from the call to prepare. Assert.Equal(messageIds[1], result.NextStartId); - Assert.NotEmpty(result.ClaimedEntryIds); - Assert.True(result.ClaimedEntryIds.Length == 1); - Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); + Assert.NotEmpty(result.ClaimedIds); + Assert.True(result.ClaimedIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); Assert.Empty(result.DeletedIds); } @@ -276,9 +276,9 @@ public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnlyAsync // Should be the second message ID from the call to prepare. Assert.Equal(messageIds[1], result.NextStartId); - Assert.NotEmpty(result.ClaimedEntryIds); - Assert.True(result.ClaimedEntryIds.Length == 1); - Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); + Assert.NotEmpty(result.ClaimedIds); + Assert.True(result.ClaimedIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); Assert.Empty(result.DeletedIds); } @@ -464,11 +464,11 @@ public void StreamAutoClaim_ReturnsMessageIdOnly() var result = db.StreamAutoClaimIdsOnly(key, group, consumer2, 0, "0-0"); Assert.Equal("0-0", result.NextStartId); - Assert.NotEmpty(result.ClaimedEntryIds); + Assert.NotEmpty(result.ClaimedIds); Assert.Empty(result.DeletedIds); - Assert.True(result.ClaimedEntryIds.Length == 2); - Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); - Assert.Equal(messageIds[1], result.ClaimedEntryIds[1]); + Assert.True(result.ClaimedIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Equal(messageIds[1], result.ClaimedIds[1]); } [Fact] @@ -491,11 +491,11 @@ public async Task StreamAutoClaim_ReturnsMessageIdOnlyAsync() var result = await db.StreamAutoClaimIdsOnlyAsync(key, group, consumer2, 0, "0-0"); Assert.Equal("0-0", result.NextStartId); - Assert.NotEmpty(result.ClaimedEntryIds); + Assert.NotEmpty(result.ClaimedIds); Assert.Empty(result.DeletedIds); - Assert.True(result.ClaimedEntryIds.Length == 2); - Assert.Equal(messageIds[0], result.ClaimedEntryIds[0]); - Assert.Equal(messageIds[1], result.ClaimedEntryIds[1]); + Assert.True(result.ClaimedIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Equal(messageIds[1], result.ClaimedIds[1]); } private RedisValue[] StreamAutoClaim_PrepareTestData(IDatabase db, RedisKey key, RedisValue group, RedisValue consumer) From a6c0396c97d5c46a24e16602dbaacbada6c634ac Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 16 Apr 2022 11:06:15 -0400 Subject: [PATCH 08/15] Tidy API and add default results These are subtly needed but not obvious from the previous result processors because they always processed regardless of results. Need to ensure sanity checks in tests on how we handle a key that isn't there as well. --- src/StackExchange.Redis/PublicAPI.Shipped.txt | 4 + src/StackExchange.Redis/RedisDatabase.cs | 54 ++---- .../Results/StreamAutoClaimIdsOnlyResult.cs | 14 +- .../Results/StreamAutoClaimResult.cs | 14 +- tests/StackExchange.Redis.Tests/Streams.cs | 160 ++++++++---------- 5 files changed, 107 insertions(+), 139 deletions(-) diff --git a/src/StackExchange.Redis/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI.Shipped.txt index 77ed0512d..8347638de 100644 --- a/src/StackExchange.Redis/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI.Shipped.txt @@ -1396,11 +1396,13 @@ StackExchange.Redis.SortType.Numeric = 0 -> StackExchange.Redis.SortType StackExchange.Redis.StreamAutoClaimIdsOnlyResult StackExchange.Redis.StreamAutoClaimIdsOnlyResult.ClaimedIds.get -> StackExchange.Redis.RedisValue[]! StackExchange.Redis.StreamAutoClaimIdsOnlyResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.IsNull.get -> bool StackExchange.Redis.StreamAutoClaimIdsOnlyResult.NextStartId.get -> StackExchange.Redis.RedisValue StackExchange.Redis.StreamAutoClaimIdsOnlyResult.StreamAutoClaimIdsOnlyResult() -> void StackExchange.Redis.StreamAutoClaimResult StackExchange.Redis.StreamAutoClaimResult.ClaimedEntries.get -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.StreamAutoClaimResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimResult.IsNull.get -> bool StackExchange.Redis.StreamAutoClaimResult.NextStartId.get -> StackExchange.Redis.RedisValue StackExchange.Redis.StreamAutoClaimResult.StreamAutoClaimResult() -> void StackExchange.Redis.StreamConsumer @@ -1670,6 +1672,8 @@ static StackExchange.Redis.SortedSetEntry.implicit operator StackExchange.Redis. static StackExchange.Redis.SortedSetEntry.implicit operator System.Collections.Generic.KeyValuePair(StackExchange.Redis.SortedSetEntry value) -> System.Collections.Generic.KeyValuePair static StackExchange.Redis.SortedSetEntry.operator !=(StackExchange.Redis.SortedSetEntry x, StackExchange.Redis.SortedSetEntry y) -> bool static StackExchange.Redis.SortedSetEntry.operator ==(StackExchange.Redis.SortedSetEntry x, StackExchange.Redis.SortedSetEntry y) -> bool +static StackExchange.Redis.StreamAutoClaimIdsOnlyResult.Null.get -> StackExchange.Redis.StreamAutoClaimIdsOnlyResult +static StackExchange.Redis.StreamAutoClaimResult.Null.get -> StackExchange.Redis.StreamAutoClaimResult static StackExchange.Redis.StreamEntry.Null.get -> StackExchange.Redis.StreamEntry static StackExchange.Redis.StreamPosition.Beginning.get -> StackExchange.Redis.RedisValue static StackExchange.Redis.StreamPosition.NewMessages.get -> StackExchange.Redis.RedisValue diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 9b744aafd..0924ed977 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2225,58 +2225,26 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) { - var msg = GetStreamAutoClaimMessage(key, - consumerGroup, - claimingConsumer, - minIdleTimeInMs, - startAtId, - count, - returnJustIds: false, - flags); - - return ExecuteSync(msg, ResultProcessor.StreamAutoClaim); + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: false, flags); + return ExecuteSync(msg, ResultProcessor.StreamAutoClaim, defaultValue: StreamAutoClaimResult.Null); } public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) { - var msg = GetStreamAutoClaimMessage(key, - consumerGroup, - claimingConsumer, - minIdleTimeInMs, - startAtId, - count, - returnJustIds: false, - flags); - - return ExecuteAsync(msg, ResultProcessor.StreamAutoClaim); + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: false, flags); + return ExecuteAsync(msg, ResultProcessor.StreamAutoClaim, defaultValue: StreamAutoClaimResult.Null); } public StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) { - var msg = GetStreamAutoClaimMessage(key, - consumerGroup, - claimingConsumer, - minIdleTimeInMs, - startAtId, - count, - returnJustIds: true, - flags); - - return ExecuteSync(msg, ResultProcessor.StreamAutoClaimIdsOnly); + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: true, flags); + return ExecuteSync(msg, ResultProcessor.StreamAutoClaimIdsOnly, defaultValue: StreamAutoClaimIdsOnlyResult.Null); } public Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) { - var msg = GetStreamAutoClaimMessage(key, - consumerGroup, - claimingConsumer, - minIdleTimeInMs, - startAtId, - count, - returnJustIds: true, - flags); - - return ExecuteAsync(msg, ResultProcessor.StreamAutoClaimIdsOnly); + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: true, flags); + return ExecuteAsync(msg, ResultProcessor.StreamAutoClaimIdsOnly, defaultValue: StreamAutoClaimIdsOnlyResult.Null); } public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) @@ -3786,10 +3754,10 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe return Message.Create(Database, flags, RedisCommand.XADD, key, values); } - private Message GetStreamAutoClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count, bool returnJustIds, CommandFlags flags) + private Message GetStreamAutoClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count, bool idsOnly, CommandFlags flags) { // XAUTOCLAIM [COUNT count] [JUSTID] - var values = new RedisValue[4 + (count is null ? 0 : 2) + (returnJustIds ? 1 : 0)]; + var values = new RedisValue[4 + (count is null ? 0 : 2) + (idsOnly ? 1 : 0)]; var offset = 0; @@ -3804,7 +3772,7 @@ private Message GetStreamAutoClaimMessage(RedisKey key, RedisValue consumerGroup values[offset++] = count.Value; } - if (returnJustIds) + if (idsOnly) { values[offset++] = StreamConstants.JustId; } diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs index 09acd91e2..a20c0f2be 100644 --- a/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs +++ b/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs @@ -1,4 +1,6 @@ -namespace StackExchange.Redis; +using System; + +namespace StackExchange.Redis; /// /// The result of the XAUTOCLAIM command with the JUSTID option. @@ -12,6 +14,16 @@ internal StreamAutoClaimIdsOnlyResult(RedisValue nextStartId, RedisValue[] claim DeletedIds = deletedIds; } + /// + /// A null , indicating no results. + /// + public static StreamAutoClaimIdsOnlyResult Null { get; } = new StreamAutoClaimIdsOnlyResult(RedisValue.Null, Array.Empty(), Array.Empty()); + + /// + /// Whether this object is null/empty. + /// + public bool IsNull => NextStartId.IsNull && ClaimedIds == Array.Empty() && DeletedIds == Array.Empty(); + /// /// The stream ID to be used in the next call to StreamAutoClaim. /// diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs b/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs index ba00205b2..68f139498 100644 --- a/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs +++ b/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs @@ -1,4 +1,6 @@ -namespace StackExchange.Redis; +using System; + +namespace StackExchange.Redis; /// /// The result of the XAUTOCLAIM command. @@ -12,6 +14,16 @@ internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntr DeletedIds = deletedIds; } + /// + /// A null , indicating no results. + /// + public static StreamAutoClaimResult Null { get; } = new StreamAutoClaimResult(RedisValue.Null, Array.Empty(), Array.Empty()); + + /// + /// Whether this object is null/empty. + /// + public bool IsNull => NextStartId.IsNull && ClaimedEntries == Array.Empty() && DeletedIds == Array.Empty(); + /// /// The stream ID to be used in the next call to StreamAutoClaim. /// diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index d0df152f6..25d004c37 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -123,15 +123,13 @@ public void StreamAddMultipleValuePairsWithManualId() [Fact] public void StreamAutoClaim_ClaimsPendingMessages() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -150,15 +148,13 @@ public void StreamAutoClaim_ClaimsPendingMessages() [Fact] public async Task StreamAutoClaim_ClaimsPendingMessagesAsync() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -177,15 +173,13 @@ public async Task StreamAutoClaim_ClaimsPendingMessagesAsync() [Fact] public void StreamAutoClaim_ClaimsSingleMessageWithCountOption() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -204,15 +198,13 @@ public void StreamAutoClaim_ClaimsSingleMessageWithCountOption() [Fact] public void StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnly() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -231,15 +223,13 @@ public void StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnly() [Fact] public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionAsync() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -258,15 +248,13 @@ public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionAsync() [Fact] public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnlyAsync() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -285,15 +273,13 @@ public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnlyAsync [Fact] public void StreamAutoClaim_IncludesDeletedMessageId() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v7_0_0_rc1); + using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -316,15 +302,13 @@ public void StreamAutoClaim_IncludesDeletedMessageId() [Fact] public async Task StreamAutoClaim_IncludesDeletedMessageIdAsync() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v7_0_0_rc1); + using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -347,13 +331,11 @@ public async Task StreamAutoClaim_IncludesDeletedMessageIdAsync() [Fact] public void StreamAutoClaim_NoMessagesToClaim() { - var key = Me(); - var group = "consumerGroup"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup"; // Create the group. db.KeyDelete(key); @@ -373,13 +355,11 @@ public void StreamAutoClaim_NoMessagesToClaim() [Fact] public async Task StreamAutoClaim_NoMessagesToClaimAsync() { - var key = Me(); - var group = "consumerGroup"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup"; // Create the group. db.KeyDelete(key); @@ -399,15 +379,13 @@ public async Task StreamAutoClaim_NoMessagesToClaimAsync() [Fact] public void StreamAutoClaim_NoMessageMeetsMinIdleTime() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -423,15 +401,13 @@ public void StreamAutoClaim_NoMessageMeetsMinIdleTime() [Fact] public async Task StreamAutoClaim_NoMessageMeetsMinIdleTimeAsync() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -447,15 +423,13 @@ public async Task StreamAutoClaim_NoMessageMeetsMinIdleTimeAsync() [Fact] public void StreamAutoClaim_ReturnsMessageIdOnly() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); @@ -474,15 +448,13 @@ public void StreamAutoClaim_ReturnsMessageIdOnly() [Fact] public async Task StreamAutoClaim_ReturnsMessageIdOnlyAsync() { - var key = Me(); - var group = "consumerGroup"; - var consumer1 = "c1"; - var consumer2 = "c2"; - - using var conn = Create(); - Skip.IfBelow(conn, RedisFeatures.v6_2_0); + using var conn = Create(require: RedisFeatures.v6_2_0); + var key = Me(); var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; // Create Consumer Group, add messages, and read messages into a consumer. var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); From a24d767de765f229ca7944635cec51a4857c2035 Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 16 Apr 2022 11:14:38 -0400 Subject: [PATCH 09/15] Add a null key test for autoclaim --- tests/StackExchange.Redis.Tests/Streams.cs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index 25d004c37..f35be183a 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -120,6 +120,25 @@ public void StreamAddMultipleValuePairsWithManualId() } } + [Fact] + public async Task StreamAutoClaim_MissingKey() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer = "consumer"; + + db.KeyDelete(key); + + var ex = Assert.Throws(() => db.StreamAutoClaim(key, group, consumer, 0, "0-0")); + Assert.StartsWith("NOGROUP No such key", ex.Message); + + ex = await Assert.ThrowsAsync(() => db.StreamAutoClaimAsync(key, group, consumer, 0, "0-0")); + Assert.StartsWith("NOGROUP No such key", ex.Message); + } + [Fact] public void StreamAutoClaim_ClaimsPendingMessages() { From 5f2308c1fdc969a4f5db27eeab0777a14ad5b75c Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 16 Apr 2022 11:26:41 -0400 Subject: [PATCH 10/15] Normalize processor, make docs a but more surfaced --- src/StackExchange.Redis/ResultProcessor.cs | 13 ++++++++++--- .../Results/StreamAutoClaimIdsOnlyResult.cs | 2 +- .../Results/StreamAutoClaimResult.cs | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index b36539d0f..38e60f796 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -1781,12 +1781,16 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + /// + /// This processor is for *without* the option. + /// internal sealed class StreamAutoClaimProcessor : StreamProcessorBase { protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) { // See https://redis.io/commands/xautoclaim for command documentation. - if (!result.IsNull && result.Type == ResultType.MultiBulk) + // Note that the result should never be null, so intentionally treating it as a failure to parse here + if (result.Type == ResultType.MultiBulk && !result.IsNull) { var items = result.GetItems(); @@ -1806,13 +1810,16 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + /// + /// This processor is for *with* the option. + /// internal sealed class StreamAutoClaimIdsOnlyProcessor : ResultProcessor { protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) { - // Process the result when the command was sent the JUSTID option. // See https://redis.io/commands/xautoclaim for command documentation. - if (!result.IsNull && result.Type == ResultType.MultiBulk) + // Note that the result should never be null, so intentionally treating it as a failure to parse here + if (result.Type == ResultType.MultiBulk && !result.IsNull) { var items = result.GetItems(); diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs index a20c0f2be..b4960d0ed 100644 --- a/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs +++ b/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs @@ -3,7 +3,7 @@ namespace StackExchange.Redis; /// -/// The result of the XAUTOCLAIM command with the JUSTID option. +/// Result of the XAUTOCLAIM command with the JUSTID option. /// public readonly struct StreamAutoClaimIdsOnlyResult { diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs b/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs index 68f139498..de900739b 100644 --- a/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs +++ b/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs @@ -3,7 +3,7 @@ namespace StackExchange.Redis; /// -/// The result of the XAUTOCLAIM command. +/// Result of the XAUTOCLAIM command. /// public readonly struct StreamAutoClaimResult { From e1c8e87c5c28606cd6979bd38f1878165cc989ae Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 16 Apr 2022 11:57:37 -0400 Subject: [PATCH 11/15] Normalize to #2096 --- .../{Results => APITypes}/StreamAutoClaimIdsOnlyResult.cs | 0 .../{Results => APITypes}/StreamAutoClaimResult.cs | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/StackExchange.Redis/{Results => APITypes}/StreamAutoClaimIdsOnlyResult.cs (100%) rename src/StackExchange.Redis/{Results => APITypes}/StreamAutoClaimResult.cs (100%) diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs similarity index 100% rename from src/StackExchange.Redis/Results/StreamAutoClaimIdsOnlyResult.cs rename to src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs diff --git a/src/StackExchange.Redis/Results/StreamAutoClaimResult.cs b/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs similarity index 100% rename from src/StackExchange.Redis/Results/StreamAutoClaimResult.cs rename to src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs From a77fb431b00372d05a772f23a1b4279c808dcda5 Mon Sep 17 00:00:00 2001 From: Todd Tingen Date: Sat, 16 Apr 2022 12:15:11 -0400 Subject: [PATCH 12/15] Clean up comments from copy/paste. --- tests/StackExchange.Redis.Tests/Streams.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index f35be183a..303d56837 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -309,7 +309,6 @@ public void StreamAutoClaim_IncludesDeletedMessageId() // Claim a single pending message and reassign it to consumer2. var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", count: 1); - // Should be id2 from above. Assert.Equal("0-0", result.NextStartId); Assert.NotEmpty(result.ClaimedEntries); Assert.NotEmpty(result.DeletedIds); @@ -338,7 +337,6 @@ public async Task StreamAutoClaim_IncludesDeletedMessageIdAsync() // Claim a single pending message and reassign it to consumer2. var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", count: 1); - // Should be id2 from above. Assert.Equal("0-0", result.NextStartId); Assert.NotEmpty(result.ClaimedEntries); Assert.NotEmpty(result.DeletedIds); From 3603627fbc4b7fa711ac5508849990154dda68d5 Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 16 Apr 2022 20:07:54 -0400 Subject: [PATCH 13/15] Fix naming --- tests/StackExchange.Redis.Tests/WrapperBaseTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs index 154b4aadf..1d1058008 100644 --- a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs +++ b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs @@ -944,14 +944,14 @@ public void StreamAddAsync_2() } [Fact] - public void StreamAutoClaim() + public void StreamAutoClaimAsync() { wrapper.StreamAutoClaimAsync("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); mock.Verify(_ => _.StreamAutoClaimAsync("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); } [Fact] - public void StreamAutoClaimIdsOnly() + public void StreamAutoClaimIdsOnlyAsync() { wrapper.StreamAutoClaimIdsOnlyAsync("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); mock.Verify(_ => _.StreamAutoClaimIdsOnlyAsync("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); From e3f2e061ff45bd680ddea8821715f02bfd5fb252 Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 16 Apr 2022 20:10:31 -0400 Subject: [PATCH 14/15] Merge fix --- tests/StackExchange.Redis.Tests/Streams.cs | 383 +++++++++++++++++++++ 1 file changed, 383 insertions(+) diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index adb5511dd..3ae1a19ae 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -103,6 +103,389 @@ public void StreamAddMultipleValuePairsWithManualId() Assert.Equal(id, entries[0].Id); } + [Fact] + public async Task StreamAutoClaim_MissingKey() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer = "consumer"; + + db.KeyDelete(key); + + var ex = Assert.Throws(() => db.StreamAutoClaim(key, group, consumer, 0, "0-0")); + Assert.StartsWith("NOGROUP No such key", ex.Message); + + ex = await Assert.ThrowsAsync(() => db.StreamAutoClaimAsync(key, group, consumer, 0, "0-0")); + Assert.StartsWith("NOGROUP No such key", ex.Message); + } + + [Fact] + public void StreamAutoClaim_ClaimsPendingMessages() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + Assert.Equal("value2", result.ClaimedEntries[1].Values[0].Value); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsPendingMessagesAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + Assert.Equal("value2", result.ClaimedEntries[1].Values[0].Value); + } + + [Fact] + public void StreamAutoClaim_ClaimsSingleMessageWithCountOption() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + } + + [Fact] + public void StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnly() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaimIdsOnly(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.True(result.ClaimedIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnlyAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimIdsOnlyAsync(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.True(result.ClaimedIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_IncludesDeletedMessageId() + { + using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Delete one of the messages, it should be included in the deleted message ID array. + db.StreamDelete(key, new RedisValue[] { messageIds[0] }); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", count: 1); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.True(result.DeletedIds.Length == 1); + Assert.Equal(messageIds[0], result.DeletedIds[0]); + } + + [Fact] + public async Task StreamAutoClaim_IncludesDeletedMessageIdAsync() + { + using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Delete one of the messages, it should be included in the deleted message ID array. + db.StreamDelete(key, new RedisValue[] { messageIds[0] }); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", count: 1); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.True(result.DeletedIds.Length == 1); + Assert.Equal(messageIds[0], result.DeletedIds[0]); + } + + [Fact] + public void StreamAutoClaim_NoMessagesToClaim() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup"; + + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // **Don't add any messages to the stream** + + // Claim any pending messages (there aren't any) and reassign them to consumer2. + var result = db.StreamAutoClaim(key, group, "consumer1", 0, "0-0"); + + // Claimed entries should be empty + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_NoMessagesToClaimAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup"; + + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // **Don't add any messages to the stream** + + // Claim any pending messages (there aren't any) and reassign them to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, "consumer1", 0, "0-0"); + + // Claimed entries should be empty + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_NoMessageMeetsMinIdleTime() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim messages idle for more than 5 minutes, should return an empty array. + var result = db.StreamAutoClaim(key, group, consumer2, 300000, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_NoMessageMeetsMinIdleTimeAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim messages idle for more than 5 minutes, should return an empty array. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 300000, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_ReturnsMessageIdOnly() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = db.StreamAutoClaimIdsOnly(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Equal(messageIds[1], result.ClaimedIds[1]); + } + + [Fact] + public async Task StreamAutoClaim_ReturnsMessageIdOnlyAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = await db.StreamAutoClaimIdsOnlyAsync(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Equal(messageIds[1], result.ClaimedIds[1]); + } + + private RedisValue[] StreamAutoClaim_PrepareTestData(IDatabase db, RedisKey key, RedisValue group, RedisValue consumer) + { + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // Add some messages + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + + // Read the messages into the "c1" + db.StreamReadGroup(key, group, consumer); + + return new RedisValue[2] { id1, id2 }; + } + [Fact] public void StreamConsumerGroupSetId() { From 19240eafcab01582c9688487dc0f0019a86defaf Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Tue, 19 Apr 2022 12:12:36 -0400 Subject: [PATCH 15/15] Docs updates --- .../APITypes/StreamAutoClaimIdsOnlyResult.cs | 2 +- src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs | 2 +- src/StackExchange.Redis/Interfaces/IDatabase.cs | 4 ++-- src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs index b4960d0ed..114763129 100644 --- a/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs +++ b/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs @@ -3,7 +3,7 @@ namespace StackExchange.Redis; /// -/// Result of the XAUTOCLAIM command with the JUSTID option. +/// Result of the XAUTOCLAIM command with the JUSTID option. /// public readonly struct StreamAutoClaimIdsOnlyResult { diff --git a/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs b/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs index de900739b..09f607f3d 100644 --- a/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs +++ b/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs @@ -3,7 +3,7 @@ namespace StackExchange.Redis; /// -/// Result of the XAUTOCLAIM command. +/// Result of the XAUTOCLAIM command. /// public readonly struct StreamAutoClaimResult { diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 78221ec69..16975beb5 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2175,7 +2175,7 @@ IEnumerable SortedSetScan(RedisKey key, /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . - /// https://redis.io/commands/xautoclaim + /// StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); /// @@ -2191,7 +2191,7 @@ IEnumerable SortedSetScan(RedisKey key, /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . - /// https://redis.io/commands/xautoclaim + /// StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); /// diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 01178ab08..846e5ba2f 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -2128,7 +2128,7 @@ IAsyncEnumerable SortedSetScanAsync(RedisKey key, /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . - /// https://redis.io/commands/xautoclaim + /// Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); /// @@ -2144,7 +2144,7 @@ IAsyncEnumerable SortedSetScanAsync(RedisKey key, /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. /// The flags to use for this operation. /// An instance of . - /// https://redis.io/commands/xautoclaim + /// Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); ///