@@ -19,54 +19,53 @@ import (
19
19
type PubSub struct {
20
20
base baseClient
21
21
22
- mu sync.Mutex
23
- cn * pool.Conn
24
- closed bool
25
-
26
- subMu sync.Mutex
22
+ mu sync.Mutex
23
+ cn * pool.Conn
27
24
channels []string
28
25
patterns []string
26
+ closed bool
29
27
30
28
cmd * Cmd
31
29
}
32
30
33
- func (c * PubSub ) conn () (* pool.Conn , bool , error ) {
31
+ func (c * PubSub ) conn () (* pool.Conn , error ) {
34
32
c .mu .Lock ()
35
- defer c .mu .Unlock ()
33
+ cn , err := c ._conn ()
34
+ c .mu .Unlock ()
35
+ return cn , err
36
+ }
36
37
38
+ func (c * PubSub ) _conn () (* pool.Conn , error ) {
37
39
if c .closed {
38
- return nil , false , pool .ErrClosed
40
+ return nil , pool .ErrClosed
39
41
}
40
42
41
43
if c .cn != nil {
42
- return c .cn , false , nil
44
+ return c .cn , nil
43
45
}
44
46
45
47
cn , err := c .base .connPool .NewConn ()
46
48
if err != nil {
47
- return nil , false , err
49
+ return nil , err
48
50
}
49
51
50
52
if ! cn .Inited {
51
53
if err := c .base .initConn (cn ); err != nil {
52
54
_ = c .base .connPool .CloseConn (cn )
53
- return nil , false , err
55
+ return nil , err
54
56
}
55
57
}
56
58
57
59
if err := c .resubscribe (cn ); err != nil {
58
60
_ = c .base .connPool .CloseConn (cn )
59
- return nil , false , err
61
+ return nil , err
60
62
}
61
63
62
64
c .cn = cn
63
- return cn , true , nil
65
+ return cn , nil
64
66
}
65
67
66
68
func (c * PubSub ) resubscribe (cn * pool.Conn ) error {
67
- c .subMu .Lock ()
68
- defer c .subMu .Unlock ()
69
-
70
69
var firstErr error
71
70
if len (c .channels ) > 0 {
72
71
if err := c ._subscribe (cn , "subscribe" , c .channels ... ); err != nil && firstErr == nil {
@@ -81,6 +80,18 @@ func (c *PubSub) resubscribe(cn *pool.Conn) error {
81
80
return firstErr
82
81
}
83
82
83
+ func (c * PubSub ) _subscribe (cn * pool.Conn , redisCmd string , channels ... string ) error {
84
+ args := make ([]interface {}, 1 + len (channels ))
85
+ args [0 ] = redisCmd
86
+ for i , channel := range channels {
87
+ args [1 + i ] = channel
88
+ }
89
+ cmd := NewSliceCmd (args ... )
90
+
91
+ cn .SetWriteTimeout (c .base .opt .WriteTimeout )
92
+ return writeCmd (cn , cmd )
93
+ }
94
+
84
95
func (c * PubSub ) putConn (cn * pool.Conn , err error ) {
85
96
if ! internal .IsBadConn (err , true ) {
86
97
return
@@ -114,67 +125,55 @@ func (c *PubSub) Close() error {
114
125
return nil
115
126
}
116
127
117
- func (c * PubSub ) subscribe (redisCmd string , channels ... string ) error {
118
- cn , isNew , err := c .conn ()
119
- if err != nil {
120
- return err
121
- }
122
-
123
- if isNew {
124
- return nil
125
- }
126
-
127
- err = c ._subscribe (cn , redisCmd , channels ... )
128
- c .putConn (cn , err )
129
- return err
130
- }
131
-
132
- func (c * PubSub ) _subscribe (cn * pool.Conn , redisCmd string , channels ... string ) error {
133
- args := make ([]interface {}, 1 + len (channels ))
134
- args [0 ] = redisCmd
135
- for i , channel := range channels {
136
- args [1 + i ] = channel
137
- }
138
- cmd := NewSliceCmd (args ... )
139
-
140
- cn .SetWriteTimeout (c .base .opt .WriteTimeout )
141
- return writeCmd (cn , cmd )
142
- }
143
-
144
128
// Subscribes the client to the specified channels. It returns
145
129
// empty subscription if there are no channels.
146
130
func (c * PubSub ) Subscribe (channels ... string ) error {
147
- c .subMu .Lock ()
131
+ c .mu .Lock ()
132
+ err := c .subscribe ("subscribe" , channels ... )
148
133
c .channels = appendIfNotExists (c .channels , channels ... )
149
- c .subMu .Unlock ()
150
- return c . subscribe ( "subscribe" , channels ... )
134
+ c .mu .Unlock ()
135
+ return err
151
136
}
152
137
153
138
// Subscribes the client to the given patterns. It returns
154
139
// empty subscription if there are no patterns.
155
140
func (c * PubSub ) PSubscribe (patterns ... string ) error {
156
- c .subMu .Lock ()
141
+ c .mu .Lock ()
142
+ err := c .subscribe ("psubscribe" , patterns ... )
157
143
c .patterns = appendIfNotExists (c .patterns , patterns ... )
158
- c .subMu .Unlock ()
159
- return c . subscribe ( "psubscribe" , patterns ... )
144
+ c .mu .Unlock ()
145
+ return err
160
146
}
161
147
162
148
// Unsubscribes the client from the given channels, or from all of
163
149
// them if none is given.
164
150
func (c * PubSub ) Unsubscribe (channels ... string ) error {
165
- c .subMu .Lock ()
151
+ c .mu .Lock ()
152
+ err := c .subscribe ("unsubscribe" , channels ... )
166
153
c .channels = remove (c .channels , channels ... )
167
- c .subMu .Unlock ()
168
- return c . subscribe ( "unsubscribe" , channels ... )
154
+ c .mu .Unlock ()
155
+ return err
169
156
}
170
157
171
158
// Unsubscribes the client from the given patterns, or from all of
172
159
// them if none is given.
173
160
func (c * PubSub ) PUnsubscribe (patterns ... string ) error {
174
- c .subMu .Lock ()
161
+ c .mu .Lock ()
162
+ err := c .subscribe ("punsubscribe" , patterns ... )
175
163
c .patterns = remove (c .patterns , patterns ... )
176
- c .subMu .Unlock ()
177
- return c .subscribe ("punsubscribe" , patterns ... )
164
+ c .mu .Unlock ()
165
+ return err
166
+ }
167
+
168
+ func (c * PubSub ) subscribe (redisCmd string , channels ... string ) error {
169
+ cn , err := c ._conn ()
170
+ if err != nil {
171
+ return err
172
+ }
173
+
174
+ err = c ._subscribe (cn , redisCmd , channels ... )
175
+ c .putConn (cn , err )
176
+ return err
178
177
}
179
178
180
179
func (c * PubSub ) Ping (payload ... string ) error {
@@ -184,7 +183,7 @@ func (c *PubSub) Ping(payload ...string) error {
184
183
}
185
184
cmd := NewCmd (args ... )
186
185
187
- cn , _ , err := c .conn ()
186
+ cn , err := c .conn ()
188
187
if err != nil {
189
188
return err
190
189
}
@@ -277,7 +276,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
277
276
c .cmd = NewCmd ()
278
277
}
279
278
280
- cn , _ , err := c .conn ()
279
+ cn , err := c .conn ()
281
280
if err != nil {
282
281
return nil , err
283
282
}
0 commit comments