diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 857cd5166..8cec3ee88 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -23,6 +23,7 @@ - Adds: Support for `GEOSEARCH` with `.GeoSearch()`/`.GeoSearchAsync()` ([#2089 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2089)) - Adds: Support for `GEOSEARCHSTORE` with `.GeoSearchAndStore()`/`.GeoSearchAndStoreAsync()` ([#2089 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2089)) - Adds: Support for `HRANDFIELD` with `.HashRandomField()`/`.HashRandomFieldAsync()`, `.HashRandomFields()`/`.HashRandomFieldsAsync()`, and `.HashRandomFieldsWithValues()`/`.HashRandomFieldsWithValuesAsync()` ([#2090 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2090)) +- Adds: Support for `XAUTOCLAIM` with `.StreamAutoClaim()`/.`StreamAutoClaimAsync()` and `.StreamAutoClaimIdsOnly()`/.`StreamAutoClaimIdsOnlyAsync()` ([#2095 by ttingen](https://github.com/StackExchange/StackExchange.Redis/pull/2095)) ## 2.5.61 diff --git a/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs b/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs new file mode 100644 index 000000000..114763129 --- /dev/null +++ b/src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs @@ -0,0 +1,41 @@ +using System; + +namespace StackExchange.Redis; + +/// +/// Result of the XAUTOCLAIM command with the JUSTID option. +/// +public readonly struct StreamAutoClaimIdsOnlyResult +{ + internal StreamAutoClaimIdsOnlyResult(RedisValue nextStartId, RedisValue[] claimedIds, RedisValue[] deletedIds) + { + NextStartId = nextStartId; + ClaimedIds = claimedIds; + DeletedIds = deletedIds; + } + + /// + /// A null , indicating no results. + /// + public static StreamAutoClaimIdsOnlyResult Null { get; } = new StreamAutoClaimIdsOnlyResult(RedisValue.Null, Array.Empty(), Array.Empty()); + + /// + /// Whether this object is null/empty. + /// + public bool IsNull => NextStartId.IsNull && ClaimedIds == Array.Empty() && DeletedIds == Array.Empty(); + + /// + /// The stream ID to be used in the next call to StreamAutoClaim. + /// + public RedisValue NextStartId { get; } + + /// + /// Array of IDs claimed by the command. + /// + public RedisValue[] ClaimedIds { get; } + + /// + /// Array of message IDs deleted from the stream. + /// + public RedisValue[] DeletedIds { get; } +} diff --git a/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs b/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs new file mode 100644 index 000000000..09f607f3d --- /dev/null +++ b/src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs @@ -0,0 +1,41 @@ +using System; + +namespace StackExchange.Redis; + +/// +/// Result of the XAUTOCLAIM command. +/// +public readonly struct StreamAutoClaimResult +{ + internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries, RedisValue[] deletedIds) + { + NextStartId = nextStartId; + ClaimedEntries = claimedEntries; + DeletedIds = deletedIds; + } + + /// + /// A null , indicating no results. + /// + public static StreamAutoClaimResult Null { get; } = new StreamAutoClaimResult(RedisValue.Null, Array.Empty(), Array.Empty()); + + /// + /// Whether this object is null/empty. + /// + public bool IsNull => NextStartId.IsNull && ClaimedEntries == Array.Empty() && DeletedIds == Array.Empty(); + + /// + /// The stream ID to be used in the next call to StreamAutoClaim. + /// + public RedisValue NextStartId { get; } + + /// + /// An array of for the successfully claimed entries. + /// + public StreamEntry[] ClaimedEntries { get; } + + /// + /// An array of message IDs deleted from the stream. + /// + public RedisValue[] DeletedIds { get; } +} diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs index e2cc9603c..6ec77ea01 100644 --- a/src/StackExchange.Redis/Enums/RedisCommand.cs +++ b/src/StackExchange.Redis/Enums/RedisCommand.cs @@ -189,6 +189,7 @@ internal enum RedisCommand XACK, XADD, + XAUTOCLAIM, XCLAIM, XDEL, XGROUP, @@ -324,6 +325,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command) case RedisCommand.UNLINK: case RedisCommand.XACK: case RedisCommand.XADD: + case RedisCommand.XAUTOCLAIM: case RedisCommand.XCLAIM: case RedisCommand.XDEL: case RedisCommand.XGROUP: diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index f95c06325..16975beb5 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2163,6 +2163,37 @@ IEnumerable SortedSetScan(RedisKey key, /// RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// The flags to use for this operation. + /// An instance of . + /// + StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. + /// The result will contain the claimed message IDs instead of a instance. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// The flags to use for this operation. + /// An instance of . + /// + StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); + /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. /// This method returns the complete message for the claimed message(s). diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 3400010cf..846e5ba2f 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -2116,6 +2116,37 @@ IAsyncEnumerable SortedSetScanAsync(RedisKey key, /// Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// The flags to use for this operation. + /// An instance of . + /// + Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. + /// Messages that have been idle for more than will be claimed. + /// The result will contain the claimed message IDs instead of a instance. + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the messages(s). + /// The minimum idle time threshold for pending messages to be claimed. + /// The starting ID to scan for pending messages that have an idle time greater than . + /// The upper limit of the number of entries that the command attempts to claim. If , Redis will default the value to 100. + /// The flags to use for this operation. + /// An instance of . + /// + Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); + /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. /// This method returns the complete message for the claimed message(s). diff --git a/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs b/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs index 7def559b2..d08834c08 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs @@ -496,6 +496,12 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); + + public StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); + public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs b/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs index 5a1437e33..0ccaed1fa 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs @@ -513,6 +513,12 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); + + public Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamAutoClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); + public Task StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); diff --git a/src/StackExchange.Redis/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI.Shipped.txt index 7b4dfb2df..8347638de 100644 --- a/src/StackExchange.Redis/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI.Shipped.txt @@ -654,6 +654,8 @@ StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAutoClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimResult +StackExchange.Redis.IDatabase.StreamAutoClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimIdsOnlyResult StackExchange.Redis.IDatabase.StreamClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue[]! StackExchange.Redis.IDatabase.StreamConsumerGroupSetPosition(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue position, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool @@ -865,6 +867,8 @@ StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.Re StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAutoClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAutoClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamConsumerGroupSetPositionAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue position, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! @@ -1389,6 +1393,18 @@ StackExchange.Redis.SortedSetOrder.ByScore = 1 -> StackExchange.Redis.SortedSetO StackExchange.Redis.SortType StackExchange.Redis.SortType.Alphabetic = 1 -> StackExchange.Redis.SortType StackExchange.Redis.SortType.Numeric = 0 -> StackExchange.Redis.SortType +StackExchange.Redis.StreamAutoClaimIdsOnlyResult +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.ClaimedIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.IsNull.get -> bool +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.NextStartId.get -> StackExchange.Redis.RedisValue +StackExchange.Redis.StreamAutoClaimIdsOnlyResult.StreamAutoClaimIdsOnlyResult() -> void +StackExchange.Redis.StreamAutoClaimResult +StackExchange.Redis.StreamAutoClaimResult.ClaimedEntries.get -> StackExchange.Redis.StreamEntry[]! +StackExchange.Redis.StreamAutoClaimResult.DeletedIds.get -> StackExchange.Redis.RedisValue[]! +StackExchange.Redis.StreamAutoClaimResult.IsNull.get -> bool +StackExchange.Redis.StreamAutoClaimResult.NextStartId.get -> StackExchange.Redis.RedisValue +StackExchange.Redis.StreamAutoClaimResult.StreamAutoClaimResult() -> void StackExchange.Redis.StreamConsumer StackExchange.Redis.StreamConsumer.Name.get -> StackExchange.Redis.RedisValue StackExchange.Redis.StreamConsumer.PendingMessageCount.get -> int @@ -1656,6 +1672,8 @@ static StackExchange.Redis.SortedSetEntry.implicit operator StackExchange.Redis. static StackExchange.Redis.SortedSetEntry.implicit operator System.Collections.Generic.KeyValuePair(StackExchange.Redis.SortedSetEntry value) -> System.Collections.Generic.KeyValuePair static StackExchange.Redis.SortedSetEntry.operator !=(StackExchange.Redis.SortedSetEntry x, StackExchange.Redis.SortedSetEntry y) -> bool static StackExchange.Redis.SortedSetEntry.operator ==(StackExchange.Redis.SortedSetEntry x, StackExchange.Redis.SortedSetEntry y) -> bool +static StackExchange.Redis.StreamAutoClaimIdsOnlyResult.Null.get -> StackExchange.Redis.StreamAutoClaimIdsOnlyResult +static StackExchange.Redis.StreamAutoClaimResult.Null.get -> StackExchange.Redis.StreamAutoClaimResult static StackExchange.Redis.StreamEntry.Null.get -> StackExchange.Redis.StreamEntry static StackExchange.Redis.StreamPosition.Beginning.get -> StackExchange.Redis.RedisValue static StackExchange.Redis.StreamPosition.NewMessages.get -> StackExchange.Redis.RedisValue diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 7cdf173a3..7d05e46da 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2223,6 +2223,30 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair return ExecuteAsync(msg, ResultProcessor.RedisValue); } + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: false, flags); + return ExecuteSync(msg, ResultProcessor.StreamAutoClaim, defaultValue: StreamAutoClaimResult.Null); + } + + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: false, flags); + return ExecuteAsync(msg, ResultProcessor.StreamAutoClaim, defaultValue: StreamAutoClaimResult.Null); + } + + public StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: true, flags); + return ExecuteSync(msg, ResultProcessor.StreamAutoClaimIdsOnly, defaultValue: StreamAutoClaimIdsOnlyResult.Null); + } + + public Task StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: true, flags); + return ExecuteAsync(msg, ResultProcessor.StreamAutoClaimIdsOnly, defaultValue: StreamAutoClaimIdsOnlyResult.Null); + } + public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) { var msg = GetStreamClaimMessage(key, @@ -3731,6 +3755,32 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe return Message.Create(Database, flags, RedisCommand.XADD, key, values); } + private Message GetStreamAutoClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count, bool idsOnly, CommandFlags flags) + { + // XAUTOCLAIM [COUNT count] [JUSTID] + var values = new RedisValue[4 + (count is null ? 0 : 2) + (idsOnly ? 1 : 0)]; + + var offset = 0; + + values[offset++] = consumerGroup; + values[offset++] = assignToConsumer; + values[offset++] = minIdleTimeInMs; + values[offset++] = startAtId; + + if (count is not null) + { + values[offset++] = StreamConstants.Count; + values[offset++] = count.Value; + } + + if (idsOnly) + { + values[offset++] = StreamConstants.JustId; + } + + return Message.Create(Database, flags, RedisCommand.XAUTOCLAIM, key, values); + } + private Message GetStreamClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue[] messageIds, bool returnJustIds, CommandFlags flags) { if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index f75c7045a..9ae426e7d 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -122,6 +122,12 @@ public static readonly SingleStreamProcessor public static readonly SingleStreamProcessor SingleStreamWithNameSkip = new SingleStreamProcessor(skipStreamName: true); + public static readonly StreamAutoClaimProcessor + StreamAutoClaim = new StreamAutoClaimProcessor(); + + public static readonly StreamAutoClaimIdsOnlyProcessor + StreamAutoClaimIdsOnly = new StreamAutoClaimIdsOnlyProcessor(); + public static readonly StreamConsumerInfoProcessor StreamConsumerInfo = new StreamConsumerInfoProcessor(); @@ -1776,6 +1782,64 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + /// + /// This processor is for *without* the option. + /// + internal sealed class StreamAutoClaimProcessor : StreamProcessorBase + { + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + // See https://redis.io/commands/xautoclaim for command documentation. + // Note that the result should never be null, so intentionally treating it as a failure to parse here + if (result.Type == ResultType.MultiBulk && !result.IsNull) + { + var items = result.GetItems(); + + // [0] The next start ID. + var nextStartId = items[0].AsRedisValue(); + // [1] The array of StreamEntry's. + var entries = ParseRedisStreamEntries(items[1]); + // [2] The array of message IDs deleted from the stream that were in the PEL. + // This is not available in 6.2 so we need to be defensive when reading this part of the response. + var deletedIds = (items.Length == 3 ? items[2].GetItemsAsValues() : null) ?? Array.Empty(); + + SetResult(message, new StreamAutoClaimResult(nextStartId, entries, deletedIds)); + return true; + } + + return false; + } + } + + /// + /// This processor is for *with* the option. + /// + internal sealed class StreamAutoClaimIdsOnlyProcessor : ResultProcessor + { + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + // See https://redis.io/commands/xautoclaim for command documentation. + // Note that the result should never be null, so intentionally treating it as a failure to parse here + if (result.Type == ResultType.MultiBulk && !result.IsNull) + { + var items = result.GetItems(); + + // [0] The next start ID. + var nextStartId = items[0].AsRedisValue(); + // [1] The array of claimed message IDs. + var claimedIds = items[1].GetItemsAsValues() ?? Array.Empty(); + // [2] The array of message IDs deleted from the stream that were in the PEL. + // This is not available in 6.2 so we need to be defensive when reading this part of the response. + var deletedIds = (items.Length == 3 ? items[2].GetItemsAsValues() : null) ?? Array.Empty(); + + SetResult(message, new StreamAutoClaimIdsOnlyResult(nextStartId, claimedIds, deletedIds)); + return true; + } + + return false; + } + } + internal sealed class StreamConsumerInfoProcessor : InterleavedStreamInfoProcessorBase { protected override StreamConsumerInfo ParseItem(in RawResult result) diff --git a/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs b/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs index e51805da3..f73162634 100644 --- a/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs +++ b/tests/StackExchange.Redis.Tests/DatabaseWrapperTests.cs @@ -1010,6 +1010,20 @@ public void StreamAdd_2() mock.Verify(_ => _.StreamAdd("prefix:key", fields, "*", 1000, true, CommandFlags.None)); } + [Fact] + public void StreamAutoClaim() + { + wrapper.StreamAutoClaim("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaim("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); + } + + [Fact] + public void StreamAutoClaimIdsOnly() + { + wrapper.StreamAutoClaimIdsOnly("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaimIdsOnly("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); + } + [Fact] public void StreamClaimMessages() { diff --git a/tests/StackExchange.Redis.Tests/Streams.cs b/tests/StackExchange.Redis.Tests/Streams.cs index c946b1cf7..b24d7cfd7 100644 --- a/tests/StackExchange.Redis.Tests/Streams.cs +++ b/tests/StackExchange.Redis.Tests/Streams.cs @@ -191,6 +191,389 @@ public void StreamAddMultipleValuePairsWithManualId() Assert.Equal(id, entries[0].Id); } + [Fact] + public async Task StreamAutoClaim_MissingKey() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer = "consumer"; + + db.KeyDelete(key); + + var ex = Assert.Throws(() => db.StreamAutoClaim(key, group, consumer, 0, "0-0")); + Assert.StartsWith("NOGROUP No such key", ex.Message); + + ex = await Assert.ThrowsAsync(() => db.StreamAutoClaimAsync(key, group, consumer, 0, "0-0")); + Assert.StartsWith("NOGROUP No such key", ex.Message); + } + + [Fact] + public void StreamAutoClaim_ClaimsPendingMessages() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + Assert.Equal("value2", result.ClaimedEntries[1].Values[0].Value); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsPendingMessagesAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 2); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + Assert.Equal("value2", result.ClaimedEntries[1].Values[0].Value); + } + + [Fact] + public void StreamAutoClaim_ClaimsSingleMessageWithCountOption() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + } + + [Fact] + public void StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnly() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaimIdsOnly(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.True(result.ClaimedIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.Equal("value1", result.ClaimedEntries[0].Values[0].Value); + } + + [Fact] + public async Task StreamAutoClaim_ClaimsSingleMessageWithCountOptionIdsOnlyAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimIdsOnlyAsync(key, group, consumer2, 0, "0-0", count: 1); + + // Should be the second message ID from the call to prepare. + Assert.Equal(messageIds[1], result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.True(result.ClaimedIds.Length == 1); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_IncludesDeletedMessageId() + { + using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Delete one of the messages, it should be included in the deleted message ID array. + db.StreamDelete(key, new RedisValue[] { messageIds[0] }); + + // Claim a single pending message and reassign it to consumer2. + var result = db.StreamAutoClaim(key, group, consumer2, 0, "0-0", count: 1); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.True(result.DeletedIds.Length == 1); + Assert.Equal(messageIds[0], result.DeletedIds[0]); + } + + [Fact] + public async Task StreamAutoClaim_IncludesDeletedMessageIdAsync() + { + using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Delete one of the messages, it should be included in the deleted message ID array. + db.StreamDelete(key, new RedisValue[] { messageIds[0] }); + + // Claim a single pending message and reassign it to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 0, "0-0", count: 1); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedEntries); + Assert.NotEmpty(result.DeletedIds); + Assert.True(result.ClaimedEntries.Length == 1); + Assert.True(result.DeletedIds.Length == 1); + Assert.Equal(messageIds[0], result.DeletedIds[0]); + } + + [Fact] + public void StreamAutoClaim_NoMessagesToClaim() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup"; + + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // **Don't add any messages to the stream** + + // Claim any pending messages (there aren't any) and reassign them to consumer2. + var result = db.StreamAutoClaim(key, group, "consumer1", 0, "0-0"); + + // Claimed entries should be empty + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_NoMessagesToClaimAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup"; + + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // **Don't add any messages to the stream** + + // Claim any pending messages (there aren't any) and reassign them to consumer2. + var result = await db.StreamAutoClaimAsync(key, group, "consumer1", 0, "0-0"); + + // Claimed entries should be empty + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_NoMessageMeetsMinIdleTime() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim messages idle for more than 5 minutes, should return an empty array. + var result = db.StreamAutoClaim(key, group, consumer2, 300000, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public async Task StreamAutoClaim_NoMessageMeetsMinIdleTimeAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + _ = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim messages idle for more than 5 minutes, should return an empty array. + var result = await db.StreamAutoClaimAsync(key, group, consumer2, 300000, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.Empty(result.ClaimedEntries); + Assert.Empty(result.DeletedIds); + } + + [Fact] + public void StreamAutoClaim_ReturnsMessageIdOnly() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = db.StreamAutoClaimIdsOnly(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Equal(messageIds[1], result.ClaimedIds[1]); + } + + [Fact] + public async Task StreamAutoClaim_ReturnsMessageIdOnlyAsync() + { + using var conn = Create(require: RedisFeatures.v6_2_0); + + var key = Me(); + var db = conn.GetDatabase(); + const string group = "consumerGroup", + consumer1 = "c1", + consumer2 = "c2"; + + // Create Consumer Group, add messages, and read messages into a consumer. + var messageIds = StreamAutoClaim_PrepareTestData(db, key, group, consumer1); + + // Claim any pending messages and reassign them to consumer2. + var result = await db.StreamAutoClaimIdsOnlyAsync(key, group, consumer2, 0, "0-0"); + + Assert.Equal("0-0", result.NextStartId); + Assert.NotEmpty(result.ClaimedIds); + Assert.Empty(result.DeletedIds); + Assert.True(result.ClaimedIds.Length == 2); + Assert.Equal(messageIds[0], result.ClaimedIds[0]); + Assert.Equal(messageIds[1], result.ClaimedIds[1]); + } + + private RedisValue[] StreamAutoClaim_PrepareTestData(IDatabase db, RedisKey key, RedisValue group, RedisValue consumer) + { + // Create the group. + db.KeyDelete(key); + db.StreamCreateConsumerGroup(key, group, createStream: true); + + // Add some messages + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + + // Read the messages into the "c1" + db.StreamReadGroup(key, group, consumer); + + return new RedisValue[2] { id1, id2 }; + } + [Fact] public void StreamConsumerGroupSetId() { diff --git a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs index b29d36b1a..1d1058008 100644 --- a/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs +++ b/tests/StackExchange.Redis.Tests/WrapperBaseTests.cs @@ -943,6 +943,20 @@ public void StreamAddAsync_2() mock.Verify(_ => _.StreamAddAsync("prefix:key", fields, "*", 1000, true, CommandFlags.None)); } + [Fact] + public void StreamAutoClaimAsync() + { + wrapper.StreamAutoClaimAsync("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaimAsync("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); + } + + [Fact] + public void StreamAutoClaimIdsOnlyAsync() + { + wrapper.StreamAutoClaimIdsOnlyAsync("key", "group", "consumer", 0, "0-0", 100, CommandFlags.None); + mock.Verify(_ => _.StreamAutoClaimIdsOnlyAsync("prefix:key", "group", "consumer", 0, "0-0", 100, CommandFlags.None)); + } + [Fact] public void StreamClaimMessagesAsync() {