diff --git a/cluster.go b/cluster.go index e5d49ddee..1e1d7b0fc 100644 --- a/cluster.go +++ b/cluster.go @@ -1178,8 +1178,8 @@ func (c *ClusterClient) _processPipelineNode( return err } - return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { - return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds) + return cn.WithReaders(ctx, c.opt.ReadTimeout, len(cmds), func(pvs []*proto.Value) error { + return c.pipelineReadCmds(ctx, node, pvs, cmds, failedCmds) }) }) }) @@ -1188,12 +1188,15 @@ func (c *ClusterClient) _processPipelineNode( func (c *ClusterClient) pipelineReadCmds( ctx context.Context, node *clusterNode, - rd *proto.Reader, + pvs []*proto.Value, cmds []Cmder, failedCmds *cmdsMap, ) error { - for _, cmd := range cmds { - err := cmd.readReply(rd) + if len(pvs) != len(cmds) { + return fmt.Errorf("value len(%d), cmds len(%d)", len(pvs), len(cmds)) + } + for key, cmd := range cmds { + err := cmd.readReply(pvs[key]) cmd.SetErr(err) if err == nil { @@ -1343,12 +1346,12 @@ func (c *ClusterClient) _processTxPipelineNode( return err } - return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { + return cn.WithReaders(ctx, c.opt.ReadTimeout, len(cmds), func(pvs []*proto.Value) error { statusCmd := cmds[0].(*StatusCmd) // Trim multi and exec. cmds = cmds[1 : len(cmds)-1] - err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds) + err := c.txPipelineReadQueued(ctx, pvs, statusCmd, cmds, failedCmds) if err != nil { moved, ask, addr := isMovedError(err) if moved || ask { @@ -1357,7 +1360,7 @@ func (c *ClusterClient) _processTxPipelineNode( return err } - return pipelineReadCmds(rd, cmds) + return pipelineReadCmds(pvs[len(pvs)-1].Slice, cmds) }) }) }) @@ -1365,40 +1368,33 @@ func (c *ClusterClient) _processTxPipelineNode( func (c *ClusterClient) txPipelineReadQueued( ctx context.Context, - rd *proto.Reader, + pvs []*proto.Value, statusCmd *StatusCmd, cmds []Cmder, failedCmds *cmdsMap, ) error { // Parse queued replies. - if err := statusCmd.readReply(rd); err != nil { + if err := statusCmd.readReply(pvs[0]); err != nil { return err } - for _, cmd := range cmds { - err := statusCmd.readReply(rd) - if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) { - continue + for key, cmd := range cmds { + err := statusCmd.readReply(pvs[key+1]) + if err != nil && !c.checkMovedErr(ctx, cmd, err, failedCmds) && !isRedisError(err) { + return err } - return err } - // Parse number of replies. - line, err := rd.ReadLine() - if err != nil { - if err == Nil { - err = TxFailedErr - } + n, err := pvs[len(pvs)-1].SliceLen() + switch err { + case nil: + case Nil: + return TxFailedErr + default: return err } - - switch line[0] { - case proto.ErrorReply: - return proto.ParseErrorReply(line) - case proto.ArrayReply: - // ok - default: - return fmt.Errorf("redis: expected '*', but got line %q", line) + if n != len(cmds) { + return fmt.Errorf("redis: expected %d, but got %d", len(cmds), n) } return nil diff --git a/command.go b/command.go index f10c4781a..591bc110b 100644 --- a/command.go +++ b/command.go @@ -23,7 +23,7 @@ type Cmder interface { setFirstKeyPos(int8) readTimeout() *time.Duration - readReply(rd *proto.Reader) error + readReply(pv *proto.Value) error SetErr(error) Err() error @@ -184,7 +184,7 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) { type Cmd struct { baseCmd - val interface{} + val *proto.Value } func NewCmd(ctx context.Context, args ...interface{}) *Cmd { @@ -197,148 +197,89 @@ func NewCmd(ctx context.Context, args ...interface{}) *Cmd { } func (cmd *Cmd) String() string { - return cmdString(cmd, cmd.val) + return cmdString(cmd, cmd.val.Interface()) } func (cmd *Cmd) Val() interface{} { - return cmd.val + if cmd.val != nil { + return cmd.val.Interface() + } + return nil } func (cmd *Cmd) Result() (interface{}, error) { - return cmd.val, cmd.err + if cmd.err != nil { + return nil, cmd.err + } + return cmd.val.Interface(), cmd.err } func (cmd *Cmd) Text() (string, error) { if cmd.err != nil { return "", cmd.err } - switch val := cmd.val.(type) { - case string: - return val, nil - default: - err := fmt.Errorf("redis: unexpected type=%T for String", val) - return "", err - } + return cmd.val.String() } func (cmd *Cmd) Int() (int, error) { if cmd.err != nil { return 0, cmd.err } - switch val := cmd.val.(type) { - case int64: - return int(val), nil - case string: - return strconv.Atoi(val) - default: - err := fmt.Errorf("redis: unexpected type=%T for Int", val) + i, err := cmd.val.Int64() + if err != nil { return 0, err } + return int(i), nil } func (cmd *Cmd) Int64() (int64, error) { if cmd.err != nil { return 0, cmd.err } - switch val := cmd.val.(type) { - case int64: - return val, nil - case string: - return strconv.ParseInt(val, 10, 64) - default: - err := fmt.Errorf("redis: unexpected type=%T for Int64", val) - return 0, err - } + return cmd.val.Int64() } func (cmd *Cmd) Uint64() (uint64, error) { if cmd.err != nil { return 0, cmd.err } - switch val := cmd.val.(type) { - case int64: - return uint64(val), nil - case string: - return strconv.ParseUint(val, 10, 64) - default: - err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) + i, err := cmd.val.Int64() + if err != nil { return 0, err } + return uint64(i), nil } func (cmd *Cmd) Float32() (float32, error) { if cmd.err != nil { return 0, cmd.err } - switch val := cmd.val.(type) { - case int64: - return float32(val), nil - case string: - f, err := strconv.ParseFloat(val, 32) - if err != nil { - return 0, err - } - return float32(f), nil - default: - err := fmt.Errorf("redis: unexpected type=%T for Float32", val) + f, err := cmd.val.Float64() + if err != nil { return 0, err } + return float32(f), nil } func (cmd *Cmd) Float64() (float64, error) { if cmd.err != nil { return 0, cmd.err } - switch val := cmd.val.(type) { - case int64: - return float64(val), nil - case string: - return strconv.ParseFloat(val, 64) - default: - err := fmt.Errorf("redis: unexpected type=%T for Float64", val) - return 0, err - } + return cmd.val.Float64() } func (cmd *Cmd) Bool() (bool, error) { if cmd.err != nil { return false, cmd.err } - switch val := cmd.val.(type) { - case int64: - return val != 0, nil - case string: - return strconv.ParseBool(val) - default: - err := fmt.Errorf("redis: unexpected type=%T for Bool", val) - return false, err - } -} - -func (cmd *Cmd) readReply(rd *proto.Reader) (err error) { - cmd.val, err = rd.ReadReply(sliceParser) - return err + return cmd.val.Bool() } -// sliceParser implements proto.MultiBulkParse. -func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { - vals := make([]interface{}, n) - for i := 0; i < len(vals); i++ { - v, err := rd.ReadReply(sliceParser) - if err != nil { - if err == Nil { - vals[i] = nil - continue - } - if err, ok := err.(proto.RedisError); ok { - vals[i] = err - continue - } - return nil, err - } - vals[i] = v +func (cmd *Cmd) readReply(pv *proto.Value) error { + if pv.RedisError == nil { + cmd.val = pv } - return vals, nil + return pv.RedisError } //------------------------------------------------------------------------------ @@ -392,13 +333,9 @@ func (cmd *SliceCmd) Scan(dst interface{}) error { return hscan.Scan(dst, args, cmd.val) } -func (cmd *SliceCmd) readReply(rd *proto.Reader) error { - v, err := rd.ReadArrayReply(sliceParser) - if err != nil { - return err - } - cmd.val = v.([]interface{}) - return nil +func (cmd *SliceCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.SliceInterface() + return err } //------------------------------------------------------------------------------ @@ -432,8 +369,8 @@ func (cmd *StatusCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StatusCmd) readReply(rd *proto.Reader) (err error) { - cmd.val, err = rd.ReadString() +func (cmd *StatusCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.String() return err } @@ -472,8 +409,8 @@ func (cmd *IntCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntCmd) readReply(rd *proto.Reader) (err error) { - cmd.val, err = rd.ReadIntReply() +func (cmd *IntCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.Int64() return err } @@ -508,18 +445,8 @@ func (cmd *IntSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntSliceCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]int64, n) - for i := 0; i < len(cmd.val); i++ { - num, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.val[i] = num - } - return nil, nil - }) +func (cmd *IntSliceCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.SliceInt64() return err } @@ -556,8 +483,8 @@ func (cmd *DurationCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *DurationCmd) readReply(rd *proto.Reader) error { - n, err := rd.ReadIntReply() +func (cmd *DurationCmd) readReply(pv *proto.Value) error { + n, err := pv.Int64() if err != nil { return err } @@ -603,26 +530,16 @@ func (cmd *TimeCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *TimeCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d elements, expected 2", n) - } - - sec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - microsec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - cmd.val = time.Unix(sec, microsec*1000) - return nil, nil - }) - return err +func (cmd *TimeCmd) readReply(pv *proto.Value) error { + times, err := pv.SliceInt64() + if err != nil { + return err + } + if len(times) != 2 { + return fmt.Errorf("got %d elements, expected 2", len(times)) + } + cmd.val = time.Unix(times[0], times[1]*1000) + return nil } //------------------------------------------------------------------------------ @@ -656,27 +573,9 @@ func (cmd *BoolCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolCmd) readReply(rd *proto.Reader) error { - v, err := rd.ReadReply(nil) - // `SET key value NX` returns nil when key already exists. But - // `SETNX key value` returns bool (0/1). So convert nil to bool. - if err == Nil { - cmd.val = false - return nil - } - if err != nil { - return err - } - switch v := v.(type) { - case int64: - cmd.val = v == 1 - return nil - case string: - cmd.val = v == "OK" - return nil - default: - return fmt.Errorf("got %T, wanted int64 or string", v) - } +func (cmd *BoolCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.Status() + return err } //------------------------------------------------------------------------------ @@ -774,8 +673,8 @@ func (cmd *StringCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringCmd) readReply(rd *proto.Reader) (err error) { - cmd.val, err = rd.ReadString() +func (cmd *StringCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.String() return err } @@ -810,8 +709,8 @@ func (cmd *FloatCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatCmd) readReply(rd *proto.Reader) (err error) { - cmd.val, err = rd.ReadFloatReply() +func (cmd *FloatCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.Float64() return err } @@ -846,21 +745,8 @@ func (cmd *FloatSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatSliceCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]float64, n) - for i := 0; i < len(cmd.val); i++ { - switch num, err := rd.ReadFloatReply(); { - case err == Nil: - cmd.val[i] = 0 - case err != nil: - return nil, err - default: - cmd.val[i] = num - } - } - return nil, nil - }) +func (cmd *FloatSliceCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.SliceFloat64() return err } @@ -899,21 +785,8 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { return proto.ScanSlice(cmd.Val(), container) } -func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]string, n) - for i := 0; i < len(cmd.val); i++ { - switch s, err := rd.ReadString(); { - case err == Nil: - cmd.val[i] = "" - case err != nil: - return nil, err - default: - cmd.val[i] = s - } - } - return nil, nil - }) +func (cmd *StringSliceCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.SliceString() return err } @@ -948,18 +821,8 @@ func (cmd *BoolSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]bool, n) - for i := 0; i < len(cmd.val); i++ { - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.val[i] = n == 1 - } - return nil, nil - }) +func (cmd *BoolSliceCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.SliceStatus() return err } @@ -1015,24 +878,8 @@ func (cmd *StringStringMapCmd) Scan(dst interface{}) error { return nil } -func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]string, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - - value, err := rd.ReadString() - if err != nil { - return nil, err - } - - cmd.val[key] = value - } - return nil, nil - }) +func (cmd *StringStringMapCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.MapStringString() return err } @@ -1067,24 +914,8 @@ func (cmd *StringIntMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]int64, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - cmd.val[key] = n - } - return nil, nil - }) +func (cmd *StringIntMapCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = pv.MapStringInt64() return err } @@ -1119,19 +950,17 @@ func (cmd *StringStructMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]struct{}, n) - for i := int64(0); i < n; i++ { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - cmd.val[key] = struct{}{} - } - return nil, nil - }) - return err +func (cmd *StringStructMapCmd) readReply(pv *proto.Value) (err error) { + ss, err := pv.SliceString() + if err != nil { + return err + } + + cmd.val = make(map[string]struct{}, len(ss)) + for _, val := range ss { + cmd.val[val] = struct{}{} + } + return nil } //------------------------------------------------------------------------------ @@ -1170,79 +999,70 @@ func (cmd *XMessageSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { - var err error - cmd.val, err = readXMessageSlice(rd) +func (cmd *XMessageSliceCmd) readReply(pv *proto.Value) (err error) { + cmd.val, err = readXMessages(pv) return err } -func readXMessageSlice(rd *proto.Reader) ([]XMessage, error) { - n, err := rd.ReadArrayLen() +func readXMessages(pv *proto.Value) ([]XMessage, error) { + n, err := pv.SliceLen() if err != nil { return nil, err } - msgs := make([]XMessage, n) - for i := 0; i < n; i++ { - var err error - msgs[i], err = readXMessage(rd) + if n == 0 { + return nil, nil + } + + xMsgs := make([]XMessage, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + xMsg, err := readXMessage(item) if err != nil { - return nil, err + return err } - } - return msgs, nil + xMsgs = append(xMsgs, xMsg) + return nil + }) + return xMsgs, err } -func readXMessage(rd *proto.Reader) (XMessage, error) { - n, err := rd.ReadArrayLen() - if err != nil { +func readXMessage(item *proto.Value) (XMessage, error) { + n, err := item.SliceLen() + if err != nil && err != Nil { return XMessage{}, err } if n != 2 { return XMessage{}, fmt.Errorf("got %d, wanted 2", n) } - id, err := rd.ReadString() - if err != nil { + id, err := item.Slice[0].String() + if err != nil && err != Nil { return XMessage{}, err } - var values map[string]interface{} + ss, err := item.Slice[1].SliceString() + if err != nil && err != Nil { + return XMessage{}, err + } + n = len(ss) - v, err := rd.ReadArrayReply(stringInterfaceMapParser) - if err != nil { - if err != proto.Nil { - return XMessage{}, err + var value map[string]interface{} + if n > 0 { + if n%2 != 0 { + return XMessage{}, fmt.Errorf("got %d, wanted multiple of 2", n) + } + value = make(map[string]interface{}, n/2) + for i := 0; i < n; i += 2 { + value[ss[i]] = ss[i+1] } - } else { - values = v.(map[string]interface{}) } return XMessage{ ID: id, - Values: values, + Values: value, }, nil } -// stringInterfaceMapParser implements proto.MultiBulkParse. -func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]interface{}, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadString() - if err != nil { - return nil, err - } - - value, err := rd.ReadString() - if err != nil { - return nil, err - } - - m[key] = value - } - return m, nil -} - //------------------------------------------------------------------------------ type XStream struct { @@ -1279,38 +1099,42 @@ func (cmd *XStreamSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]XStream, n) - for i := 0; i < len(cmd.val); i++ { - i := i - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) - } +func (cmd *XStreamSliceCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } - stream, err := rd.ReadString() - if err != nil { - return nil, err - } + if n == 0 { + return nil + } - msgs, err := readXMessageSlice(rd) - if err != nil { - return nil, err - } + cmd.val = make([]XStream, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() + if err != nil { + return err + } + if n != 2 { + return fmt.Errorf("got %d, wanted 2", n) + } - cmd.val[i] = XStream{ - Stream: stream, - Messages: msgs, - } - return nil, nil - }) - if err != nil { - return nil, err - } + stream, err := item.Slice[0].String() + if err != nil { + return err } - return nil, nil + + xMsgs, err := readXMessages(item.Slice[1]) + if err != nil { + return err + } + cmd.val = append(cmd.val, XStream{ + Stream: stream, + Messages: xMsgs, + }) + return nil }) + return err } @@ -1351,69 +1175,64 @@ func (cmd *XPendingCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XPendingCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 4 { - return nil, fmt.Errorf("got %d, wanted 4", n) - } +func (cmd *XPendingCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } + if n != 4 { + return fmt.Errorf("got %d, wanted 4", n) + } - count, err := rd.ReadIntReply() - if err != nil { - return nil, err - } + count, err := pv.Slice[0].Int64() + if err != nil { + return err + } - lower, err := rd.ReadString() - if err != nil && err != Nil { - return nil, err - } + lower, err := pv.Slice[1].String() + if err != nil { + return err + } - higher, err := rd.ReadString() - if err != nil && err != Nil { - return nil, err - } + higher, err := pv.Slice[2].String() + if err != nil { + return err + } + cmd.val = &XPending{ + Count: count, + Lower: lower, + Higher: higher, + } - cmd.val = &XPending{ - Count: count, - Lower: lower, - Higher: higher, - } - _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - for i := int64(0); i < n; i++ { - _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) - } - - consumerName, err := rd.ReadString() - if err != nil { - return nil, err - } - - consumerPending, err := rd.ReadInt() - if err != nil { - return nil, err - } - - if cmd.val.Consumers == nil { - cmd.val.Consumers = make(map[string]int64) - } - cmd.val.Consumers[consumerName] = consumerPending - - return nil, nil - }) - if err != nil { - return nil, err - } + n, err = pv.Slice[3].SliceLen() + if err == nil && n > 0 { + cmd.val.Consumers = make(map[string]int64, n) + err = pv.Slice[3].SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() + if err != nil { + return err + } + if n != 2 { + return fmt.Errorf("got %d, wanted 2", n) + } + name, err := item.Slice[0].String() + if err != nil { + return err + } + pending, err := item.Slice[1].Int64() + if err != nil { + return err } - return nil, nil + cmd.val.Consumers[name] = pending + + return nil }) - if err != nil && err != Nil { - return nil, err + if err != nil { + return err } + } - return nil, nil - }) - return err + return nil } //------------------------------------------------------------------------------ @@ -1453,49 +1272,52 @@ func (cmd *XPendingExtCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]XPendingExt, 0, n) - for i := int64(0); i < n; i++ { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 4 { - return nil, fmt.Errorf("got %d, wanted 4", n) - } +func (cmd *XPendingExtCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } - id, err := rd.ReadString() - if err != nil { - return nil, err - } + cmd.val = make([]XPendingExt, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() + if err != nil { + return err + } + if n != 4 { + return fmt.Errorf("got %d, wanted 4", n) + } - consumer, err := rd.ReadString() - if err != nil && err != Nil { - return nil, err - } + id, err := item.Slice[0].String() + if err != nil { + return err + } - idle, err := rd.ReadIntReply() - if err != nil && err != Nil { - return nil, err - } + consumer, err := item.Slice[1].String() + if err != nil && err != Nil { + return err + } - retryCount, err := rd.ReadIntReply() - if err != nil && err != Nil { - return nil, err - } + idle, err := item.Slice[2].Int64() + if err != nil && err != Nil { + return err + } - cmd.val = append(cmd.val, XPendingExt{ - ID: id, - Consumer: consumer, - Idle: time.Duration(idle) * time.Millisecond, - RetryCount: retryCount, - }) - return nil, nil - }) - if err != nil { - return nil, err - } + retryCount, err := item.Slice[3].Int64() + if err != nil && err != Nil { + return err } - return nil, nil + + cmd.val = append(cmd.val, XPendingExt{ + ID: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + + return nil }) + return err } @@ -1535,65 +1357,60 @@ func (cmd *XInfoConsumersCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XInfoConsumersCmd) readReply(rd *proto.Reader) error { - n, err := rd.ReadArrayLen() +func (cmd *XInfoConsumersCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() if err != nil { return err } + if n == 0 { + return nil + } - cmd.val = make([]XInfoConsumer, n) - - for i := 0; i < n; i++ { - cmd.val[i], err = readXConsumerInfo(rd) + cmd.val = make([]XInfoConsumer, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() if err != nil { return err } - } - - return nil -} + if n != 6 { + return fmt.Errorf("redis: got %d elements in XINFO CONSUMERS reply, wanted 6", n) + } -func readXConsumerInfo(rd *proto.Reader) (XInfoConsumer, error) { - var consumer XInfoConsumer + var consumer XInfoConsumer + for i := 0; i < n; i += 2 { + key, err := item.Slice[i].String() + if err != nil { + return err + } - n, err := rd.ReadArrayLen() - if err != nil { - return consumer, err - } - if n != 6 { - return consumer, fmt.Errorf("redis: got %d elements in XINFO CONSUMERS reply, wanted 6", n) - } - - for i := 0; i < 3; i++ { - key, err := rd.ReadString() - if err != nil { - return consumer, err - } - - val, err := rd.ReadString() - if err != nil { - return consumer, err - } - - switch key { - case "name": - consumer.Name = val - case "pending": - consumer.Pending, err = strconv.ParseInt(val, 0, 64) - if err != nil { - return consumer, err - } - case "idle": - consumer.Idle, err = strconv.ParseInt(val, 0, 64) - if err != nil { - return consumer, err + switch key { + case "name": + val, err := item.Slice[i+1].String() + if err != nil { + return err + } + consumer.Name = val + case "pending": + val, err := item.Slice[i+1].Int64() + if err != nil { + return err + } + consumer.Pending = val + case "idle": + val, err := item.Slice[i+1].Int64() + if err != nil { + return err + } + consumer.Idle = val + default: + return fmt.Errorf("redis: unexpected content %s in XINFO CONSUMERS reply", key) } - default: - return consumer, fmt.Errorf("redis: unexpected content %s in XINFO CONSUMERS reply", key) } - } + cmd.val = append(cmd.val, consumer) + return nil + }) - return consumer, nil + return err } //------------------------------------------------------------------------------ @@ -1633,67 +1450,66 @@ func (cmd *XInfoGroupsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error { - n, err := rd.ReadArrayLen() +func (cmd *XInfoGroupsCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() if err != nil { return err } - - cmd.val = make([]XInfoGroup, n) - - for i := 0; i < n; i++ { - cmd.val[i], err = readXGroupInfo(rd) - if err != nil { - return err - } - } - - return nil -} - -func readXGroupInfo(rd *proto.Reader) (XInfoGroup, error) { - var group XInfoGroup - - n, err := rd.ReadArrayLen() - if err != nil { - return group, err - } - if n != 8 { - return group, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply, wanted 8", n) + if n == 0 { + return nil } - for i := 0; i < 4; i++ { - key, err := rd.ReadString() + cmd.val = make([]XInfoGroup, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() if err != nil { - return group, err + return err } - - val, err := rd.ReadString() - if err != nil { - return group, err + if n != 8 { + return fmt.Errorf("redis: got %d elements in XINFO GROUPS reply, wanted 8", n) } - switch key { - case "name": - group.Name = val - case "consumers": - group.Consumers, err = strconv.ParseInt(val, 0, 64) + var group XInfoGroup + for i := 0; i < n; i += 2 { + key, err := item.Slice[i].String() if err != nil { - return group, err + return err } - case "pending": - group.Pending, err = strconv.ParseInt(val, 0, 64) - if err != nil { - return group, err + + switch key { + case "name": + val, err := item.Slice[i+1].String() + if err != nil { + return err + } + group.Name = val + case "consumers": + val, err := item.Slice[i+1].Int64() + if err != nil { + return err + } + group.Consumers = val + case "pending": + val, err := item.Slice[i+1].Int64() + if err != nil { + return err + } + group.Pending = val + case "last-delivered-id": + val, err := item.Slice[i+1].String() + if err != nil { + return err + } + group.LastDeliveredID = val + default: + return fmt.Errorf("redis: unexpected content %s in XINFO GROUPS reply", key) } - case "last-delivered-id": - group.LastDeliveredID = val - default: - return group, fmt.Errorf("redis: unexpected content %s in XINFO GROUPS reply", key) } - } + cmd.val = append(cmd.val, group) + return nil + }) - return group, nil + return err } //------------------------------------------------------------------------------ @@ -1736,50 +1552,53 @@ func (cmd *XInfoStreamCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error { - v, err := rd.ReadReply(xStreamInfoParser) +func (cmd *XInfoStreamCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() if err != nil { return err } - cmd.val = v.(*XInfoStream) - return nil -} + if n == 0 { + return nil + } -func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + // no full. if n != 14 { - return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ - "wanted 14", n) + return fmt.Errorf("redis: got %d elements in XINFO STREAM reply, wanted 14", n) } - var info XInfoStream - for i := 0; i < 7; i++ { - key, err := rd.ReadString() + + info := &XInfoStream{} + for i := 0; i < n; i += 2 { + var key string + key, err = pv.Slice[i].String() if err != nil { - return nil, err + return err } + switch key { case "length": - info.Length, err = rd.ReadIntReply() + info.Length, err = pv.Slice[i+1].Int64() case "radix-tree-keys": - info.RadixTreeKeys, err = rd.ReadIntReply() + info.RadixTreeKeys, err = pv.Slice[i+1].Int64() case "radix-tree-nodes": - info.RadixTreeNodes, err = rd.ReadIntReply() + info.RadixTreeNodes, err = pv.Slice[i+1].Int64() case "groups": - info.Groups, err = rd.ReadIntReply() + info.Groups, err = pv.Slice[i+1].Int64() case "last-generated-id": - info.LastGeneratedID, err = rd.ReadString() + info.LastGeneratedID, err = pv.Slice[i+1].String() case "first-entry": - info.FirstEntry, err = readXMessage(rd) + info.FirstEntry, err = readXMessage(pv.Slice[i+1]) case "last-entry": - info.LastEntry, err = readXMessage(rd) + info.LastEntry, err = readXMessage(pv.Slice[i+1]) default: - return nil, fmt.Errorf("redis: unexpected content %s "+ + return fmt.Errorf("redis: unexpected content %s "+ "in XINFO STREAM reply", key) } if err != nil { - return nil, err + return err } } - return &info, nil + cmd.val = info + return nil } //------------------------------------------------------------------------------ @@ -1813,28 +1632,34 @@ func (cmd *ZSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]Z, n/2) - for i := 0; i < len(cmd.val); i++ { - member, err := rd.ReadString() - if err != nil { - return nil, err - } +func (cmd *ZSliceCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } + if n%2 != 0 { + return fmt.Errorf("got %d, wanted multiple of 2", n) + } - score, err := rd.ReadFloatReply() - if err != nil { - return nil, err - } + cmd.val = make([]Z, 0, n/2) + for i := 0; i < n; i += 2 { + member, err := pv.Slice[i].String() + if err != nil { + return err + } - cmd.val[i] = Z{ - Member: member, - Score: score, - } + score, err := pv.Slice[i+1].Float64() + if err != nil { + return err } - return nil, nil - }) - return err + + cmd.val = append(cmd.val, Z{ + Score: score, + Member: member, + }) + } + + return nil } //------------------------------------------------------------------------------ @@ -1868,33 +1693,36 @@ func (cmd *ZWithKeyCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - if n != 3 { - return nil, fmt.Errorf("got %d elements, expected 3", n) - } +func (cmd *ZWithKeyCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } + if n == 0 { + return nil + } + if n != 3 { + return fmt.Errorf("got %d elements, expected 3", n) + } - cmd.val = &ZWithKey{} - var err error + cmd.val = &ZWithKey{} - cmd.val.Key, err = rd.ReadString() - if err != nil { - return nil, err - } + cmd.val.Key, err = pv.Slice[0].String() + if err != nil { + return err + } - cmd.val.Member, err = rd.ReadString() - if err != nil { - return nil, err - } + cmd.val.Member, err = pv.Slice[1].String() + if err != nil { + return err + } - cmd.val.Score, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } + cmd.val.Score, err = pv.Slice[2].Float64() + if err != nil { + return err + } - return nil, nil - }) - return err + return nil } //------------------------------------------------------------------------------ @@ -1932,8 +1760,25 @@ func (cmd *ScanCmd) String() string { return cmdString(cmd, cmd.page) } -func (cmd *ScanCmd) readReply(rd *proto.Reader) (err error) { - cmd.page, cmd.cursor, err = rd.ReadScanReply() +func (cmd *ScanCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } + if n == 0 { + return nil + } + if n != 2 { + return fmt.Errorf("redis: got %d elements in scan reply, expected 2", n) + } + + cursor, err := pv.Slice[0].Int64() + if err != nil { + return err + } + cmd.cursor = uint64(cursor) + cmd.page, err = pv.Slice[1].SliceString() + return err } @@ -1986,68 +1831,73 @@ func (cmd *ClusterSlotsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]ClusterSlot, n) - for i := 0; i < len(cmd.val); i++ { - n, err := rd.ReadArrayLen() +func (cmd *ClusterSlotsCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } + if n == 0 { + return nil + } + + cmd.val = make([]ClusterSlot, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() + if err != nil { + return err + } + if n < 2 { + return fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) + } + + start, err := item.Slice[0].Int64() + if err != nil { + return err + } + end, err := item.Slice[1].Int64() + if err != nil { + return err + } + + var nodeVal *proto.Value + nodes := make([]ClusterNode, n-2) + for i := 0; i < (n - 2); i++ { + nodeVal = item.Slice[i+2] + nx, err := nodeVal.SliceLen() if err != nil { - return nil, err + return err } - if n < 2 { - err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) - return nil, err + if nx != 2 && nx != 3 { + return fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) } - start, err := rd.ReadIntReply() + ip, err := nodeVal.Slice[0].String() if err != nil { - return nil, err + return err } - - end, err := rd.ReadIntReply() + port, err := nodeVal.Slice[1].String() if err != nil { - return nil, err + return err } - nodes := make([]ClusterNode, n-2) - for j := 0; j < len(nodes); j++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 && n != 3 { - err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) - return nil, err - } + nodes[i].Addr = net.JoinHostPort(ip, port) - ip, err := rd.ReadString() + // new version. + if nx == 3 { + id, err := nodeVal.Slice[2].String() if err != nil { - return nil, err - } - - port, err := rd.ReadString() - if err != nil { - return nil, err - } - - nodes[j].Addr = net.JoinHostPort(ip, port) - - if n == 3 { - id, err := rd.ReadString() - if err != nil { - return nil, err - } - nodes[j].ID = id + return err } - } - - cmd.val[i] = ClusterSlot{ - Start: int(start), - End: int(end), - Nodes: nodes, + nodes[i].ID = id } } - return nil, nil + + cmd.val = append(cmd.val, ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + }) + return nil }) return err } @@ -2140,81 +1990,88 @@ func (cmd *GeoLocationCmd) String() string { return cmdString(cmd, cmd.locations) } -func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { - v, err := rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) +func (cmd *GeoLocationCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() if err != nil { return err } - cmd.locations = v.([]GeoLocation) - return nil -} - -func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - locs := make([]GeoLocation, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(newGeoLocationParser(q)) - if err != nil { - return nil, err - } - switch vv := v.(type) { - case string: - locs = append(locs, GeoLocation{ - Name: vv, - }) - case *GeoLocation: - // TODO: avoid copying - locs = append(locs, *vv) - default: - return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) - } - } - return locs, nil + if n == 0 { + return nil } -} -func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - var loc GeoLocation - var err error - - loc.Name, err = rd.ReadString() - if err != nil { - return nil, err - } - if q.WithDist { - loc.Dist, err = rd.ReadFloatReply() + cmd.locations = make([]GeoLocation, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + if item.IsSimpleType() { + name, err := item.String() if err != nil { - return nil, err + return err } - } - if q.WithGeoHash { - loc.GeoHash, err = rd.ReadIntReply() + cmd.locations = append(cmd.locations, GeoLocation{ + Name: name, + }) + } else { + n, err = item.SliceLen() if err != nil { - return nil, err + return err } - } - if q.WithCoord { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err + expectNum := 1 + if cmd.q.WithDist { + expectNum++ } - if n != 2 { - return nil, fmt.Errorf("got %d coordinates, expected 2", n) + if cmd.q.WithGeoHash { + expectNum++ + } + if cmd.q.WithCoord { + expectNum++ } - loc.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err + if n != expectNum { + return fmt.Errorf("got %d, wanted %d", n, expectNum) } - loc.Latitude, err = rd.ReadFloatReply() + + var loc GeoLocation + var idx int + + loc.Name, err = item.Slice[idx].String() if err != nil { - return nil, err + return err + } + idx++ + + if cmd.q.WithDist { + loc.Dist, err = item.Slice[idx].Float64() + if err != nil { + return err + } + idx++ + } + + if cmd.q.WithGeoHash { + loc.GeoHash, err = item.Slice[idx].Int64() + if err != nil { + return err + } + idx++ } + + if cmd.q.WithCoord { + cd, err := item.Slice[idx].SliceFloat64() + if err != nil { + return err + } + if len(cd) != 2 { + return fmt.Errorf("got %d coordinates, expected 2", len(cd)) + } + + loc.Longitude = cd[0] + loc.Latitude = cd[1] + } + cmd.locations = append(cmd.locations, loc) } + return nil + }) - return &loc, nil - } + return err } //------------------------------------------------------------------------------ @@ -2252,38 +2109,36 @@ func (cmd *GeoPosCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]*GeoPos, n) - for i := 0; i < len(cmd.val); i++ { - i := i - _, err := rd.ReadReply(func(rd *proto.Reader, n int64) (interface{}, error) { - longitude, err := rd.ReadFloatReply() - if err != nil { - return nil, err - } - - latitude, err := rd.ReadFloatReply() - if err != nil { - return nil, err - } +func (cmd *GeoPosCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } + if n == 0 { + return nil + } - cmd.val[i] = &GeoPos{ - Longitude: longitude, - Latitude: latitude, - } - return nil, nil - }) - if err != nil { - if err == Nil { - cmd.val[i] = nil - continue - } - return nil, err + cmd.val = make([]*GeoPos, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + var sf []float64 + sf, err = item.SliceFloat64() + switch err { + case Nil: + cmd.val = append(cmd.val, nil) + case nil: + if len(sf) != 2 { + return fmt.Errorf("got %d, wanted 2", len(sf)) } + cmd.val = append(cmd.val, &GeoPos{ + Longitude: sf[0], + Latitude: sf[1], + }) + default: + return err } - return nil, nil + return nil }) + return err } @@ -2329,113 +2184,82 @@ func (cmd *CommandsInfoCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make(map[string]*CommandInfo, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(commandInfoParser) - if err != nil { - return nil, err - } - vv := v.(*CommandInfo) - cmd.val[vv.Name] = vv - } - return nil, nil - }) - return err -} - -func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { +func (cmd *CommandsInfoCmd) readReply(pv *proto.Value) error { const numArgRedis5 = 6 const numArgRedis6 = 7 - switch n { - case numArgRedis5, numArgRedis6: - // continue - default: - return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 7", n) + n, err := pv.SliceLen() + if err != nil { + return err } - var cmd CommandInfo - var err error + cmd.val = make(map[string]*CommandInfo, n) + err = pv.SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() + if err != nil { + return err + } + if n != numArgRedis5 && n != numArgRedis6 { + return fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 7", n) + } - cmd.Name, err = rd.ReadString() - if err != nil { - return nil, err - } + info := &CommandInfo{} - arity, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.Arity = int8(arity) + info.Name, err = item.Slice[0].String() + if err != nil { + return err + } - _, err = rd.ReadReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.Flags = make([]string, n) - for i := 0; i < len(cmd.Flags); i++ { - switch s, err := rd.ReadString(); { - case err == Nil: - cmd.Flags[i] = "" - case err != nil: - return nil, err - default: - cmd.Flags[i] = s - } + var arity, firstKeyPos, lastKeyPos, stepCount int64 + + arity, err = item.Slice[1].Int64() + if err != nil { + return err } - return nil, nil - }) - if err != nil { - return nil, err - } + info.Arity = int8(arity) - firstKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.FirstKeyPos = int8(firstKeyPos) + info.Flags, err = item.Slice[2].SliceString() + if err != nil { + return err + } - lastKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.LastKeyPos = int8(lastKeyPos) + firstKeyPos, err = item.Slice[3].Int64() + if err != nil { + return err + } + info.FirstKeyPos = int8(firstKeyPos) - stepCount, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.StepCount = int8(stepCount) + lastKeyPos, err = item.Slice[4].Int64() + if err != nil { + return err + } + info.LastKeyPos = int8(lastKeyPos) - for _, flag := range cmd.Flags { - if flag == "readonly" { - cmd.ReadOnly = true - break + stepCount, err = item.Slice[5].Int64() + if err != nil { + return err } - } + info.StepCount = int8(stepCount) - if n == numArgRedis5 { - return &cmd, nil - } + for _, flag := range info.Flags { + if flag == "readonly" { + info.ReadOnly = true + break + } + } - _, err = rd.ReadReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.ACLFlags = make([]string, n) - for i := 0; i < len(cmd.ACLFlags); i++ { - switch s, err := rd.ReadString(); { - case err == Nil: - cmd.ACLFlags[i] = "" - case err != nil: - return nil, err - default: - cmd.ACLFlags[i] = s + if n >= numArgRedis6 { + info.ACLFlags, err = item.Slice[6].SliceString() + if err != nil { + return err } } - return nil, nil + + cmd.val[info.Name] = info + return nil }) - if err != nil { - return nil, err - } - return &cmd, nil + return err } //------------------------------------------------------------------------------ @@ -2516,76 +2340,74 @@ func (cmd *SlowLogCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - cmd.val = make([]SlowLog, n) - for i := 0; i < len(cmd.val); i++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n < 4 { - err := fmt.Errorf("redis: got %d elements in slowlog get, expected at least 4", n) - return nil, err - } +func (cmd *SlowLogCmd) readReply(pv *proto.Value) error { + n, err := pv.SliceLen() + if err != nil { + return err + } - id, err := rd.ReadIntReply() - if err != nil { - return nil, err + cmd.val = make([]SlowLog, 0, n) + err = pv.SliceScan(func(item *proto.Value) error { + n, err = item.SliceLen() + if err != nil { + if err == Nil { + return nil } + return err + } + if n < 4 { + return fmt.Errorf("redis: got %d elements in slowlog get, expected at least 4", n) + } - createdAt, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - createdAtTime := time.Unix(createdAt, 0) + id, err := item.Slice[0].Int64() + if err != nil && err != Nil { + return err + } - costs, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - costsDuration := time.Duration(costs) * time.Microsecond + createAt, err := item.Slice[1].Int64() + if err != nil && err != Nil { + return err + } + createdAtTime := time.Unix(createAt, 0) - cmdLen, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if cmdLen < 1 { - err := fmt.Errorf("redis: got %d elements commands reply in slowlog get, expected at least 1", cmdLen) - return nil, err - } + costs, err := item.Slice[2].Int64() + if err != nil && err != Nil { + return err + } + costsDuration := time.Duration(costs) * time.Microsecond - cmdString := make([]string, cmdLen) - for i := 0; i < cmdLen; i++ { - cmdString[i], err = rd.ReadString() - if err != nil { - return nil, err - } - } + cmdString, err := item.Slice[3].SliceString() + if err != nil { + return err + } + if len(cmdString) < 1 { + return fmt.Errorf("redis: got %d elements commands reply in slowlog get, expected at least 1", len(cmdString)) + } - var address, name string - for i := 4; i < n; i++ { - str, err := rd.ReadString() - if err != nil { - return nil, err - } - if i == 4 { - address = str - } else if i == 5 { - name = str - } + var address, name string + if n > 4 { + address, err = item.Slice[4].String() + if err != nil && err != Nil { + return err } - - cmd.val[i] = SlowLog{ - ID: id, - Time: createdAtTime, - Duration: costsDuration, - Args: cmdString, - ClientAddr: address, - ClientName: name, + } + if n > 5 { + name, err = item.Slice[5].String() + if err != nil && err != Nil { + return err } } - return nil, nil + + cmd.val = append(cmd.val, SlowLog{ + ID: id, + Time: createdAtTime, + Duration: costsDuration, + Args: cmdString, + ClientAddr: address, + ClientName: name, + }) + return nil }) + return err } diff --git a/extra/rediscensus/go.mod b/extra/rediscensus/go.mod index f07afdc53..45c52b200 100644 --- a/extra/rediscensus/go.mod +++ b/extra/rediscensus/go.mod @@ -7,6 +7,5 @@ replace github.com/go-redis/redis/extra/rediscmd => ../rediscmd require ( github.com/go-redis/redis/extra/rediscmd v0.2.0 github.com/go-redis/redis/v8 v8.4.4 - github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect go.opencensus.io v0.22.6 ) diff --git a/extra/rediscensus/go.sum b/extra/rediscensus/go.sum index 67f4218fc..aaa94f45b 100644 --- a/extra/rediscensus/go.sum +++ b/extra/rediscensus/go.sum @@ -16,18 +16,13 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/go-redis/redis/extra/rediscmd v0.2.0 h1:A3bhCsCKsedClEH9/jYlcKqOuBoeeV+H0yDie5t+a6w= -github.com/go-redis/redis/extra/rediscmd v0.2.0/go.mod h1:Z5bP1EHl9PvWhx/DupfCdZwB0JgOO3aVxWc/PFux+BE= -github.com/go-redis/redis/v8 v8.3.2/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeYhKWrBejTU= github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc= github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -35,15 +30,14 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= -github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -53,33 +47,33 @@ github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= -github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U= github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= +github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= -go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.22.6 h1:BdkrbWrzDlV9dnbzoP7sfN+dHheJ4J9JOaYxcUDL+ok= go.opencensus.io v0.22.6/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY= go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw= go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -88,27 +82,27 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091 h1:DMyOG0U+gKfu8JZzg2UQe9MeaC1X+xQWlAKcRnjxjCw= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= @@ -118,16 +112,19 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= @@ -138,16 +135,15 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/pool/conn.go b/internal/pool/conn.go index ee064c9fc..c764099e7 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -64,14 +64,41 @@ func (cn *Conn) RemoteAddr() net.Addr { return nil } -func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error { +func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(pv *proto.Value) error) error { ctx, span := internal.StartSpan(ctx, "redis.with_reader") defer span.End() if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { return internal.RecordError(ctx, span, err) } - if err := fn(cn.rd); err != nil { + v, err := cn.rd.ReadReply() + if err != nil { + return internal.RecordError(ctx, span, err) + } + if err := fn(v); err != nil { + return internal.RecordError(ctx, span, err) + } + return nil +} + +func (cn *Conn) WithReaders(ctx context.Context, timeout time.Duration, n int, + fn func(pvs []*proto.Value) error) error { + ctx, span := internal.StartSpan(ctx, "redis.with_readers") + defer span.End() + + if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { + return internal.RecordError(ctx, span, err) + } + + var err error + pvs := make([]*proto.Value, n) + for i := 0; i < n; i++ { + pvs[i], err = cn.rd.ReadReply() + if err != nil { + return internal.RecordError(ctx, span, err) + } + } + if err := fn(pvs); err != nil { return internal.RecordError(ctx, span, err) } return nil diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 254a18de5..74fbb37a6 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -228,8 +228,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { return nil, ErrClosed } - err := p.waitTurn(ctx) - if err != nil { + if err := p.waitTurn(ctx); err != nil { return nil, err } diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 0ab8c9d2f..17f75222f 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -4,19 +4,34 @@ import ( "bufio" "fmt" "io" + "math" + "math/big" + "strconv" "github.com/go-redis/redis/v8/internal/util" ) const ( - ErrorReply = '-' - StatusReply = '+' - IntReply = ':' - StringReply = '$' - ArrayReply = '*' + redisStatus = '+' // +\r\n + redisError = '-' // -\r\n + redisString = '$' // $\r\n\r\n + redisInteger = ':' // :\r\n + redisNil = '_' // _\r\n + redisFloat = ',' // ,\r\n (golang float) + redisBool = '#' // true: #t\r\n false: #f\r\n + redisBlobError = '!' // !\r\n\r\n + redisVerb = '=' // =\r\nFORMAT:\r\n + redisBigInt = '(' // (\r\n + redisArray = '*' // *\r\n... (same as resp2) + redisMap = '%' // %\r\n(key)\r\n(value)\r\n... (golang map) + redisSet = '~' + redisAttr = '|' + redisPush = '>' ) -//------------------------------------------------------------------------------ +// Not used temporarily. +// Streamed = "EOF:" +// StreamedAggregated = '?' const Nil = RedisError("redis: nil") @@ -26,19 +41,13 @@ func (e RedisError) Error() string { return string(e) } func (RedisError) RedisError() {} -//------------------------------------------------------------------------------ - -type MultiBulkParse func(*Reader, int64) (interface{}, error) - type Reader struct { - rd *bufio.Reader - _buf []byte + rd *bufio.Reader } func NewReader(rd io.Reader) *Reader { return &Reader{ - rd: bufio.NewReader(rd), - _buf: make([]byte, 64), + rd: bufio.NewReader(rd), } } @@ -54,278 +63,223 @@ func (r *Reader) Reset(rd io.Reader) { r.rd.Reset(rd) } -func (r *Reader) ReadLine() ([]byte, error) { +func (r *Reader) ReadReply() (*Value, error) { line, err := r.readLine() if err != nil { return nil, err } - if isNilReply(line) { - return nil, Nil - } - return line, nil -} -// readLine that returns an error if: -// - there is a pending read error; -// - or line does not end with \r\n. -func (r *Reader) readLine() ([]byte, error) { - b, err := r.rd.ReadSlice('\n') - if err != nil { - if err != bufio.ErrBufferFull { - return nil, err + v := new(Value) + v.Typ = line[0] + + switch line[0] { + case redisStatus: + v.Str = string(line[1:]) + case redisError: + v.RedisError = RedisError(line[1:]) + case redisInteger: + v.Integer, err = util.ParseInt(line[1:], 10, 64) + case redisNil: + v.RedisError = Nil + case redisFloat: + v.Float, err = r.readFloat(line) + case redisBool: + v.Boolean, err = r.readBool(line) + case redisBigInt: + v.BigInt, err = r.readBigInt(line) + + case redisBlobError: + var blobErr string + blobErr, err = r.readString(line) + if err == nil { + v.RedisError = RedisError(blobErr) + } + case redisString: + v.Str, err = r.readString(line) + case redisVerb: + var s string + s, err = r.readString(line) + if err == nil { + if len(s) < 4 || s[3] != ':' { + err = fmt.Errorf("redis: can't parse verbatim string reply: %q", line) + } else { + v.Str = s[4:] + v.StrFmt = s[:3] + } } - full := make([]byte, len(b)) - copy(full, b) + case redisArray, redisSet, redisPush: + v.Slice, err = r.readArraySetPush(line) + case redisMap: + v.Map, err = r.readMap(line) + case redisAttr: + var ( + attr map[*Value]*Value + val *Value + ) + attr, err = r.readMap(line) + if err != nil && err != Nil { + return nil, err + } - b, err = r.rd.ReadBytes('\n') + val, err = r.ReadReply() if err != nil { return nil, err } - full = append(full, b...) //nolint:makezero - b = full + v.Attribute = &AttributeType{ + Attr: attr, + Value: val, + } + default: + err = fmt.Errorf("redis: invalid reply: %q", line) } - if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' { - return nil, fmt.Errorf("redis: invalid reply: %q", b) + + if err == Nil { + if v.RedisError == nil { + v.RedisError = Nil + } + err = nil } - return b[:len(b)-2], nil + + return v, err } -func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { - line, err := r.ReadLine() +func (r *Reader) readMap(line []byte) (map[*Value]*Value, error) { + n, err := replyLen(line) if err != nil { return nil, err } - switch line[0] { - case ErrorReply: - return nil, ParseErrorReply(line) - case StatusReply: - return string(line[1:]), nil - case IntReply: - return util.ParseInt(line[1:], 10, 64) - case StringReply: - return r.readStringReply(line) - case ArrayReply: - n, err := parseArrayLen(line) + // Maps can have any other type as field and value, + // however Redis will use only a subset of the available possibilities. + // For instance it is very unlikely that Redis commands would return + // an Array as a key, however Lua scripts and modules will likely be able to do so. + m := make(map[*Value]*Value) + for i := 0; i < n; i++ { + k, err := r.ReadReply() if err != nil { return nil, err } - if m == nil { - err := fmt.Errorf("redis: got %.100q, but multi bulk parser is nil", line) + + v, err := r.ReadReply() + if err != nil { return nil, err } - return m(r, n) + + m[k] = v } - return nil, fmt.Errorf("redis: can't parse %.100q", line) + return m, nil } -func (r *Reader) ReadIntReply() (int64, error) { - line, err := r.ReadLine() +func (r *Reader) readArraySetPush(line []byte) ([]*Value, error) { + n, err := replyLen(line) if err != nil { - return 0, err + return nil, err } - switch line[0] { - case ErrorReply: - return 0, ParseErrorReply(line) - case IntReply: - return util.ParseInt(line[1:], 10, 64) - default: - return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line) + + vs := make([]*Value, n) + for i := 0; i < n; i++ { + v, err := r.ReadReply() + if err != nil { + return nil, err + } + vs[i] = v } + return vs, nil } -func (r *Reader) ReadString() (string, error) { - line, err := r.ReadLine() - if err != nil { - return "", err +func (r *Reader) readBigInt(line []byte) (*big.Int, error) { + i := new(big.Int) + if i, ok := i.SetString(string(line[1:]), 10); ok { + return i, nil } - switch line[0] { - case ErrorReply: - return "", ParseErrorReply(line) - case StringReply: - return r.readStringReply(line) - case StatusReply: - return string(line[1:]), nil - case IntReply: - return string(line[1:]), nil + return nil, fmt.Errorf("redis: can't parse bigInt reply: %q", line) +} + +func (r *Reader) readBool(line []byte) (bool, error) { + switch string(line[1:]) { + case "t": + return true, nil + case "f": + return false, nil default: - return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line) + return false, fmt.Errorf("redis: can't parse bool reply: %q", line) } } -func (r *Reader) readStringReply(line []byte) (string, error) { - if isNilReply(line) { - return "", Nil +var ( + uvinf = math.Inf(1) + uvneginf = math.Inf(-1) +) + +func (r *Reader) readFloat(line []byte) (float64, error) { + v := string(line[1:]) + switch v { + case "inf": + return uvinf, nil + case "-inf": + return uvneginf, nil + default: + return strconv.ParseFloat(v, 64) } +} - replyLen, err := util.Atoi(line[1:]) +func (r *Reader) readString(line []byte) (string, error) { + n, err := replyLen(line) if err != nil { return "", err } - b := make([]byte, replyLen+2) + b := make([]byte, n+2) _, err = io.ReadFull(r.rd, b) if err != nil { return "", err } - return util.BytesToString(b[:replyLen]), nil + return string(b[:n]), nil } -func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { - line, err := r.ReadLine() +func (r *Reader) readLine() ([]byte, error) { + b, err := r.rd.ReadSlice('\n') if err != nil { - return nil, err - } - switch line[0] { - case ErrorReply: - return nil, ParseErrorReply(line) - case ArrayReply: - n, err := parseArrayLen(line) - if err != nil { + if err != bufio.ErrBufferFull { return nil, err } - return m(r, n) - default: - return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line) - } -} -func (r *Reader) ReadArrayLen() (int, error) { - line, err := r.ReadLine() - if err != nil { - return 0, err - } - switch line[0] { - case ErrorReply: - return 0, ParseErrorReply(line) - case ArrayReply: - n, err := parseArrayLen(line) - if err != nil { - return 0, err - } - return int(n), nil - default: - return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line) - } -} - -func (r *Reader) ReadScanReply() ([]string, uint64, error) { - n, err := r.ReadArrayLen() - if err != nil { - return nil, 0, err - } - if n != 2 { - return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n) - } - - cursor, err := r.ReadUint() - if err != nil { - return nil, 0, err - } - - n, err = r.ReadArrayLen() - if err != nil { - return nil, 0, err - } - - keys := make([]string, n) + full := make([]byte, len(b)) + copy(full, b) - for i := 0; i < n; i++ { - key, err := r.ReadString() + b, err = r.rd.ReadBytes('\n') if err != nil { - return nil, 0, err + return nil, err } - keys[i] = key - } - - return keys, cursor, err -} -func (r *Reader) ReadInt() (int64, error) { - b, err := r.readTmpBytesReply() - if err != nil { - return 0, err + full = append(full, b...) //nolint:makezero + b = full } - return util.ParseInt(b, 10, 64) -} - -func (r *Reader) ReadUint() (uint64, error) { - b, err := r.readTmpBytesReply() - if err != nil { - return 0, err + if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' { + return nil, fmt.Errorf("redis: invalid reply: %q", b) } - return util.ParseUint(b, 10, 64) + return b[:len(b)-2], nil } -func (r *Reader) ReadFloatReply() (float64, error) { - b, err := r.readTmpBytesReply() +func replyLen(line []byte) (n int, err error) { + n, err = util.Atoi(line[1:]) if err != nil { return 0, err } - return util.ParseFloat(b, 64) -} - -func (r *Reader) readTmpBytesReply() ([]byte, error) { - line, err := r.ReadLine() - if err != nil { - return nil, err - } - switch line[0] { - case ErrorReply: - return nil, ParseErrorReply(line) - case StringReply: - return r._readTmpBytesReply(line) - case StatusReply: - return line[1:], nil - default: - return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) - } -} -func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) { - if isNilReply(line) { - return nil, Nil - } - - replyLen, err := util.Atoi(line[1:]) - if err != nil { - return nil, err + if n < -1 { + return 0, fmt.Errorf("redis: invalid reply: %q", line) } - buf := r.buf(replyLen + 2) - _, err = io.ReadFull(r.rd, buf) - if err != nil { - return nil, err - } - - return buf[:replyLen], nil -} - -func (r *Reader) buf(n int) []byte { - if n <= cap(r._buf) { - return r._buf[:n] - } - d := n - cap(r._buf) - r._buf = append(r._buf, make([]byte, d)...) - return r._buf -} - -func isNilReply(b []byte) bool { - return len(b) == 3 && - (b[0] == StringReply || b[0] == ArrayReply) && - b[1] == '-' && b[2] == '1' -} - -func ParseErrorReply(line []byte) error { - return RedisError(string(line[1:])) -} - -func parseArrayLen(line []byte) (int64, error) { - if isNilReply(line) { - return 0, Nil + switch line[0] { + case redisString, redisVerb, redisBlobError, + redisArray, redisSet, redisPush, redisMap, redisAttr: + if n == -1 { + return 0, Nil + } } - return util.ParseInt(line[1:], 10, 64) + return n, nil } diff --git a/internal/proto/reader_test.go b/internal/proto/reader_test.go index b8c99dd61..6431ac8fc 100644 --- a/internal/proto/reader_test.go +++ b/internal/proto/reader_test.go @@ -1,72 +1,516 @@ -package proto_test +package proto import ( "bytes" + "errors" + "fmt" "io" + "math" + "math/big" "testing" - "github.com/go-redis/redis/v8/internal/proto" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" ) -func BenchmarkReader_ParseReply_Status(b *testing.B) { - benchmarkParseReply(b, "+OK\r\n", nil, false) +func BenchmarkReader_ParseReplyResp3_Status(b *testing.B) { + benchmarkParseReply(b, "+OK\r\n", false) } func BenchmarkReader_ParseReply_Int(b *testing.B) { - benchmarkParseReply(b, ":1\r\n", nil, false) + benchmarkParseReply(b, ":1\r\n", false) } func BenchmarkReader_ParseReply_Error(b *testing.B) { - benchmarkParseReply(b, "-Error message\r\n", nil, true) + benchmarkParseReply(b, "-Error message\r\n", true) } func BenchmarkReader_ParseReply_String(b *testing.B) { - benchmarkParseReply(b, "$5\r\nhello\r\n", nil, false) + benchmarkParseReply(b, "$5\r\nhello\r\n", false) +} + +func BenchmarkReader_ParseReply_Nil(b *testing.B) { + benchmarkParseReply(b, "_\r\n", true) +} + +func BenchmarkReader_ParseReply_Float(b *testing.B) { + benchmarkParseReply(b, ",100.1234\r\n", false) +} + +func BenchmarkReader_ParseReply_Bool(b *testing.B) { + benchmarkParseReply(b, "#t\r\n", false) +} + +func BenchmarkReader_ParseReply_BlobError(b *testing.B) { + benchmarkParseReply(b, "!10\r\nblob error\r\n", true) +} + +func BenchmarkReader_ParseReply_Verb(b *testing.B) { + benchmarkParseReply(b, "=9\r\ntxt:hello\r\n", false) +} + +func BenchmarkReader_ParseReply_BigInt(b *testing.B) { + benchmarkParseReply(b, "(3492890328409238509324850943850943825024385\r\n", false) } func BenchmarkReader_ParseReply_Slice(b *testing.B) { - benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", multiBulkParse, false) + benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false) } -func TestReader_ReadLine(t *testing.T) { - original := bytes.Repeat([]byte("a"), 8192) - original[len(original)-2] = '\r' - original[len(original)-1] = '\n' - r := proto.NewReader(bytes.NewReader(original)) - read, err := r.ReadLine() - if err != nil && err != io.EOF { - t.Errorf("Should be able to read the full buffer: %v", err) - } +func BenchmarkReader_ParseReply_Slice2(b *testing.B) { + r := "*3\r\n" + - if bytes.Compare(read, original[:len(original)-2]) != 0 { - t.Errorf("Values must be equal: %d expected %d", len(read), len(original[:len(original)-2])) - } + "%2\r\n" + + "+key\r\n" + + "$2\r\nhi\r\n" + + "$5\r\nindex\r\n" + + ":1024\r\n" + + + "$5\r\nhello\r\n" + + + "*2\r\n" + + "_\r\n" + + "#t\r\n" + + benchmarkParseReply(b, r, false) } -func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wanterr bool) { +func BenchmarkReader_ParseReply_Map(b *testing.B) { + r := "%2\r\n" + + + "*2\r\n" + + "$2\r\nhi\r\n" + + "*1\r\n" + + "_\r\n" + + + "#f\r\n" + + + "$1\r\nk\r\n" + + + "*1\r\n" + + "$4\r\ntext\r\n" + + benchmarkParseReply(b, r, false) +} + +func BenchmarkReader_ParseReply_Set(b *testing.B) { + benchmarkParseReply(b, "~2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false) +} + +func BenchmarkReader_ParseReply_Attr(b *testing.B) { + r := "|1\r\n" + + "+key\r\n" + + "%2\r\n" + + "$1\r\na\r\n" + + ",0.1923\r\n" + + "$1\r\nb\r\n" + + ",0.0012\r\n" + + + "*2\r\n" + + ":1024\r\n" + + ":2048\r\n" + benchmarkParseReply(b, r, false) +} + +func BenchmarkReader_ParseReply_Push(b *testing.B) { + benchmarkParseReply(b, ">2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false) +} + +func benchmarkParseReply(b *testing.B, reply string, redisErr bool) { buf := new(bytes.Buffer) for i := 0; i < b.N; i++ { buf.WriteString(reply) } - p := proto.NewReader(buf) + p := NewReader(buf) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := p.ReadReply(m) - if !wanterr && err != nil { + v, err := p.ReadReply() + if err != nil { b.Fatal(err) } + if redisErr { + if v.RedisError == nil { + b.Fatal("expect redis error, but it is nil") + } + } else { + if v.RedisError != nil { + b.Fatal("expect nil, but it is error") + } + } } } -func multiBulkParse(p *proto.Reader, n int64) (interface{}, error) { - vv := make([]interface{}, 0, n) - for i := int64(0); i < n; i++ { - v, err := p.ReadReply(multiBulkParse) - if err != nil { - return nil, err - } - vv = append(vv, v) - } - return vv, nil +var notNil = errors.New("not nil") + +type respList struct { + resp string + expect *Value + err error } + +var list = []respList{ + //SimpleString + { + resp: "+\r\n", + expect: &Value{Typ: redisStatus}, + }, + { + resp: "+hello world\r\n", + expect: &Value{Typ: redisStatus, Str: "hello world"}, + }, + + // SimpleError + { + resp: "-\r\n", + expect: &Value{Typ: redisError, RedisError: RedisError("")}, + }, + { + resp: "-hello world\r\n", + expect: &Value{Typ: redisError, RedisError: RedisError("hello world")}, + }, + + // BlobString + { + resp: "$0\r\n\r\n", + expect: &Value{Typ: redisString}, + }, + { + resp: "$0\r\n", + err: io.EOF, + }, + { + resp: "$-10\r\n", + err: notNil, + }, + { + resp: "$\r\n", + err: notNil, + }, + { + resp: "$6\r\nhello\n\r\n", + expect: &Value{Typ: redisString, Str: "hello\n"}, + }, + + // Number + { + resp: ":\r\n", + err: notNil, + }, + { + resp: ":0\r\n", + expect: &Value{Typ: redisInteger, Integer: int64(0)}, + }, + { + resp: ":-10\r\n", + expect: &Value{Typ: redisInteger, Integer: int64(-10)}, + }, + { + resp: ":1024\r\n", + expect: &Value{Typ: redisInteger, Integer: int64(1024)}, + }, + + // Null + { + resp: "_\r\n", + expect: &Value{Typ: redisNil, RedisError: Nil}, + }, + { + resp: "_hello\r\n", + expect: &Value{Typ: redisNil, RedisError: Nil}, + }, + + // Float + { + resp: ",\r\n", + err: notNil, + }, + { + resp: ",0\r\n", + expect: &Value{Typ: redisFloat, Float: 0}, + }, + { + resp: ",100.1234\r\n", + expect: &Value{Typ: redisFloat, Float: 100.1234}, + }, + { + resp: ",-100.4321\r\n", + expect: &Value{Typ: redisFloat, Float: -100.4321}, + }, + { + resp: ",inf\r\n", + expect: &Value{Typ: redisFloat, Float: math.Inf(1)}, + }, + { + resp: ",-inf\r\n", + expect: &Value{Typ: redisFloat, Float: math.Inf(-1)}, + }, + + // Bool + { + resp: "#\r\n", + err: notNil, + }, + { + resp: "#t\r\n", + expect: &Value{Typ: redisBool, Boolean: true}, + }, + { + resp: "#f\r\n", + expect: &Value{Typ: redisBool, Boolean: false}, + }, + + // BlobError + { + resp: "!0\r\n\r\n", + expect: &Value{Typ: redisBlobError, RedisError: RedisError("")}, + }, + { + resp: "!0\r\n", + err: io.EOF, + }, + { + resp: "!-10\r\n", + err: errors.New("redis: invalid reply: \"!-10\""), + }, + { + resp: "!\r\n", + err: notNil, + }, + { + resp: "!6\r\nhello\n\r\n", + expect: &Value{Typ: redisBlobError, RedisError: RedisError("hello\n")}, + }, + + // VerbatimString + { + resp: "=0\r\n", + err: io.EOF, + }, + { + resp: "=-10\r\n", + err: errors.New("redis: invalid reply: \"=-10\""), + }, + { + resp: "=\r\n", + err: notNil, + }, + { + resp: "=2\r\nhi\r\n", + err: notNil, + }, + { + resp: "=5\r\nhello\r\n", + err: notNil, + }, + { + resp: "=9\r\ntxt:hello\r\n", + expect: &Value{Typ: redisVerb, Str: "hello", StrFmt: "txt"}, + }, + + // BigNumber + { + resp: "(\r\n", + err: errors.New("redis: can't parse bigInt reply: \"(\""), + }, + { + resp: "(hello1024\r\n", + err: notNil, + }, + { + resp: "(3492890328409238509324850943850943825024385\r\n", + expect: &Value{Typ: redisBigInt, BigInt: bigInt("3492890328409238509324850943850943825024385")}, + }, + { + resp: "(-3492890328409238509324850943850943825024385\r\n", + expect: &Value{Typ: redisBigInt, BigInt: bigInt("-3492890328409238509324850943850943825024385")}, + }, + + // Array + { + resp: "*0\r\n", + expect: &Value{Typ: redisArray, Slice: make([]*Value, 0)}, + }, + { + resp: "*\r\n", + err: notNil, + }, + { + resp: "*2\r\n+hello world\r\n:1024\r\n", + expect: &Value{Typ: redisArray, Slice: []*Value{ + {Typ: redisStatus, Str: "hello world"}, + {Typ: redisInteger, Integer: 1024}, + }}, + }, + { + resp: "*2\r\n*2\r\n$2\r\nok\r\n,100.1234\r\n$5\r\nhello\r\n", + expect: &Value{Typ: redisArray, Slice: []*Value{ + { + Typ: redisArray, + Slice: []*Value{ + {Typ: redisString, Str: "ok"}, + {Typ: redisFloat, Float: 100.1234}, + }, + }, + { + Typ: redisString, + Str: "hello", + }, + }}, + }, + + // Set + { + resp: "~0\r\n", + expect: &Value{Typ: redisSet, Slice: make([]*Value, 0)}, + }, + { + resp: "~\r\n", + err: notNil, + }, + { + resp: "~2\r\n+hello world\r\n:1024\r\n", + expect: &Value{Typ: redisSet, Slice: []*Value{ + {Typ: redisStatus, Str: "hello world"}, + {Typ: redisInteger, Integer: 1024}, + }}, + }, + { + resp: "~2\r\n*2\r\n$2\r\nok\r\n,100.1234\r\n$5\r\nhello\r\n", + expect: &Value{Typ: redisSet, Slice: []*Value{ + { + Typ: redisArray, + Slice: []*Value{ + {Typ: redisString, Str: "ok"}, + {Typ: redisFloat, Float: 100.1234}, + }, + }, + { + Typ: redisString, + Str: "hello", + }, + }}, + }, + + // Push + { + resp: ">0\r\n", + expect: &Value{Typ: redisPush, Slice: make([]*Value, 0)}, + }, + { + resp: ">\r\n", + err: notNil, + }, + { + resp: ">2\r\n+hello world\r\n:1024\r\n", + expect: &Value{Typ: redisPush, Slice: []*Value{ + {Typ: redisStatus, Str: "hello world"}, + {Typ: redisInteger, Integer: 1024}, + }}, + }, + { + resp: ">2\r\n*2\r\n$2\r\nok\r\n,100.1234\r\n$5\r\nhello\r\n", + expect: &Value{Typ: redisPush, Slice: []*Value{ + { + Typ: redisArray, + Slice: []*Value{ + {Typ: redisString, Str: "ok"}, + {Typ: redisFloat, Float: 100.1234}, + }, + }, + { + Typ: redisString, + Str: "hello", + }, + }}, + }, +} + +func bigInt(s string) *big.Int { + i := new(big.Int) + i, _ = i.SetString(s, 10) + return i +} + +var _ = Describe("resp proto", func() { + It("should format", func() { + pp := []byte{ + redisStatus, + redisError, + redisString, + redisInteger, + redisNil, + redisFloat, + redisBool, + redisBlobError, + redisVerb, + redisBigInt, + redisArray, + redisMap, + redisSet, + redisAttr, + redisPush, + } + for _, p := range pp { + wr := new(bytes.Buffer) + r := NewReader(wr) + + wr.WriteString(fmt.Sprintf("%s1024\n\r\n", string(p))) + _, err := r.ReadReply() + Expect(err).To(Equal( + fmt.Errorf("redis: invalid reply: %q", fmt.Sprintf("%s1024\n", string(p))), + )) + } + }) + + It("read map", func() { + p := "%2\r\n*2\r\n$2\r\nhi\r\n*1\r\n_\r\n#f\r\n$1\r\nk\r\n*1\r\n$4\r\ntext\r\n" + wr := new(bytes.Buffer) + r := NewReader(wr) + + wr.WriteString(p) + v, err := r.ReadReply() + Expect(err).NotTo(HaveOccurred()) + + expectVal := &Value{ + Typ: redisMap, + Map: map[*Value]*Value{ + {Typ: redisArray, Slice: []*Value{ + {Typ: redisString, Str: "hi"}, + {Typ: redisArray, Slice: []*Value{ + {Typ: redisNil, RedisError: Nil}, + }}, + }}: {Typ: redisBool, Boolean: false}, + + {Typ: redisString, Str: "k"}: { + Typ: redisArray, + Slice: []*Value{ + {Typ: redisString, Str: "text"}, + }, + }, + }, + } + Expect(v.Interface()).To(Equal(expectVal.Interface())) + }) + + It("read", func() { + for _, l := range list { + wr := new(bytes.Buffer) + r := NewReader(wr) + + wr.WriteString(l.resp) + + v, err := r.ReadReply() + if l.err != nil { + if l.err == notNil { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).To(Equal(l.err)) + } + } else { + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(l.expect)) + } + } + }) +}) diff --git a/internal/proto/value.go b/internal/proto/value.go new file mode 100644 index 000000000..bd02b36f2 --- /dev/null +++ b/internal/proto/value.go @@ -0,0 +1,493 @@ +package proto + +import ( + "errors" + "fmt" + "math/big" + "strconv" +) + +type Value struct { + Typ byte + Str string + StrFmt string + Integer int64 + Float float64 + BigInt *big.Int + Slice []*Value + Map map[*Value]*Value + Attribute *AttributeType + Boolean bool + + RedisError error +} + +type AttributeType struct { + Attr map[*Value]*Value + Value *Value +} + +var ( + errInt = errors.New("redis return value is not int") + errFloat = errors.New("redis return value is not float") + errBool = errors.New("redis return value is not bool") + errStatus = errors.New("redis return value is not int/bool/string") + errVerb = errors.New("redis return value is not verb") + errMap = errors.New("redis return value is not map") + errSlice = errors.New("redis return value is not array") + errAttr = errors.New("redis return value is not attr") + errStr = errors.New("redis return value cannot be converted to string") +) + +func (v *Value) IsSimpleType() bool { + switch v.Typ { + case redisStatus, redisString, redisInteger, + redisFloat, redisBool, redisVerb, redisBigInt: + return true + } + return false +} + +func (v *Value) IsSlice() bool { + switch v.Typ { + case redisArray, redisSet, redisPush: + return true + } + return false +} + +// Convert the result set to int, there may be overflow problems. +func (v *Value) Int64() (int64, error) { + if v.RedisError != nil { + return 0, v.RedisError + } + switch v.Typ { + case redisInteger: + return v.Integer, nil + case redisBigInt: + return v.BigInt.Int64(), nil + case redisString, redisStatus, redisVerb: + return strconv.ParseInt(v.Str, 10, 64) + } + + return 0, errInt +} + +func (v *Value) Float64() (float64, error) { + if v.RedisError != nil { + return 0, v.RedisError + } + switch v.Typ { + case redisFloat: + return v.Float, nil + case redisString, redisStatus, redisVerb: + return strconv.ParseFloat(v.Str, 64) + } + + return 0, errFloat +} + +func (v *Value) Bool() (bool, error) { + if v.RedisError != nil { + return false, v.RedisError + } + switch v.Typ { + case redisBool: + return v.Boolean, nil + case redisInteger: + return v.Integer != 0, nil + case redisString, redisStatus, redisVerb: + return strconv.ParseBool(v.Str) + } + + return false, errBool +} + +func (v *Value) Status() (bool, error) { + switch v.RedisError { + case Nil: + return false, nil + case nil: + default: + return false, v.RedisError + } + + switch v.Typ { + case redisBool: + return v.Boolean, nil + case redisInteger: + return v.Integer == 1, nil + case redisString, redisStatus, redisVerb: + return v.Str == "OK", nil + } + + return false, errStatus +} + +// The Value can be converted to the type of string, try to use string to represent. +func (v *Value) String() (string, error) { + if v.RedisError != nil { + return "", v.RedisError + } + + switch v.Typ { + case redisStatus, redisString: + return v.Str, nil + case redisInteger: + return strconv.FormatInt(v.Integer, 10), nil + case redisFloat: + return strconv.FormatFloat(v.Float, 'g', -1, 64), nil + case redisBool: + if v.Boolean { + return "true", nil + } + return "false", nil + case redisVerb: + return fmt.Sprintf("%s:%s", v.StrFmt, v.Str), nil + case redisBigInt: + return v.BigInt.String(), nil + } + + return "", errStr +} + +func (v *Value) FmtString() (format string, str string, err error) { + if v.RedisError != nil { + return "", "", v.RedisError + } + if v.Typ == redisVerb { + return v.StrFmt, v.Str, nil + } + + return "", "", errVerb +} + +// Try to convert the result set to map[string]interface, +// which requires the result set to be a simple k-v structure. +// If it is a complex map, it needs to be converted in the outer layer. +func (v *Value) MapStringString() (map[string]string, error) { //nolint:dupl + if v.RedisError != nil { + return nil, v.RedisError + } + + m := make(map[string]string) + switch v.Typ { + case redisMap: + for key, val := range v.Map { + keyStr, err := key.String() + if err != nil { + return nil, err + } + + // val is allowed to be redis.Nil. + valStr, err := val.String() + if err != nil && err != Nil { + return nil, err + } + + m[keyStr] = valStr + } + return m, nil + case redisArray, redisSet, redisPush: + n := len(v.Slice) + if n == 0 { + return m, nil + } + if n%2 != 0 { + return nil, errors.New("the map requires the result set to be a multiple of 2") + } + for i := 0; i < n; i += 2 { + keyStr, err := v.Slice[i].String() + if err != nil { + return nil, err + } + + // val is allowed to be redis.Nil. + valStr, err := v.Slice[i+1].String() + if err != nil && err != Nil { + return nil, err + } + + m[keyStr] = valStr + } + + return m, nil + } + + return nil, errMap +} + +func (v *Value) MapStringInt64() (map[string]int64, error) { //nolint:dupl + if v.RedisError != nil { + return nil, v.RedisError + } + + m := make(map[string]int64) + switch v.Typ { + case redisMap: + for key, val := range v.Map { + keyStr, err := key.String() + if err != nil { + return nil, err + } + + // val is allowed to be redis.Nil. + valInt, err := val.Int64() + if err != nil && err != Nil { + return nil, err + } + + m[keyStr] = valInt + } + return m, nil + case redisArray, redisSet, redisPush: + n := len(v.Slice) + if n == 0 { + return m, nil + } + if n%2 != 0 { + return nil, errors.New("the map requires the result set to be a multiple of 2") + } + for i := 0; i < n; i += 2 { + keyStr, err := v.Slice[i].String() + if err != nil { + return nil, err + } + + // val is allowed to be redis.Nil. + valInt, err := v.Slice[i+1].Int64() + if err != nil && err != Nil { + return nil, err + } + + m[keyStr] = valInt + } + + return m, nil + } + + return nil, errMap +} + +// Convert the result set to []string, which requires the result set to be a simple array, +// if it is a complex array, it needs to be converted in the outer layer. +func (v *Value) SliceString() ([]string, error) { + if v.RedisError != nil { + return nil, v.RedisError + } + + switch v.Typ { + case redisArray, redisSet, redisPush: + ss := make([]string, 0, len(v.Slice)) + for _, val := range v.Slice { + // val is allowed to be redis.Nil. + tmp, err := val.String() + if err != nil && err != Nil { + return nil, err + } + ss = append(ss, tmp) + } + return ss, nil + } + + return nil, errSlice +} + +func (v *Value) SliceInt64() ([]int64, error) { + if v.RedisError != nil { + return nil, v.RedisError + } + + switch v.Typ { + case redisArray, redisSet, redisPush: + si := make([]int64, 0, len(v.Slice)) + for _, val := range v.Slice { + i, err := val.Int64() + if err != nil && err != Nil { + return nil, err + } + si = append(si, i) + } + return si, nil + } + + return nil, errSlice +} + +func (v *Value) SliceFloat64() ([]float64, error) { + if v.RedisError != nil { + return nil, v.RedisError + } + + switch v.Typ { + case redisArray, redisSet, redisPush: + sf := make([]float64, 0, len(v.Slice)) + for _, val := range v.Slice { + i, err := val.Float64() + if err != nil && err != Nil { + return nil, err + } + sf = append(sf, i) + } + return sf, nil + } + + return nil, errSlice +} + +func (v *Value) SliceStatus() ([]bool, error) { + if v.RedisError != nil { + return nil, v.RedisError + } + + switch v.Typ { + case redisArray, redisSet, redisPush: + sb := make([]bool, 0, len(v.Slice)) + for _, val := range v.Slice { + b, err := val.Status() + if err != nil && err != Nil { + return nil, err + } + sb = append(sb, b) + } + return sb, nil + } + + return nil, errSlice +} + +func (v *Value) SliceInterface() ([]interface{}, error) { + if v.RedisError != nil { + return nil, v.RedisError + } + + switch v.Typ { + case redisArray, redisSet, redisPush: + si := make([]interface{}, 0, len(v.Slice)) + for _, val := range v.Slice { + iv := val.Interface() + if iv == Nil { + iv = nil + } + si = append(si, iv) + } + return si, nil + } + + return nil, errSlice +} + +func (v *Value) AttrInterface() interface{} { + if v.RedisError != nil { + return v.RedisError + } + + if v.Typ == redisAttr { + m := make(map[interface{}]interface{}) + for key, val := range v.Attribute.Attr { + m[key.Interface()] = val.Interface() + } + return m + } + + return errAttr +} + +func (v *Value) Interface() interface{} { + if v.RedisError != nil { + return v.RedisError + } + + switch v.Typ { + case redisStatus, redisString: + return v.Str + case redisInteger: + return v.Integer + case redisFloat: + return v.Float + case redisBool: + return v.Boolean + case redisVerb: + return fmt.Sprintf("%s:%s", v.StrFmt, v.Str) + case redisBigInt: + return v.BigInt.String() + + case redisArray, redisSet, redisPush: + slice := make([]interface{}, 0, len(v.Slice)) + for _, val := range v.Slice { + slice = append(slice, val.Interface()) + } + return slice + case redisMap: + m := make(map[interface{}]interface{}) + for key, val := range v.Map { + // panic: runtime error: hash of unhashable type [Type]. + m[fmt.Sprint(key.Interface())] = val.Interface() + } + return m + case redisAttr: + return v.Attribute.Value.Interface() + } + + // Value.Typ are all valid types and will not come here. + return nil +} + +func (v *Value) SliceScan(fn func(item *Value) error) error { + if v.RedisError != nil { + return v.RedisError + } + + switch v.Typ { + case redisArray, redisSet, redisPush: + for _, item := range v.Slice { + if err := fn(item); err != nil { + return err + } + } + return nil + } + + return errSlice +} + +func (v *Value) MapScan(fn func(key, item *Value) error) error { + if v.RedisError != nil { + return v.RedisError + } + + if v.Typ == redisMap { + for key, item := range v.Map { + if err := fn(key, item); err != nil { + return err + } + } + return nil + } + + return errMap +} + +func (v *Value) MapLen() (int, error) { + if v.RedisError != nil { + return 0, v.RedisError + } + + if v.Typ == redisMap { + return len(v.Map), nil + } + + return 0, errMap +} + +func (v *Value) SliceLen() (int, error) { + if v.RedisError != nil { + return 0, v.RedisError + } + switch v.Typ { + case redisArray, redisSet, redisPush: + return len(v.Slice), nil + } + + return 0, errSlice +} diff --git a/internal/proto/value_test.go b/internal/proto/value_test.go new file mode 100644 index 000000000..a83b9a6bc --- /dev/null +++ b/internal/proto/value_test.go @@ -0,0 +1,244 @@ +package proto + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "math/big" +) + +var _ = Describe("resp value", func() { + var v *Value + + BeforeEach(func() { + v = &Value{} + }) + It("int64", func() { + v.Typ = redisInteger + v.Integer = 1024 + Expect(v.Int64()).To(Equal(int64(1024))) + + v.Typ = redisString + v.Str = "2048" + Expect(v.Int64()).To(Equal(int64(2048))) + + v.Typ = redisBigInt + v.BigInt = &big.Int{} + v.BigInt.SetString("4096", 10) + Expect(v.Int64()).To(Equal(int64(4096))) + }) + + It("float64", func() { + v.Typ = redisFloat + v.Float = 1024.1024 + Expect(v.Float64()).To(Equal(1024.1024)) + + v.Typ = redisString + v.Str = "2048.2048" + Expect(v.Float64()).To(Equal(2048.2048)) + }) + + It("bool", func() { + v.Typ = redisBool + v.Boolean = true + Expect(v.Bool()).To(BeTrue()) + + v.Typ = redisInteger + v.Integer = 1 + Expect(v.Bool()).To(BeTrue()) + + v.Typ = redisString + v.Str = "1" + Expect(v.Bool()).To(BeTrue()) + }) + + It("status", func() { + v.Typ = redisBool + v.Boolean = true + Expect(v.Status()).To(BeTrue()) + + v.Typ = redisInteger + v.Integer = 1 + Expect(v.Status()).To(BeTrue()) + + v.Typ = redisString + v.Str = "OK" + Expect(v.Status()).To(BeTrue()) + }) + + It("string", func() { + v.Typ = redisString + v.Str = "string" + Expect(v.String()).To(Equal("string")) + + v.Typ = redisStatus + v.Str = "status" + Expect(v.String()).To(Equal("status")) + + v.Typ = redisInteger + v.Integer = 1024 + Expect(v.String()).To(Equal("1024")) + + v.Typ = redisFloat + v.Float = 2048.2048 + Expect(v.String()).To(Equal("2048.2048")) + + v.Typ = redisBool + v.Boolean = true + Expect(v.String()).To(Equal("true")) + + v.Typ = redisVerb + v.StrFmt, v.Str = "txt", "hello" + Expect(v.String()).To(Equal("txt:hello")) + + v.Typ = redisBigInt + v.BigInt = &big.Int{} + v.BigInt.SetString("4096", 10) + Expect(v.String()).To(Equal("4096")) + }) + + It("fmt string", func() { + v.Typ = redisVerb + v.StrFmt, v.Str = "txt", "hello" + format, str, _ := v.FmtString() + Expect(format).To(Equal("txt")) + Expect(str).To(Equal("hello")) + }) + + It("map string string", func() { + v.Typ = redisMap + v.Map = map[*Value]*Value{ + { + Typ: redisInteger, + Integer: 1024, + }: { + Typ: redisStatus, + Str: "OK", + }, + } + Expect(v.MapStringString()).To(Equal(map[string]string{ + "1024": "OK", + })) + + v.Typ = redisArray + v.Slice = []*Value{ + {Typ: redisInteger, Integer: 2048}, + {Typ: redisString, Str: "hello"}, + {Typ: redisFloat, Float: 4096.4096}, + } + _, err := v.MapStringString() + Expect(err).To(HaveOccurred()) + + v.Slice = append(v.Slice, &Value{ + Typ: redisBool, + Boolean: true, + }) + Expect(v.MapStringString()).To(Equal(map[string]string{ + "2048": "hello", + "4096.4096": "true", + })) + }) + + It("map string int64", func() { + v.Typ = redisMap + v.Map = map[*Value]*Value{ + { + Typ: redisStatus, + Str: "OK", + }: { + Typ: redisInteger, + Integer: 1024, + }, + } + Expect(v.MapStringInt64()).To(Equal(map[string]int64{ + "OK": 1024, + })) + + v.Typ = redisArray + v.Slice = []*Value{ + {Typ: redisString, Str: "hello"}, + {Typ: redisInteger, Integer: 2048}, + {Typ: redisFloat, Float: 4096.4096}, + } + _, err := v.MapStringString() + Expect(err).To(HaveOccurred()) + + v.Slice = append(v.Slice, &Value{ + Typ: redisInteger, + Integer: 8192, + }) + Expect(v.MapStringInt64()).To(Equal(map[string]int64{ + "hello": 2048, + "4096.4096": 8192, + })) + }) + + It("slice string", func() { + v.Typ = redisArray + v.Slice = []*Value{ + {Typ: redisInteger, Integer: 1024}, + {Typ: redisFloat, Float: 2048.2048}, + {Typ: redisStatus, Str: "OK"}, + {Typ: redisString, Str: "hello"}, + {Typ: redisBool, Boolean: true}, + } + Expect(v.SliceString()).To(Equal([]string{ + "1024", "2048.2048", "OK", "hello", "true", + })) + }) + + It("slice int64", func() { + v.Typ = redisArray + v.Slice = []*Value{ + {Typ: redisInteger, Integer: 1024}, + {Typ: redisString, Str: "2048"}, + } + Expect(v.SliceInt64()).To(Equal([]int64{ + 1024, 2048, + })) + }) + + It("slice float64", func() { + v.Typ = redisArray + v.Slice = []*Value{ + {Typ: redisFloat, Float: 1024.1024}, + {Typ: redisString, Str: "2048.2048"}, + } + Expect(v.SliceFloat64()).To(Equal([]float64{ + 1024.1024, 2048.2048, + })) + }) + + It("slice scan", func() { + v.Typ = redisArray + v.Slice = []*Value{ + {Typ: redisInteger, Integer: 1024}, + {Typ: redisString, Str: "hello"}, + {Typ: redisBool, Boolean: true}, + } + + var strs []string + err := v.SliceScan(func(item *Value) error { + temp, err := item.String() + Expect(err).NotTo(HaveOccurred()) + strs = append(strs, temp) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + Expect(strs).To(Equal([]string{ + "1024", "hello", "true", + })) + }) + + It("slice len", func() { + v.Typ = redisArray + v.Slice = []*Value{ + {Typ: redisInteger, Integer: 1024}, + {Typ: redisString, Str: "hello"}, + {Typ: redisBool, Boolean: true}, + } + + n, err := v.SliceLen() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(3)) + }) +}) diff --git a/internal/proto/writer.go b/internal/proto/writer.go index 81b09b8e4..87d3e2223 100644 --- a/internal/proto/writer.go +++ b/internal/proto/writer.go @@ -34,7 +34,7 @@ func NewWriter(wr writer) *Writer { } func (w *Writer) WriteArgs(args []interface{}) error { - if err := w.WriteByte(ArrayReply); err != nil { + if err := w.WriteByte(redisArray); err != nil { return err } @@ -111,7 +111,7 @@ func (w *Writer) WriteArg(v interface{}) error { } func (w *Writer) bytes(b []byte) error { - if err := w.WriteByte(StringReply); err != nil { + if err := w.WriteByte(redisString); err != nil { return err } diff --git a/internal/proto/writer_test.go b/internal/proto/writer_test.go index c5df9a6f2..415c91972 100644 --- a/internal/proto/writer_test.go +++ b/internal/proto/writer_test.go @@ -3,7 +3,6 @@ package proto_test import ( "bytes" "encoding" - "testing" "time" "github.com/go-redis/redis/v8/internal/proto" @@ -80,14 +79,14 @@ func (discard) WriteByte(c byte) error { return nil } -func BenchmarkWriteBuffer_Append(b *testing.B) { - buf := proto.NewWriter(discard{}) - args := []interface{}{"hello", "world", "foo", "bar"} - - for i := 0; i < b.N; i++ { - err := buf.WriteArgs(args) - if err != nil { - b.Fatal(err) - } - } -} +//func BenchmarkWriteBuffer_Append(b *testing.B) { +// buf := proto.NewWriter(discard{}) +// args := []interface{}{"hello", "world", "foo", "bar"} +// +// for i := 0; i < b.N; i++ { +// err := buf.WriteArgs(args) +// if err != nil { +// b.Fatal(err) +// } +// } +//} diff --git a/pubsub.go b/pubsub.go index bd5bd2adc..a2846d8ce 100644 --- a/pubsub.go +++ b/pubsub.go @@ -118,7 +118,7 @@ func mapKeys(m map[string]struct{}) []string { i := 0 for k := range m { s[i] = k - i++ // nolint:wastedassign + i++ } return s } @@ -375,8 +375,8 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int return nil, err } - err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error { - return c.cmd.readReply(rd) + err = cn.WithReader(ctx, timeout, func(pv *proto.Value) error { + return c.cmd.readReply(pv) }) c.releaseConnWithLock(ctx, cn, err, timeout > 0) diff --git a/redis.go b/redis.go index 7995c4365..a1fb026b3 100644 --- a/redis.go +++ b/redis.go @@ -468,20 +468,23 @@ func (c *baseClient) pipelineProcessCmds( return true, err } - err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { - return pipelineReadCmds(rd, cmds) + err = cn.WithReaders(ctx, c.opt.ReadTimeout, len(cmds), func(pvs []*proto.Value) error { + return pipelineReadCmds(pvs, cmds) }) return true, err } -func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { - for _, cmd := range cmds { - err := cmd.readReply(rd) - cmd.SetErr(err) - if err != nil && !isRedisError(err) { - return err +func pipelineReadCmds(pvs []*proto.Value, cmds []Cmder) error { + if len(pvs) == len(cmds) { + for key, cmd := range cmds { + err := cmd.readReply(pvs[key]) + cmd.SetErr(err) + if err != nil && !isRedisError(err) { + return err + } } } + return nil } @@ -495,17 +498,35 @@ func (c *baseClient) txPipelineProcessCmds( return true, err } - err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { + err = cn.WithReaders(ctx, c.opt.ReadTimeout, len(cmds), func(pvs []*proto.Value) error { statusCmd := cmds[0].(*StatusCmd) + if err := statusCmd.readReply(pvs[0]); err != nil { + return err + } + // Trim multi and exec. cmds = cmds[1 : len(cmds)-1] - err := txPipelineReadQueued(rd, statusCmd, cmds) - if err != nil { + for key := range cmds { + if err := statusCmd.readReply(pvs[key+1]); err != nil && !isRedisError(err) { + return err + } + } + + pv := pvs[len(pvs)-1] + n, err := pv.SliceLen() + switch err { + case nil: + case Nil: + return TxFailedErr + default: return err } + if n != len(cmds) { + return fmt.Errorf("redis: expected %d, but got %d", len(cmds), n) + } - return pipelineReadCmds(rd, cmds) + return pipelineReadCmds(pv.Slice, cmds) }) return false, err } @@ -521,40 +542,6 @@ func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder { return cmdCopy } -func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error { - // Parse queued replies. - if err := statusCmd.readReply(rd); err != nil { - return err - } - - for range cmds { - if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) { - return err - } - } - - // Parse number of replies. - line, err := rd.ReadLine() - if err != nil { - if err == Nil { - err = TxFailedErr - } - return err - } - - switch line[0] { - case proto.ErrorReply: - return proto.ParseErrorReply(line) - case proto.ArrayReply: - // ok - default: - err := fmt.Errorf("redis: expected '*', but got line %q", line) - return err - } - - return nil -} - //------------------------------------------------------------------------------ // Client is a Redis client representing a pool of zero or more diff --git a/result.go b/result.go index 24cfd4994..01ce958fe 100644 --- a/result.go +++ b/result.go @@ -1,11 +1,15 @@ package redis -import "time" +import ( + "time" + + "github.com/go-redis/redis/v8/internal/proto" +) // NewCmdResult returns a Cmd initialised with val and err for testing. -func NewCmdResult(val interface{}, err error) *Cmd { +func NewCmdResult(pv *proto.Value, err error) *Cmd { var cmd Cmd - cmd.val = val + cmd.val = pv cmd.SetErr(err) return &cmd }