Skip to content

Commit dbd2c99

Browse files
committed
Rework pipeline retrying
1 parent 0daeac9 commit dbd2c99

13 files changed

+386
-254
lines changed

cluster.go

Lines changed: 203 additions & 122 deletions
Large diffs are not rendered by default.

cluster_test.go

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ var _ = Describe("ClusterClient", func() {
200200

201201
Eventually(func() string {
202202
return client.Get("A").Val()
203-
}).Should(Equal("VALUE"))
203+
}, 30*time.Second).Should(Equal("VALUE"))
204204

205205
cnt, err := client.Del("A").Result()
206206
Expect(err).NotTo(HaveOccurred())
@@ -215,7 +215,7 @@ var _ = Describe("ClusterClient", func() {
215215

216216
Eventually(func() string {
217217
return client.Get("A").Val()
218-
}).Should(Equal("VALUE"))
218+
}, 30*time.Second).Should(Equal("VALUE"))
219219
})
220220

221221
It("distributes keys", func() {
@@ -227,7 +227,7 @@ var _ = Describe("ClusterClient", func() {
227227
for _, master := range cluster.masters() {
228228
Eventually(func() string {
229229
return master.Info("keyspace").Val()
230-
}, 5*time.Second).Should(Or(
230+
}, 30*time.Second).Should(Or(
231231
ContainSubstring("keys=31"),
232232
ContainSubstring("keys=29"),
233233
ContainSubstring("keys=40"),
@@ -251,7 +251,7 @@ var _ = Describe("ClusterClient", func() {
251251
for _, master := range cluster.masters() {
252252
Eventually(func() string {
253253
return master.Info("keyspace").Val()
254-
}, 5*time.Second).Should(Or(
254+
}, 30*time.Second).Should(Or(
255255
ContainSubstring("keys=31"),
256256
ContainSubstring("keys=29"),
257257
ContainSubstring("keys=40"),
@@ -320,10 +320,6 @@ var _ = Describe("ClusterClient", func() {
320320
Expect(err).NotTo(HaveOccurred())
321321
Expect(cmds).To(HaveLen(14))
322322

323-
if opt.RouteByLatency {
324-
return
325-
}
326-
327323
for _, key := range keys {
328324
slot := hashtag.Slot(key)
329325
client.SwapSlotNodes(slot)
@@ -432,6 +428,9 @@ var _ = Describe("ClusterClient", func() {
432428
})
433429

434430
AfterEach(func() {
431+
_ = client.ForEachMaster(func(master *redis.Client) error {
432+
return master.FlushDB().Err()
433+
})
435434
Expect(client.Close()).NotTo(HaveOccurred())
436435
})
437436

@@ -560,6 +559,9 @@ var _ = Describe("ClusterClient", func() {
560559
})
561560

562561
AfterEach(func() {
562+
_ = client.ForEachMaster(func(master *redis.Client) error {
563+
return master.FlushDB().Err()
564+
})
563565
Expect(client.Close()).NotTo(HaveOccurred())
564566
})
565567

@@ -575,10 +577,19 @@ var _ = Describe("ClusterClient", func() {
575577
_ = client.ForEachMaster(func(master *redis.Client) error {
576578
return master.FlushDB().Err()
577579
})
580+
581+
_ = client.ForEachSlave(func(slave *redis.Client) error {
582+
Eventually(func() int64 {
583+
return client.DBSize().Val()
584+
}, 30*time.Second).Should(Equal(int64(0)))
585+
return nil
586+
})
578587
})
579588

580589
AfterEach(func() {
581-
client.FlushDB()
590+
_ = client.ForEachMaster(func(master *redis.Client) error {
591+
return master.FlushDB().Err()
592+
})
582593
Expect(client.Close()).NotTo(HaveOccurred())
583594
})
584595

@@ -597,7 +608,7 @@ var _ = Describe("ClusterClient without nodes", func() {
597608
Expect(client.Close()).NotTo(HaveOccurred())
598609
})
599610

600-
It("returns an error", func() {
611+
It("Ping returns an error", func() {
601612
err := client.Ping().Err()
602613
Expect(err).To(MatchError("redis: cluster has no nodes"))
603614
})
@@ -626,15 +637,15 @@ var _ = Describe("ClusterClient without valid nodes", func() {
626637

627638
It("returns an error", func() {
628639
err := client.Ping().Err()
629-
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
640+
Expect(err).To(MatchError("redis: cannot load cluster slots"))
630641
})
631642

632643
It("pipeline returns an error", func() {
633644
_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
634645
pipe.Ping()
635646
return nil
636647
})
637-
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
648+
Expect(err).To(MatchError("redis: cannot load cluster slots"))
638649
})
639650
})
640651

@@ -664,7 +675,7 @@ var _ = Describe("ClusterClient timeout", func() {
664675
It("Tx timeouts", func() {
665676
err := client.Watch(func(tx *redis.Tx) error {
666677
return tx.Ping().Err()
667-
})
678+
}, "foo")
668679
Expect(err).To(HaveOccurred())
669680
Expect(err.(net.Error).Timeout()).To(BeTrue())
670681
})
@@ -676,42 +687,20 @@ var _ = Describe("ClusterClient timeout", func() {
676687
return nil
677688
})
678689
return err
679-
})
690+
}, "foo")
680691
Expect(err).To(HaveOccurred())
681692
Expect(err.(net.Error).Timeout()).To(BeTrue())
682693
})
683694
}
684695

685-
Context("read timeout", func() {
686-
BeforeEach(func() {
687-
opt := redisClusterOptions()
688-
opt.ReadTimeout = time.Nanosecond
689-
opt.WriteTimeout = -1
690-
client = cluster.clusterClient(opt)
691-
})
692-
693-
testTimeout()
694-
})
696+
const pause = time.Second
695697

696-
Context("write timeout", func() {
698+
Context("read/write timeout", func() {
697699
BeforeEach(func() {
698700
opt := redisClusterOptions()
699-
opt.ReadTimeout = time.Nanosecond
700-
opt.WriteTimeout = -1
701-
client = cluster.clusterClient(opt)
702-
})
703-
704-
testTimeout()
705-
})
706-
707-
Context("ClientPause timeout", func() {
708-
const pause = time.Second
709-
710-
BeforeEach(func() {
711-
opt := redisClusterOptions()
712-
opt.ReadTimeout = pause / 10
713-
opt.WriteTimeout = pause / 10
714-
opt.MaxRedirects = -1
701+
opt.ReadTimeout = 100 * time.Millisecond
702+
opt.WriteTimeout = 100 * time.Millisecond
703+
opt.MaxRedirects = 1
715704
client = cluster.clusterClient(opt)
716705

717706
err := client.ForEachNode(func(client *redis.Client) error {

command.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,19 @@ type Cmder interface {
4646

4747
func setCmdsErr(cmds []Cmder, e error) {
4848
for _, cmd := range cmds {
49-
cmd.setErr(e)
49+
if cmd.Err() == nil {
50+
cmd.setErr(e)
51+
}
52+
}
53+
}
54+
55+
func firstCmdsErr(cmds []Cmder) error {
56+
for _, cmd := range cmds {
57+
if err := cmd.Err(); err != nil {
58+
return err
59+
}
5060
}
61+
return nil
5162
}
5263

5364
func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
@@ -95,7 +106,6 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
95106
return 1
96107
}
97108
if info == nil {
98-
internal.Logf("info for cmd=%s not found", cmd.Name())
99109
return -1
100110
}
101111
return int(info.FirstKeyPos)

commands_test.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,21 @@ var _ = Describe("Commands", func() {
2727
Describe("server", func() {
2828

2929
It("should Auth", func() {
30-
_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
30+
cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error {
3131
pipe.Auth("password")
32+
pipe.Auth("")
3233
return nil
3334
})
3435
Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
36+
Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
37+
Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
38+
39+
stats := client.Pool().Stats()
40+
Expect(stats.Requests).To(Equal(uint32(2)))
41+
Expect(stats.Hits).To(Equal(uint32(1)))
42+
Expect(stats.Timeouts).To(Equal(uint32(0)))
43+
Expect(stats.TotalConns).To(Equal(uint32(1)))
44+
Expect(stats.FreeConns).To(Equal(uint32(1)))
3545
})
3646

3747
It("should Echo", func() {
@@ -187,6 +197,29 @@ var _ = Describe("Commands", func() {
187197
Expect(tm).To(BeTemporally("~", time.Now(), 3*time.Second))
188198
})
189199

200+
It("Should Command", func() {
201+
cmds, err := client.Command().Result()
202+
Expect(err).NotTo(HaveOccurred())
203+
Expect(len(cmds)).To(BeNumerically("~", 180, 10))
204+
205+
cmd := cmds["mget"]
206+
Expect(cmd.Name).To(Equal("mget"))
207+
Expect(cmd.Arity).To(Equal(int8(-2)))
208+
Expect(cmd.Flags).To(ContainElement("readonly"))
209+
Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
210+
Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
211+
Expect(cmd.StepCount).To(Equal(int8(1)))
212+
213+
cmd = cmds["ping"]
214+
Expect(cmd.Name).To(Equal("ping"))
215+
Expect(cmd.Arity).To(Equal(int8(-1)))
216+
Expect(cmd.Flags).To(ContainElement("stale"))
217+
Expect(cmd.Flags).To(ContainElement("fast"))
218+
Expect(cmd.FirstKeyPos).To(Equal(int8(0)))
219+
Expect(cmd.LastKeyPos).To(Equal(int8(0)))
220+
Expect(cmd.StepCount).To(Equal(int8(0)))
221+
})
222+
190223
})
191224

192225
Describe("debugging", func() {
@@ -2887,24 +2920,6 @@ var _ = Describe("Commands", func() {
28872920

28882921
})
28892922

2890-
Describe("Command", func() {
2891-
2892-
It("returns map of commands", func() {
2893-
cmds, err := client.Command().Result()
2894-
Expect(err).NotTo(HaveOccurred())
2895-
Expect(len(cmds)).To(BeNumerically("~", 180, 10))
2896-
2897-
cmd := cmds["mget"]
2898-
Expect(cmd.Name).To(Equal("mget"))
2899-
Expect(cmd.Arity).To(Equal(int8(-2)))
2900-
Expect(cmd.Flags).To(ContainElement("readonly"))
2901-
Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
2902-
Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
2903-
Expect(cmd.StepCount).To(Equal(int8(1)))
2904-
})
2905-
2906-
})
2907-
29082923
Describe("Eval", func() {
29092924

29102925
It("returns keys and values", func() {

export_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,26 @@ func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error)
2020
}
2121

2222
func (c *ClusterClient) SlotAddrs(slot int) []string {
23+
state, err := c.state()
24+
if err != nil {
25+
panic(err)
26+
}
27+
2328
var addrs []string
24-
for _, n := range c.state().slotNodes(slot) {
29+
for _, n := range state.slotNodes(slot) {
2530
addrs = append(addrs, n.Client.getAddr())
2631
}
2732
return addrs
2833
}
2934

3035
// SwapSlot swaps a slot's master/slave address for testing MOVED redirects.
3136
func (c *ClusterClient) SwapSlotNodes(slot int) {
32-
nodes := c.state().slots[slot]
37+
state, err := c.state()
38+
if err != nil {
39+
panic(err)
40+
}
41+
42+
nodes := state.slots[slot]
3343
if len(nodes) == 2 {
3444
nodes[0], nodes[1] = nodes[1], nodes[0]
3545
}

internal/error.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,23 @@ type RedisError string
1313
func (e RedisError) Error() string { return string(e) }
1414

1515
func IsRetryableError(err error) bool {
16-
return IsNetworkError(err) || err.Error() == "ERR max number of clients reached"
16+
if IsNetworkError(err) {
17+
return true
18+
}
19+
s := err.Error()
20+
if s == "ERR max number of clients reached" {
21+
return true
22+
}
23+
if strings.HasPrefix(s, "LOADING ") {
24+
return true
25+
}
26+
if strings.HasPrefix(s, "CLUSTERDOWN ") {
27+
return true
28+
}
29+
return false
1730
}
1831

19-
func IsInternalError(err error) bool {
32+
func IsRedisError(err error) bool {
2033
_, ok := err.(RedisError)
2134
return ok
2235
}
@@ -33,7 +46,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
3346
if err == nil {
3447
return false
3548
}
36-
if IsInternalError(err) {
49+
if IsRedisError(err) {
3750
return false
3851
}
3952
if allowTimeout {
@@ -45,7 +58,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
4558
}
4659

4760
func IsMovedError(err error) (moved bool, ask bool, addr string) {
48-
if !IsInternalError(err) {
61+
if !IsRedisError(err) {
4962
return
5063
}
5164

@@ -69,7 +82,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
6982
func IsLoadingError(err error) bool {
7083
return strings.HasPrefix(err.Error(), "LOADING ")
7184
}
72-
73-
func IsClusterDownError(err error) bool {
74-
return strings.HasPrefix(err.Error(), "CLUSTERDOWN ")
75-
}

internal/proto/reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (p *Reader) ReadLine() ([]byte, error) {
6363
return nil, bufio.ErrBufferFull
6464
}
6565
if len(line) == 0 {
66-
return nil, internal.RedisError("redis: reply is empty")
66+
return nil, fmt.Errorf("redis: reply is empty")
6767
}
6868
if isNilReply(line) {
6969
return nil, internal.Nil

internal/proto/scan.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
func Scan(b []byte, v interface{}) error {
1212
switch v := v.(type) {
1313
case nil:
14-
return internal.RedisError("redis: Scan(nil)")
14+
return fmt.Errorf("redis: Scan(nil)")
1515
case *string:
1616
*v = internal.BytesToString(b)
1717
return nil

main_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis_test
33
import (
44
"errors"
55
"fmt"
6+
"log"
67
"net"
78
"os"
89
"os/exec"
@@ -51,7 +52,7 @@ var cluster = &clusterScenario{
5152
}
5253

5354
func init() {
54-
//redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
55+
redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
5556
}
5657

5758
var _ = BeforeSuite(func() {

0 commit comments

Comments
 (0)