From 2affa1351224642f9086b778e3a38be47eb4ac28 Mon Sep 17 00:00:00 2001
From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com>
Date: Tue, 28 Mar 2023 12:13:10 +0530
Subject: [PATCH 1/3] feat: Adding support for CLUSTER SHARDS command

---
 cluster_test.go |  31 ++++++++++
 command.go      | 156 ++++++++++++++++++++++++++++++++++++++++++++++++
 commands.go     |   7 +++
 3 files changed, 194 insertions(+)

diff --git a/cluster_test.go b/cluster_test.go
index bfc1c2598..924716476 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -677,6 +677,37 @@ var _ = Describe("ClusterClient", func() {
 		Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
 	})

+	It("should CLUSTER SHARDS", func() {
+		res, err := client.ClusterShards(ctx).Result()
+		Expect(err).NotTo(HaveOccurred())
+		Expect(res).NotTo(BeEmpty())
+
+		// Iterate over the ClusterShard results and validate the fields.
+		for _, shard := range res {
+			Expect(shard.Slots).NotTo(BeEmpty())
+			for _, slotRange := range shard.Slots {
+				Expect(slotRange.Start).To(BeNumerically(">=", 0))
+				Expect(slotRange.End).To(BeNumerically(">=", slotRange.Start))
+			}
+
+			Expect(shard.Nodes).NotTo(BeEmpty())
+			for _, node := range shard.Nodes {
+				Expect(node.ID).NotTo(BeEmpty())
+				Expect(node.Endpoint).NotTo(BeEmpty())
+				Expect(node.IP).NotTo(BeEmpty())
+				Expect(node.Port).To(BeNumerically(">", 0))
+
+				validRoles := []string{"master", "slave", "replica"}
+				Expect(validRoles).To(ContainElement(node.Role))
+
+				Expect(node.ReplicationOffset).To(BeNumerically(">=", 0))
+
+				validHealthStatuses := []string{"online", "failed", "loading"}
+				Expect(validHealthStatuses).To(ContainElement(node.Health))
+			}
+		}
+	})
+
 	It("should CLUSTER LINKS", func() {
 		res, err := client.ClusterLinks(ctx).Result()
 		Expect(err).NotTo(HaveOccurred())
diff --git a/command.go b/command.go
index 04aebe93d..d65491cfe 100644
--- a/command.go
+++ b/command.go
@@ -4335,3 +4335,159 @@ func (cmd *ClusterLinksCmd) readReply(rd *proto.Reader) error {

 	return nil
 }
+
+// ------------------------------------------------------------------------------------------------------------------
+
+type SlotRange struct {
+	Start int64
+	End   int64
+}
+
+type Node struct {
+	ID                string
+	Endpoint          string
+	IP                string
+	Hostname          string
+	Port              int64
+	TLSPort           int64
+	Role              string
+	ReplicationOffset int64
+	Health            string
+}
+
+type ClusterShard struct {
+	Slots []SlotRange
+	Nodes []Node
+}
+
+type ClusterShardsCmd struct {
+	baseCmd
+
+	val []ClusterShard
+}
+
+var _ Cmder = (*ClusterShardsCmd)(nil)
+
+func NewClusterShardsCmd(ctx context.Context, args ...interface{}) *ClusterShardsCmd {
+	return &ClusterShardsCmd{
+		baseCmd: baseCmd{
+			ctx:  ctx,
+			args: args,
+		},
+	}
+}
+
+func (cmd *ClusterShardsCmd) SetVal(val []ClusterShard) {
+	cmd.val = val
+}
+
+func (cmd *ClusterShardsCmd) Val() []ClusterShard {
+	return cmd.val
+}
+
+func (cmd *ClusterShardsCmd) Result() ([]ClusterShard, error) {
+	return cmd.Val(), cmd.Err()
+}
+
+func (cmd *ClusterShardsCmd) String() string {
+	return cmdString(cmd, cmd.val)
+}
+
+func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
+	n, err := rd.ReadArrayLen()
+	if err != nil {
+		return err
+	}
+	cmd.val = make([]ClusterShard, n)
+
+	for i := 0; i < n; i++ {
+		m, err := rd.ReadArrayLen()
+		if err != nil {
+			return err
+		}
+
+		slots := make([]SlotRange, 0)
+		nodes := make([]Node, 0)
+
+		for j := 0; j < m; j++ {
+			key, err := rd.ReadString()
+			if err != nil {
+				return err
+			}
+
+			switch key {
+			case "slots":
+				slotLen, err := rd.ReadArrayLen()
+				if err != nil {
+					return err
+				}
+				for k := 0; k < slotLen; k += 2 {
+					slotStart, err := rd.ReadInt()
+					if err != nil {
+						return err
+					}
+					slotEnd, err := rd.ReadInt()
+					if err != nil {
+						return err
+					}
+					slots = append(slots, SlotRange{Start: slotStart, End: slotEnd})
+				}
+			case "nodes":
+				nodesLen, err := rd.ReadArrayLen()
+				if err != nil {
+					return err
+				}
+				nodes = make([]Node, nodesLen)
+				for k := 0; k < nodesLen; k++ {
+					nodeMapLen, err := rd.ReadMapLen()
+					if err != nil {
+						return err
+					}
+
+					for l := 0; l < nodeMapLen; l++ {
+						nodeKey, err := rd.ReadString()
+						if err != nil {
+							return err
+						}
+
+						switch nodeKey {
+						case "id":
+							nodes[k].ID, err = rd.ReadString()
+						case "endpoint":
+							nodes[k].Endpoint, err = rd.ReadString()
+						case "ip":
+							nodes[k].IP, err = rd.ReadString()
+						case "hostname":
+							nodes[k].Hostname, err = rd.ReadString()
+						case "port":
+							nodes[k].Port, err = rd.ReadInt()
+						case "tls-port":
+							nodes[k].TLSPort, err = rd.ReadInt()
+						case "role":
+							nodes[k].Role, err = rd.ReadString()
+						case "replication-offset":
+							nodes[k].ReplicationOffset, err = rd.ReadInt()
+						case "health":
+							nodes[k].Health, err = rd.ReadString()
+						default:
+							return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS node reply", nodeKey)
+						}
+
+						if err != nil {
+							return err
+						}
+					}
+				}
+			default:
+				return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS reply", key)
+			}
+		}
+
+		cmd.val[i] = ClusterShard{
+			Slots: slots,
+			Nodes: nodes,
+		}
+	}
+
+	return nil
+}
diff --git a/commands.go b/commands.go
index a494fdd00..6a6455675 100644
--- a/commands.go
+++ b/commands.go
@@ -422,6 +422,7 @@ type Cmdable interface {
 	PubSubShardNumSub(ctx context.Context, channels ...string) *MapStringIntCmd

 	ClusterSlots(ctx context.Context) *ClusterSlotsCmd
+	ClusterShards(ctx context.Context) *ClusterShardsCmd
 	ClusterLinks(ctx context.Context) *ClusterLinksCmd
 	ClusterNodes(ctx context.Context) *StringCmd
 	ClusterMeet(ctx context.Context, host, port string) *StatusCmd
@@ -3498,6 +3499,12 @@ func (c cmdable) ClusterSlots(ctx context.Context) *ClusterSlotsCmd {
 	return cmd
 }

+func (c cmdable) ClusterShards(ctx context.Context) *ClusterShardsCmd {
+	cmd := NewClusterShardsCmd(ctx, "cluster", "shards")
+	_ = c(ctx, cmd)
+	return cmd
+}
+
 func (c cmdable) ClusterLinks(ctx context.Context) *ClusterLinksCmd {
 	cmd := NewClusterLinksCmd(ctx, "cluster", "links")
 	_ = c(ctx, cmd)

From ff0eda1b2f098af82f3fea2ea19ab9996e10de8c Mon Sep 17 00:00:00 2001
From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com>
Date: Tue, 28 Mar 2023 13:14:24 +0530
Subject: [PATCH 2/3] fix: read reply

---
 command.go | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/command.go b/command.go
index d65491cfe..01af93e7a 100644
--- a/command.go
+++ b/command.go
@@ -4401,7 +4401,7 @@ func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
 	cmd.val = make([]ClusterShard, n)

 	for i := 0; i < n; i++ {
-		m, err := rd.ReadArrayLen()
+		m, err := rd.ReadMapLen()
 		if err != nil {
 			return err
 		}
@@ -4417,20 +4417,23 @@ func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {

 			switch key {
 			case "slots":
-				slotLen, err := rd.ReadArrayLen()
+				l, err := rd.ReadArrayLen()
 				if err != nil {
 					return err
 				}
-				for k := 0; k < slotLen; k += 2 {
-					slotStart, err := rd.ReadInt()
+
+				for k := 0; k < l; k += 2 {
+					start, err := rd.ReadInt()
 					if err != nil {
 						return err
 					}
-					slotEnd, err := rd.ReadInt()
+
+					end, err := rd.ReadInt()
 					if err != nil {
 						return err
 					}
-					slots = append(slots, SlotRange{Start: slotStart, End: slotEnd})
+
+					slots = append(slots, SlotRange{Start: start, End: end})
 				}
 			case "nodes":
 				nodesLen, err := rd.ReadArrayLen()

From 1ecb2a5c3c3b23c60b6c800a1f65c088d2735c49 Mon Sep 17 00:00:00 2001
From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com>
Date: Tue, 28 Mar 2023 13:34:06 +0530
Subject: [PATCH 3/3] fix: avoid dupl mem alloc.

---
 command.go | 31 +++++++++++--------------------
 1 file changed, 11 insertions(+), 20 deletions(-)

diff --git a/command.go b/command.go
index 01af93e7a..637e9cdfe 100644
--- a/command.go
+++ b/command.go
@@ -4406,9 +4406,6 @@ func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
 			return err
 		}

-		slots := make([]SlotRange, 0)
-		nodes := make([]Node, 0)
-
 		for j := 0; j < m; j++ {
 			key, err := rd.ReadString()
 			if err != nil {
@@ -4421,7 +4418,6 @@ func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
 				if err != nil {
 					return err
 				}
-
 				for k := 0; k < l; k += 2 {
 					start, err := rd.ReadInt()
 					if err != nil {
@@ -4433,14 +4429,14 @@ func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
 						return err
 					}

-					slots = append(slots, SlotRange{Start: start, End: end})
+					cmd.val[i].Slots = append(cmd.val[i].Slots, SlotRange{Start: start, End: end})
 				}
 			case "nodes":
 				nodesLen, err := rd.ReadArrayLen()
 				if err != nil {
 					return err
 				}
-				nodes = make([]Node, nodesLen)
+				cmd.val[i].Nodes = make([]Node, nodesLen)
 				for k := 0; k < nodesLen; k++ {
 					nodeMapLen, err := rd.ReadMapLen()
 					if err != nil {
@@ -4455,23 +4451,23 @@ func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {

 					switch nodeKey {
 					case "id":
-						nodes[k].ID, err = rd.ReadString()
+						cmd.val[i].Nodes[k].ID, err = rd.ReadString()
 					case "endpoint":
-						nodes[k].Endpoint, err = rd.ReadString()
+						cmd.val[i].Nodes[k].Endpoint, err = rd.ReadString()
 					case "ip":
-						nodes[k].IP, err = rd.ReadString()
+						cmd.val[i].Nodes[k].IP, err = rd.ReadString()
 					case "hostname":
-						nodes[k].Hostname, err = rd.ReadString()
+						cmd.val[i].Nodes[k].Hostname, err = rd.ReadString()
 					case "port":
-						nodes[k].Port, err = rd.ReadInt()
+						cmd.val[i].Nodes[k].Port, err = rd.ReadInt()
 					case "tls-port":
-						nodes[k].TLSPort, err = rd.ReadInt()
+						cmd.val[i].Nodes[k].TLSPort, err = rd.ReadInt()
 					case "role":
-						nodes[k].Role, err = rd.ReadString()
+						cmd.val[i].Nodes[k].Role, err = rd.ReadString()
 					case "replication-offset":
-						nodes[k].ReplicationOffset, err = rd.ReadInt()
+						cmd.val[i].Nodes[k].ReplicationOffset, err = rd.ReadInt()
 					case "health":
-						nodes[k].Health, err = rd.ReadString()
+						cmd.val[i].Nodes[k].Health, err = rd.ReadString()
 					default:
 						return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS node reply", nodeKey)
 					}
@@ -4485,11 +4481,6 @@ func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
 			return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS reply", key)
 		}
 	}
-
-		cmd.val[i] = ClusterShard{
-			Slots: slots,
-			Nodes: nodes,
-		}
 	}

 	return nil
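
Usage sketch: how the new ClusterShards command might be called from application code once this series is applied. CLUSTER SHARDS requires Redis 7.0+. The cluster address below is a placeholder and the import path assumes the go-redis v9 module; adjust both to your setup.

	package main

	import (
		"context"
		"fmt"

		"github.com/redis/go-redis/v9"
	)

	func main() {
		ctx := context.Background()

		// Placeholder address; point this at any node of your cluster.
		rdb := redis.NewClusterClient(&redis.ClusterOptions{
			Addrs: []string{"localhost:7000"},
		})
		defer rdb.Close()

		// CLUSTER SHARDS returns one ClusterShard per shard, each with its
		// slot ranges and the nodes (master and replicas) serving them.
		shards, err := rdb.ClusterShards(ctx).Result()
		if err != nil {
			panic(err)
		}

		for _, shard := range shards {
			for _, r := range shard.Slots {
				fmt.Printf("slots %d-%d\n", r.Start, r.End)
			}
			for _, node := range shard.Nodes {
				fmt.Printf("  node %s %s:%d role=%s health=%s\n",
					node.ID, node.IP, node.Port, node.Role, node.Health)
			}
		}
	}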