@@ -213,6 +213,7 @@ func (shard *ringShard) Vote(up bool) bool {
213
213
type ringSharding struct {
214
214
opt * RingOptions
215
215
216
+ addrsMu sync.Mutex
216
217
mu sync.RWMutex
217
218
shards * ringShards
218
219
closed bool
@@ -245,46 +246,62 @@ func (c *ringSharding) OnNewNode(fn func(rdb *Client)) {
245
246
// decrease number of shards, that you use. It will reuse shards that
246
247
// existed before and close the ones that will not be used anymore.
247
248
func (c * ringSharding ) SetAddrs (addrs map [string ]string ) {
248
- c .mu .Lock ()
249
+ c .addrsMu .Lock ()
250
+ defer c .addrsMu .Unlock ()
251
+
252
+ cleanup := func (shards map [string ]* ringShard ) {
253
+ for addr , shard := range shards {
254
+ if err := shard .Client .Close (); err != nil {
255
+ internal .Logger .Printf (context .Background (), "shard.Close %s failed: %s" , addr , err )
256
+ }
257
+ }
258
+ }
249
259
260
+ c .mu .RLock ()
250
261
if c .closed {
251
- c .mu .Unlock ()
262
+ c .mu .RUnlock ()
252
263
return
253
264
}
265
+ existing := c .shards
266
+ c .mu .RUnlock ()
254
267
255
- shards , cleanup := c .newRingShards (addrs , c .shards )
268
+ shards , created , unused := c .newRingShards (addrs , existing )
269
+
270
+ c .mu .Lock ()
271
+ if c .closed {
272
+ cleanup (created )
273
+ c .mu .Unlock ()
274
+ return
275
+ }
256
276
c .shards = shards
257
277
c .rebalanceLocked ()
258
278
c .mu .Unlock ()
259
279
260
- cleanup ()
280
+ cleanup (unused )
261
281
}
262
282
263
283
func (c * ringSharding ) newRingShards (
264
- addrs map [string ]string , existingShards * ringShards ,
265
- ) (* ringShards , func ()) {
266
- shardMap := make (map [string ]* ringShard ) // indexed by addr
267
- unusedShards := make (map [string ]* ringShard ) // indexed by addr
268
-
269
- if existingShards != nil {
270
- for _ , shard := range existingShards .list {
271
- addr := shard .Client .opt .Addr
272
- shardMap [addr ] = shard
273
- unusedShards [addr ] = shard
274
- }
275
- }
284
+ addrs map [string ]string , existing * ringShards ,
285
+ ) (shards * ringShards , created , unused map [string ]* ringShard ) {
286
+
287
+ shards = & ringShards {m : make (map [string ]* ringShard , len (addrs ))}
288
+ created = make (map [string ]* ringShard ) // indexed by addr
289
+ unused = make (map [string ]* ringShard ) // indexed by addr
276
290
277
- shards := & ringShards {
278
- m : make (map [string ]* ringShard ),
291
+ if existing != nil {
292
+ for _ , shard := range existing .list {
293
+ unused [shard .addr ] = shard
294
+ }
279
295
}
280
296
281
297
for name , addr := range addrs {
282
- if shard , ok := shardMap [addr ]; ok {
298
+ if shard , ok := unused [addr ]; ok {
283
299
shards .m [name ] = shard
284
- delete (unusedShards , addr )
300
+ delete (unused , addr )
285
301
} else {
286
302
shard := newRingShard (c .opt , addr )
287
303
shards .m [name ] = shard
304
+ created [addr ] = shard
288
305
289
306
for _ , fn := range c .onNewNode {
290
307
fn (shard .Client )
@@ -296,13 +313,7 @@ func (c *ringSharding) newRingShards(
296
313
shards .list = append (shards .list , shard )
297
314
}
298
315
299
- return shards , func () {
300
- for addr , shard := range unusedShards {
301
- if err := shard .Client .Close (); err != nil {
302
- internal .Logger .Printf (context .Background (), "shard.Close %s failed: %s" , addr , err )
303
- }
304
- }
305
- }
316
+ return
306
317
}
307
318
308
319
func (c * ringSharding ) List () []* ringShard {
0 commit comments