Skip to content

Support Monitor Command #2830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 17, 2023
83 changes: 83 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/redis/go-redis/v9/internal"
Expand Down Expand Up @@ -5381,3 +5382,85 @@ func (cmd *InfoCmd) Item(section, key string) string {
return cmd.val[section][key]
}
}

type MonitorStatus int

const (
monitorStatusIdle MonitorStatus = iota
monitorStatusStart
monitorStatusStop
)

type MonitorCmd struct {
baseCmd
ch chan string
status MonitorStatus
mu sync.Mutex
}

func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd {
return &MonitorCmd{
baseCmd: baseCmd{
ctx: ctx,
args: []interface{}{"monitor"},
},
ch: ch,
status: monitorStatusIdle,
mu: sync.Mutex{},
}
}

func (cmd *MonitorCmd) String() string {
return cmdString(cmd, nil)
}

func (cmd *MonitorCmd) readReply(rd *proto.Reader) error {
ctx, cancel := context.WithCancel(cmd.ctx)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
err := cmd.readMonitor(rd, cancel)
if err != nil {
cmd.err = err
return
}
}
}
}(ctx)
return nil
}

func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error {
for {
cmd.mu.Lock()
st := cmd.status
cmd.mu.Unlock()
if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart {
line, err := rd.ReadString()
if err != nil {
return err
}
cmd.ch <- line
}
if st == monitorStatusStop {
cancel()
break
}
}
return nil
}

func (cmd *MonitorCmd) Start() {
cmd.mu.Lock()
defer cmd.mu.Unlock()
cmd.status = monitorStatusStart
}

func (cmd *MonitorCmd) Stop() {
cmd.mu.Lock()
defer cmd.mu.Unlock()
cmd.status = monitorStatusStop
}
18 changes: 17 additions & 1 deletion commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ type Cmdable interface {
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
Time(ctx context.Context) *TimeCmd
DebugObject(ctx context.Context, key string) *StringCmd

MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd

ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
Expand Down Expand Up @@ -700,3 +699,20 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St
_ = c(ctx, cmd)
return cmd
}

/*
Monitor - represents a Redis MONITOR command, allowing the user to capture
and process all commands sent to a Redis server. This mimics the behavior of
MONITOR in the redis-cli.

Notes:
- Using MONITOR blocks the connection to the server for itself. It needs a dedicated connection
- The user should create a channel of type string
- This runs concurrently in the background. Trigger via the Start and Stop functions
See further: Redis MONITOR command: https://redis.io/commands/monitor
*/
func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd {
cmd := newMonitorCmd(ctx, ch)
_ = c(ctx, cmd)
return cmd
}
5 changes: 5 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ var (
redisAddr = ":" + redisPort
)

var (
rediStackPort = "6379"
rediStackAddr = ":" + rediStackPort
)

var (
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}

Expand Down
48 changes: 48 additions & 0 deletions monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package redis_test

import (
"context"
"time"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"

"github.com/redis/go-redis/v9"
)

var _ = Describe("Monitor command", Label("monitor"), func() {
ctx := context.TODO()
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should monitor", Label("monitor"), func() {
ress := make(chan string)
client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr})
mn := client1.Monitor(ctx, ress)
mn.Start()
// Wait for the Redis server to be in monitoring mode.
time.Sleep(100 * time.Millisecond)
client.Set(ctx, "foo", "bar", 0)
client.Set(ctx, "bar", "baz", 0)
client.Set(ctx, "bap", 8, 0)
client.Get(ctx, "bap")
lst := []string{}
for i := 0; i < 5; i++ {
s := <-ress
lst = append(lst, s)
}
mn.Stop()
Expect(lst[0]).To(ContainSubstring("OK"))
Expect(lst[1]).To(ContainSubstring(`"set" "foo" "bar"`))
Expect(lst[2]).To(ContainSubstring(`"set" "bar" "baz"`))
Expect(lst[3]).To(ContainSubstring(`"set" "bap" "8"`))
})
})