Skip to content

Support the XAUTOCLAIM command. #2095

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d3e19b3
Implement XAUTOCLAIM.
ttingen Apr 14, 2022
10f6a6b
Add deleted message IDs to the result (available in 7.0).
ttingen Apr 14, 2022
de1fd67
Added tests.
ttingen Apr 15, 2022
1b691ae
Merge branch 'StackExchange:main' into xautoclaim
ttingen Apr 15, 2022
7e18e19
Add release notes.
ttingen Apr 15, 2022
52ef6e7
Split the JUSTID option into a separate method and result.
ttingen Apr 16, 2022
1f2f070
Updated release notes with separate "IdsOnly" methods.
ttingen Apr 16, 2022
3d648f3
Merge remote-tracking branch 'origin/main' into pr/2095
NickCraver Apr 16, 2022
44f62a1
Renames, API moves, and simplify result processor
NickCraver Apr 16, 2022
a6c0396
Tidy API and add default results
NickCraver Apr 16, 2022
a24d767
Add a null key test for autoclaim
NickCraver Apr 16, 2022
5f2308c
Normalize processor, make docs a but more surfaced
NickCraver Apr 16, 2022
e1c8e87
Normalize to #2096
NickCraver Apr 16, 2022
a77fb43
Clean up comments from copy/paste.
ttingen Apr 16, 2022
a452921
Merge remote-tracking branch 'origin/main' into pr/2095
NickCraver Apr 17, 2022
3603627
Fix naming
NickCraver Apr 17, 2022
ed6fcdc
Merge branch 'xautoclaim' of https://github.com/ttingen/StackExchange…
NickCraver Apr 17, 2022
e3f2e06
Merge fix
NickCraver Apr 17, 2022
0d5fa0b
Merge remote-tracking branch 'origin/main' into pr/2095
NickCraver Apr 19, 2022
6786738
Merge remote-tracking branch 'origin/main' into pr/2095
NickCraver Apr 19, 2022
6edff01
Merge branch 'main' into pr/2095
NickCraver Apr 19, 2022
19240ea
Docs updates
NickCraver Apr 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- Adds: Support for `GEOSEARCH` with `.GeoSearch()`/`.GeoSearchAsync()` ([#2089 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2089))
- Adds: Support for `GEOSEARCHSTORE` with `.GeoSearchAndStore()`/`.GeoSearchAndStoreAsync()` ([#2089 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2089))
- Adds: Support for `HRANDFIELD` with `.HashRandomField()`/`.HashRandomFieldAsync()`, `.HashRandomFields()`/`.HashRandomFieldsAsync()`, and `.HashRandomFieldsWithValues()`/`.HashRandomFieldsWithValuesAsync()` ([#2090 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2090))
- Adds: Support for `XAUTOCLAIM` with `.StreamAutoClaim()`/.`StreamAutoClaimAsync()` and `.StreamAutoClaimIdsOnly()`/.`StreamAutoClaimIdsOnlyAsync()` ([#2095 by ttingen](https://github.com/StackExchange/StackExchange.Redis/pull/2095))

## 2.5.61

Expand Down
41 changes: 41 additions & 0 deletions src/StackExchange.Redis/APITypes/StreamAutoClaimIdsOnlyResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;

namespace StackExchange.Redis;

/// <summary>
/// Result of the <see href="https://redis.io/commands/xautoclaim/">XAUTOCLAIM</see> command with the <c>JUSTID</c> option.
/// </summary>
public readonly struct StreamAutoClaimIdsOnlyResult
{
internal StreamAutoClaimIdsOnlyResult(RedisValue nextStartId, RedisValue[] claimedIds, RedisValue[] deletedIds)
{
NextStartId = nextStartId;
ClaimedIds = claimedIds;
DeletedIds = deletedIds;
}

/// <summary>
/// A null <see cref="StreamAutoClaimIdsOnlyResult"/>, indicating no results.
/// </summary>
public static StreamAutoClaimIdsOnlyResult Null { get; } = new StreamAutoClaimIdsOnlyResult(RedisValue.Null, Array.Empty<RedisValue>(), Array.Empty<RedisValue>());

/// <summary>
/// Whether this object is null/empty.
/// </summary>
public bool IsNull => NextStartId.IsNull && ClaimedIds == Array.Empty<RedisValue>() && DeletedIds == Array.Empty<RedisValue>();

/// <summary>
/// The stream ID to be used in the next call to StreamAutoClaim.
/// </summary>
public RedisValue NextStartId { get; }

/// <summary>
/// Array of IDs claimed by the command.
/// </summary>
public RedisValue[] ClaimedIds { get; }

/// <summary>
/// Array of message IDs deleted from the stream.
/// </summary>
public RedisValue[] DeletedIds { get; }
}
41 changes: 41 additions & 0 deletions src/StackExchange.Redis/APITypes/StreamAutoClaimResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;

namespace StackExchange.Redis;

/// <summary>
/// Result of the <see href="https://redis.io/commands/xautoclaim/">XAUTOCLAIM</see> command.
/// </summary>
public readonly struct StreamAutoClaimResult
{
internal StreamAutoClaimResult(RedisValue nextStartId, StreamEntry[] claimedEntries, RedisValue[] deletedIds)
{
NextStartId = nextStartId;
ClaimedEntries = claimedEntries;
DeletedIds = deletedIds;
}

/// <summary>
/// A null <see cref="StreamAutoClaimResult"/>, indicating no results.
/// </summary>
public static StreamAutoClaimResult Null { get; } = new StreamAutoClaimResult(RedisValue.Null, Array.Empty<StreamEntry>(), Array.Empty<RedisValue>());

/// <summary>
/// Whether this object is null/empty.
/// </summary>
public bool IsNull => NextStartId.IsNull && ClaimedEntries == Array.Empty<StreamEntry>() && DeletedIds == Array.Empty<RedisValue>();

/// <summary>
/// The stream ID to be used in the next call to StreamAutoClaim.
/// </summary>
public RedisValue NextStartId { get; }

/// <summary>
/// An array of <see cref="StreamEntry"/> for the successfully claimed entries.
/// </summary>
public StreamEntry[] ClaimedEntries { get; }

/// <summary>
/// An array of message IDs deleted from the stream.
/// </summary>
public RedisValue[] DeletedIds { get; }
}
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ internal enum RedisCommand

XACK,
XADD,
XAUTOCLAIM,
XCLAIM,
XDEL,
XGROUP,
Expand Down Expand Up @@ -324,6 +325,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.UNLINK:
case RedisCommand.XACK:
case RedisCommand.XADD:
case RedisCommand.XAUTOCLAIM:
case RedisCommand.XCLAIM:
case RedisCommand.XDEL:
case RedisCommand.XGROUP:
Expand Down
31 changes: 31 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,37 @@ IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key,
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// Messages that have been idle for more than <paramref name="minIdleTimeInMs"/> will be claimed.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the messages(s).</param>
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
/// <param name="startAtId">The starting ID to scan for pending messages that have an idle time greater than <paramref name="minIdleTimeInMs"/>.</param>
/// <param name="count">The upper limit of the number of entries that the command attempts to claim. If <see langword="null"/>, Redis will default the value to 100.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamAutoClaimResult"/>.</returns>
/// <remarks><seealso href="https://redis.io/commands/xautoclaim"/></remarks>
StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// Messages that have been idle for more than <paramref name="minIdleTimeInMs"/> will be claimed.
/// The result will contain the claimed message IDs instead of a <see cref="StreamEntry"/> instance.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the messages(s).</param>
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
/// <param name="startAtId">The starting ID to scan for pending messages that have an idle time greater than <paramref name="minIdleTimeInMs"/>.</param>
/// <param name="count">The upper limit of the number of entries that the command attempts to claim. If <see langword="null"/>, Redis will default the value to 100.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamAutoClaimIdsOnlyResult"/>.</returns>
/// <remarks><seealso href="https://redis.io/commands/xautoclaim"/></remarks>
StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// This method returns the complete message for the claimed message(s).
Expand Down
31 changes: 31 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,37 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(RedisKey key,
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// Messages that have been idle for more than <paramref name="minIdleTimeInMs"/> will be claimed.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the messages(s).</param>
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
/// <param name="startAtId">The starting ID to scan for pending messages that have an idle time greater than <paramref name="minIdleTimeInMs"/>.</param>
/// <param name="count">The upper limit of the number of entries that the command attempts to claim. If <see langword="null"/>, Redis will default the value to 100.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamAutoClaimResult"/>.</returns>
/// <remarks><seealso href="https://redis.io/commands/xautoclaim"/></remarks>
Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// Messages that have been idle for more than <paramref name="minIdleTimeInMs"/> will be claimed.
/// The result will contain the claimed message IDs instead of a <see cref="StreamEntry"/> instance.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="consumerGroup">The consumer group.</param>
/// <param name="claimingConsumer">The consumer claiming the messages(s).</param>
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
/// <param name="startAtId">The starting ID to scan for pending messages that have an idle time greater than <paramref name="minIdleTimeInMs"/>.</param>
/// <param name="count">The upper limit of the number of entries that the command attempts to claim. If <see langword="null"/>, Redis will default the value to 100.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamAutoClaimIdsOnlyResult"/>.</returns>
/// <remarks><seealso href="https://redis.io/commands/xautoclaim"/></remarks>
Task<StreamAutoClaimIdsOnlyResult> StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// This method returns the complete message for the claimed message(s).
Expand Down
6 changes: 6 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/DatabaseWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,12 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);

public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);

public StreamAutoClaimIdsOnlyResult StreamAutoClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);

public StreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);

Expand Down
6 changes: 6 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/WrapperBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,12 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, Red
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);

public Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);

public Task<StreamAutoClaimIdsOnlyResult> StreamAutoClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);

public Task<StreamEntry[]> StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags);

Expand Down
Loading