From baa21b62de240425fcbc8d105c466ac0c29c1913 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Sun, 10 Dec 2023 12:21:55 +0200 Subject: [PATCH 1/6] Add monitor command --- command.go | 71 ++++++++++++++++++++++++++++++++++++++++++ commands.go | 8 ++++- gears_commands_test.go | 14 +++++++++ 3 files changed, 92 insertions(+), 1 deletion(-) diff --git a/command.go b/command.go index c641a3fae..9e148fe1d 100644 --- a/command.go +++ b/command.go @@ -5381,3 +5381,74 @@ func (cmd *InfoCmd) Item(section, key string) string { return cmd.val[section][key] } } + +type MonitorCmd struct { + baseCmd + ch chan<- string +} + +func NewMonitorCmd(ctx context.Context, ch chan<- string) *MonitorCmd { + return &MonitorCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: []interface{}{ + "monitor", + }, + }, + ch: ch, + } +} + +// func (cmd *InfoCmd) SetVal(val map[string]map[string]string) { +// cmd.val = val +// } + +// func (cmd *InfoCmd) Val() map[string]map[string]string { +// return cmd.val +// } + +// func (cmd *InfoCmd) Result() (map[string]map[string]string, error) { +// return cmd.Val(), cmd.Err() +// } + +func (cmd *MonitorCmd) String() string { + return cmdString(cmd, nil) +} + +func (cmd *MonitorCmd) readReply(rd *proto.Reader) error { + + go func() { + for { + line, err := rd.ReadString() + if err != nil { + return + } + cmd.ch <- line + fmt.Println(line) + } + }() + return nil + +} + +func (cmd *MonitorCmd) Stop() { + close(cmd.ch) +} + +// func (cmd *MonitorCmd) WaitAndRecieve() *string { +// if cmd == nil { +// c.cmd = NewCmd(ctx) +// } + +// err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error { +// return c.cmd.readReply(rd) +// }) + +// c.releaseConnWithLock(ctx, cn, err, timeout > 0) + +// if err != nil { +// return nil, err +// } + +// return c.newMessage(c.cmd.Val()) +// } diff --git a/commands.go b/commands.go index 842cc23a3..5dae6d18c 100644 --- a/commands.go +++ b/commands.go @@ -204,7 +204,6 @@ type Cmdable interface { SlowLogGet(ctx context.Context, num int64) *SlowLogCmd Time(ctx context.Context) *TimeCmd DebugObject(ctx context.Context, key string) *StringCmd - MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd @@ -700,3 +699,10 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St _ = c(ctx, cmd) return cmd } + +func (c cmdable) Monitor(ctx context.Context, ch chan<- string) *MonitorCmd { + cmd := NewMonitorCmd(ctx, ch) + _ = c(ctx, cmd) + + return cmd +} diff --git a/gears_commands_test.go b/gears_commands_test.go index b1117a4dc..778ecaa80 100644 --- a/gears_commands_test.go +++ b/gears_commands_test.go @@ -111,4 +111,18 @@ var _ = Describe("RedisGears commands", Label("gears"), func() { Expect(err).NotTo(HaveOccurred()) Expect(resultAdd).To(BeEquivalentTo("bar")) }) + + It("should monitor", Label("mon", "monitor"), func() { + ress := make(chan string) + + client1 := redis.NewClient(&redis.Options{Addr: ":6379"}) + client1.Monitor(ctx, ress) + client.Set(ctx, "foo", "bar", 0) + client.Set(ctx, "bar", "baz", 0) + client.Set(ctx, "bap", 8, 0) + client.Get(ctx, "bap") + panic(<-ress) + // stop <- true + // Expect(<-ress).To(BeEquivalentTo("")) + }) }) From 25aa778afb797a226dcc8eeaa80b6826126871d5 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Mon, 11 Dec 2023 17:35:47 +0200 Subject: [PATCH 2/6] Add monitor commadn and tests --- command.go | 75 +++++++++++++++++------------------------- commands.go | 11 +++++-- gears_commands_test.go | 14 -------- monitor_test.go | 47 ++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 60 deletions(-) create mode 100644 monitor_test.go diff --git a/command.go b/command.go index 9e148fe1d..c6bbfbc8a 100644 --- a/command.go +++ b/command.go @@ -5382,73 +5382,60 @@ func (cmd *InfoCmd) Item(section, key string) string { } } +type MonitorStatus int + +const ( + MonitorStatusIdle MonitorStatus = iota + MonitorStatusStart + MonitorStatusStop +) + type MonitorCmd struct { baseCmd - ch chan<- string + ch chan string + status MonitorStatus } -func NewMonitorCmd(ctx context.Context, ch chan<- string) *MonitorCmd { +func NewMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { return &MonitorCmd{ baseCmd: baseCmd{ - ctx: ctx, - args: []interface{}{ - "monitor", - }, + ctx: ctx, + args: []interface{}{"monitor"}, }, - ch: ch, + ch: ch, + status: MonitorStatusIdle, } } -// func (cmd *InfoCmd) SetVal(val map[string]map[string]string) { -// cmd.val = val -// } - -// func (cmd *InfoCmd) Val() map[string]map[string]string { -// return cmd.val -// } - -// func (cmd *InfoCmd) Result() (map[string]map[string]string, error) { -// return cmd.Val(), cmd.Err() -// } - func (cmd *MonitorCmd) String() string { return cmdString(cmd, nil) } func (cmd *MonitorCmd) readReply(rd *proto.Reader) error { - - go func() { - for { + go cmd.readMonitor(rd) + return nil +} +func (cmd *MonitorCmd) readMonitor(rd *proto.Reader) error { + for cmd.status == MonitorStatusStart { + if pk, _ := rd.Peek(1); len(pk) != 0 { line, err := rd.ReadString() if err != nil { - return + return err } cmd.ch <- line - fmt.Println(line) } - }() + + } + if cmd.status == MonitorStatusStop { + close(cmd.ch) + } return nil +} +func (cmd *MonitorCmd) Start() { + cmd.status = MonitorStatusStart } func (cmd *MonitorCmd) Stop() { - close(cmd.ch) + cmd.status = MonitorStatusStop } - -// func (cmd *MonitorCmd) WaitAndRecieve() *string { -// if cmd == nil { -// c.cmd = NewCmd(ctx) -// } - -// err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error { -// return c.cmd.readReply(rd) -// }) - -// c.releaseConnWithLock(ctx, cn, err, timeout > 0) - -// if err != nil { -// return nil, err -// } - -// return c.newMessage(c.cmd.Val()) -// } diff --git a/commands.go b/commands.go index 5dae6d18c..468ba2ffd 100644 --- a/commands.go +++ b/commands.go @@ -700,9 +700,16 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St return cmd } -func (c cmdable) Monitor(ctx context.Context, ch chan<- string) *MonitorCmd { +// Monitor - represents a Redis MONITOR command, which allows you to capture +// and process all commands processed by a Redis server. Note that: +// -- using MONITOR blocks the connection to the server for itself, +// so it is recommended to createa separate connection dedicated to monitoring purposes. +// -- The provided channel for monitor output should be of type string. +// -- The MonitorCmd runs concurrently in the background. Use the Start method +// to initiate monitoring and the Stop method to terminate it. +// Redis MONITOR command: https://redis.io/commands/monitor +func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd { cmd := NewMonitorCmd(ctx, ch) _ = c(ctx, cmd) - return cmd } diff --git a/gears_commands_test.go b/gears_commands_test.go index 778ecaa80..b1117a4dc 100644 --- a/gears_commands_test.go +++ b/gears_commands_test.go @@ -111,18 +111,4 @@ var _ = Describe("RedisGears commands", Label("gears"), func() { Expect(err).NotTo(HaveOccurred()) Expect(resultAdd).To(BeEquivalentTo("bar")) }) - - It("should monitor", Label("mon", "monitor"), func() { - ress := make(chan string) - - client1 := redis.NewClient(&redis.Options{Addr: ":6379"}) - client1.Monitor(ctx, ress) - client.Set(ctx, "foo", "bar", 0) - client.Set(ctx, "bar", "baz", 0) - client.Set(ctx, "bap", 8, 0) - client.Get(ctx, "bap") - panic(<-ress) - // stop <- true - // Expect(<-ress).To(BeEquivalentTo("")) - }) }) diff --git a/monitor_test.go b/monitor_test.go new file mode 100644 index 000000000..20beec517 --- /dev/null +++ b/monitor_test.go @@ -0,0 +1,47 @@ +package redis_test + +import ( + "context" + "time" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + + "github.com/redis/go-redis/v9" +) + +var _ = Describe("Monitor command", Label("monitor"), func() { + ctx := context.TODO() + var client *redis.Client + + BeforeEach(func() { + client = redis.NewClient(&redis.Options{Addr: ":6379"}) + Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should monitor", Label("monitor"), func() { + ress := make(chan string) + client1 := redis.NewClient(&redis.Options{Addr: ":6379"}) + mn := client1.Monitor(ctx, ress) + mn.Start() + time.Sleep(100 * time.Millisecond) + client.Set(ctx, "foo", "bar", 0) + client.Set(ctx, "bar", "baz", 0) + client.Set(ctx, "bap", 8, 0) + client.Get(ctx, "bap") + lst := []string{} + for i := 0; i < 5; i++ { + s := <-ress + lst = append(lst, s) + } + mn.Stop() + Expect(lst[0]).To(ContainSubstring("OK")) + Expect(lst[1]).To(ContainSubstring("\"set\" \"foo\" \"bar\"")) + Expect(lst[2]).To(ContainSubstring("\"set\" \"bar\" \"baz\"")) + Expect(lst[3]).To(ContainSubstring("\"set\" \"bap\" \"8\"")) + }) +}) From 00504a042615956fec4e1d55a2cfb28d7f4e3dbf Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Tue, 12 Dec 2023 12:18:18 +0200 Subject: [PATCH 3/6] insure goroutine shutdown --- command.go | 39 +++++++++++++++++++++++++++------------ commands.go | 21 ++++++++++++--------- main_test.go | 5 +++++ monitor_test.go | 9 +++++---- 4 files changed, 49 insertions(+), 25 deletions(-) diff --git a/command.go b/command.go index c6bbfbc8a..a0095c1de 100644 --- a/command.go +++ b/command.go @@ -5385,9 +5385,9 @@ func (cmd *InfoCmd) Item(section, key string) string { type MonitorStatus int const ( - MonitorStatusIdle MonitorStatus = iota - MonitorStatusStart - MonitorStatusStop + monitorStatusIdle MonitorStatus = iota + monitorStatusStart + monitorStatusStop ) type MonitorCmd struct { @@ -5396,14 +5396,14 @@ type MonitorCmd struct { status MonitorStatus } -func NewMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { +func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { return &MonitorCmd{ baseCmd: baseCmd{ ctx: ctx, args: []interface{}{"monitor"}, }, ch: ch, - status: MonitorStatusIdle, + status: monitorStatusIdle, } } @@ -5412,11 +5412,26 @@ func (cmd *MonitorCmd) String() string { } func (cmd *MonitorCmd) readReply(rd *proto.Reader) error { - go cmd.readMonitor(rd) + ctx, cancel := context.WithCancel(cmd.ctx) + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + err := cmd.readMonitor(rd, cancel) + if err != nil { + cmd.err = err + return + } + } + } + }(ctx) return nil } -func (cmd *MonitorCmd) readMonitor(rd *proto.Reader) error { - for cmd.status == MonitorStatusStart { + +func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error { + for cmd.status == monitorStatusStart { if pk, _ := rd.Peek(1); len(pk) != 0 { line, err := rd.ReadString() if err != nil { @@ -5426,16 +5441,16 @@ func (cmd *MonitorCmd) readMonitor(rd *proto.Reader) error { } } - if cmd.status == MonitorStatusStop { - close(cmd.ch) + if cmd.status == monitorStatusStop { + cancel() } return nil } func (cmd *MonitorCmd) Start() { - cmd.status = MonitorStatusStart + cmd.status = monitorStatusStart } func (cmd *MonitorCmd) Stop() { - cmd.status = MonitorStatusStop + cmd.status = monitorStatusStop } diff --git a/commands.go b/commands.go index 468ba2ffd..546ebafb2 100644 --- a/commands.go +++ b/commands.go @@ -700,16 +700,19 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St return cmd } -// Monitor - represents a Redis MONITOR command, which allows you to capture -// and process all commands processed by a Redis server. Note that: -// -- using MONITOR blocks the connection to the server for itself, -// so it is recommended to createa separate connection dedicated to monitoring purposes. -// -- The provided channel for monitor output should be of type string. -// -- The MonitorCmd runs concurrently in the background. Use the Start method -// to initiate monitoring and the Stop method to terminate it. -// Redis MONITOR command: https://redis.io/commands/monitor +/* +Monitor - represents a Redis MONITOR command, allowing the user to capture +and process all commands sent to a Redis server. This mimics the behavior of +MONITOR in the redis-cli. + +Notes: +- Using MONITOR blocks the connection to the server for itself. It needs a dedicated connection +- The user should create a channel of type string +- This runs concurrently in the background. Trigger via the Start and Stop functions +See further: Redis MONITOR command: https://redis.io/commands/monitor +*/ func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd { - cmd := NewMonitorCmd(ctx, ch) + cmd := newMonitorCmd(ctx, ch) _ = c(ctx, cmd) return cmd } diff --git a/main_test.go b/main_test.go index 6aaaf1c08..1a1660c9d 100644 --- a/main_test.go +++ b/main_test.go @@ -41,6 +41,11 @@ var ( redisAddr = ":" + redisPort ) +var ( + rediStackPort = "6379" + rediStackAddr = ":" + rediStackPort +) + var ( sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3} diff --git a/monitor_test.go b/monitor_test.go index 20beec517..956860198 100644 --- a/monitor_test.go +++ b/monitor_test.go @@ -25,9 +25,10 @@ var _ = Describe("Monitor command", Label("monitor"), func() { It("should monitor", Label("monitor"), func() { ress := make(chan string) - client1 := redis.NewClient(&redis.Options{Addr: ":6379"}) + client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr}) mn := client1.Monitor(ctx, ress) mn.Start() + // Wait for monitor to start listening. time.Sleep(100 * time.Millisecond) client.Set(ctx, "foo", "bar", 0) client.Set(ctx, "bar", "baz", 0) @@ -40,8 +41,8 @@ var _ = Describe("Monitor command", Label("monitor"), func() { } mn.Stop() Expect(lst[0]).To(ContainSubstring("OK")) - Expect(lst[1]).To(ContainSubstring("\"set\" \"foo\" \"bar\"")) - Expect(lst[2]).To(ContainSubstring("\"set\" \"bar\" \"baz\"")) - Expect(lst[3]).To(ContainSubstring("\"set\" \"bap\" \"8\"")) + Expect(lst[1]).To(ContainSubstring(`"set" "foo" "bar"`)) + Expect(lst[2]).To(ContainSubstring(`"set" "bar" "baz"`)) + Expect(lst[3]).To(ContainSubstring(`"set" "bap" "8"`)) }) }) From 78bc9c58e88d6831393f2feabd8ec958d8cbd9dd Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Tue, 12 Dec 2023 13:37:24 +0200 Subject: [PATCH 4/6] fix data race --- command.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/command.go b/command.go index a0095c1de..4c0bfcb31 100644 --- a/command.go +++ b/command.go @@ -8,6 +8,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/redis/go-redis/v9/internal" @@ -5394,9 +5395,11 @@ type MonitorCmd struct { baseCmd ch chan string status MonitorStatus + mu sync.Mutex } func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { + mu := sync.Mutex{} return &MonitorCmd{ baseCmd: baseCmd{ ctx: ctx, @@ -5404,6 +5407,7 @@ func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { }, ch: ch, status: monitorStatusIdle, + mu: mu, } } @@ -5431,26 +5435,33 @@ func (cmd *MonitorCmd) readReply(rd *proto.Reader) error { } func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error { - for cmd.status == monitorStatusStart { - if pk, _ := rd.Peek(1); len(pk) != 0 { + for { + cmd.mu.Lock() + st := cmd.status + cmd.mu.Unlock() + if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart { line, err := rd.ReadString() if err != nil { return err } cmd.ch <- line } - - } - if cmd.status == monitorStatusStop { - cancel() + if st == monitorStatusStop { + cancel() + break + } } return nil } func (cmd *MonitorCmd) Start() { + cmd.mu.Lock() + defer cmd.mu.Unlock() cmd.status = monitorStatusStart } func (cmd *MonitorCmd) Stop() { + cmd.mu.Lock() + defer cmd.mu.Unlock() cmd.status = monitorStatusStop } From e4563b0db467329d1430b50afe915e871671af25 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Tue, 12 Dec 2023 13:43:16 +0200 Subject: [PATCH 5/6] linting --- command.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/command.go b/command.go index 4c0bfcb31..ea1bd927d 100644 --- a/command.go +++ b/command.go @@ -5399,7 +5399,6 @@ type MonitorCmd struct { } func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { - mu := sync.Mutex{} return &MonitorCmd{ baseCmd: baseCmd{ ctx: ctx, @@ -5407,7 +5406,7 @@ func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd { }, ch: ch, status: monitorStatusIdle, - mu: mu, + mu: sync.Mutex{}, } } From 0d8b1b9eff3e30e2185c36809c0e5079ef15ed6b Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Wed, 13 Dec 2023 10:17:57 +0200 Subject: [PATCH 6/6] change timeout explanation --- monitor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitor_test.go b/monitor_test.go index 956860198..1bc82ecae 100644 --- a/monitor_test.go +++ b/monitor_test.go @@ -28,7 +28,7 @@ var _ = Describe("Monitor command", Label("monitor"), func() { client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr}) mn := client1.Monitor(ctx, ress) mn.Start() - // Wait for monitor to start listening. + // Wait for the Redis server to be in monitoring mode. time.Sleep(100 * time.Millisecond) client.Set(ctx, "foo", "bar", 0) client.Set(ctx, "bar", "baz", 0)