diff --git a/command.go b/command.go index 3253af6cc..5fa347f43 100644 --- a/command.go +++ b/command.go @@ -2104,7 +2104,9 @@ type XInfoGroup struct { Pending int64 LastDeliveredID string EntriesRead int64 - Lag int64 + // Lag represents the number of pending messages in the stream not yet + // delivered to this consumer group. Returns -1 when the lag cannot be determined. + Lag int64 } var _ Cmder = (*XInfoGroupsCmd)(nil) @@ -2187,8 +2189,11 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error { // lag: the number of entries in the stream that are still waiting to be delivered // to the group's consumers, or a NULL(Nil) when that number can't be determined. + // In that case, we return -1. if err != nil && err != Nil { return err + } else if err == Nil { + group.Lag = -1 } default: return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key) diff --git a/commands_test.go b/commands_test.go index 8b2aa37d4..5256a6fbf 100644 --- a/commands_test.go +++ b/commands_test.go @@ -6772,6 +6772,36 @@ var _ = Describe("Commands", func() { })) }) + It("should return -1 for nil lag in XINFO GROUPS", func() { + _, err := client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}}).Result() + Expect(err).NotTo(HaveOccurred()) + + client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}}) + Expect(err).NotTo(HaveOccurred()) + client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}}) + Expect(err).NotTo(HaveOccurred()) + + err = client.XGroupCreate(ctx, "s", "g", "0").Err() + Expect(err).NotTo(HaveOccurred()) + err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err() + Expect(err).NotTo(HaveOccurred()) + + client.XDel(ctx, "s", "0-2") + + res, err := client.XInfoGroups(ctx, "s").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XInfoGroup{ + { + Name: "g", + Consumers: 1, + Pending: 1, + LastDeliveredID: "0-1", + EntriesRead: 1, + Lag: -1, // nil lag from Redis is reported as -1 + }, + })) + }) + It("should XINFO CONSUMERS", func() { res, err := client.XInfoConsumers(ctx, "stream", "group1").Result() Expect(err).NotTo(HaveOccurred())