Skip to content

Commit c2f4a08

Browse files
committed
rewrite as suggested by @AlexanderYastrebov
Signed-off-by: Sandor Szücs <[email protected]>
1 parent 4f1e623 commit c2f4a08

File tree

1 file changed

+20
-22
lines changed

1 file changed

+20
-22
lines changed

ring.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ type ringShards struct {
214214
opt *RingOptions
215215

216216
mu sync.RWMutex
217+
muClose sync.Mutex
217218
hash ConsistentHash
218219
shards map[string]*ringShard // read only, updated by SetAddrs
219220
list []*ringShard // read only, updated by SetAddrs
@@ -230,36 +231,30 @@ func newRingShards(opt *RingOptions) *ringShards {
230231
return c
231232
}
232233

233-
// SetAddrs must not be called concurrently.
234+
// SetAddrs replaces the shards in use, such that you can increase and
235+
// decrease number of shards, that you use. It will reuse shards that
236+
// existed before and close the ones that will not be used anymore.
234237
func (c *ringShards) SetAddrs(addrs map[string]string) {
235-
shards := make(map[string]*ringShard)
238+
c.muClose.Lock()
239+
defer c.muClose.Unlock()
236240
if c.closed {
237241
return
238242
}
239243

244+
shards := make(map[string]*ringShard)
245+
unusedShards := make(map[string]*ringShard)
246+
240247
for k, shard := range c.shards {
241248
if addr, ok := addrs[k]; ok && shard.addr == addr {
242249
shards[k] = shard
243250
} else {
244-
// close shards that are not reused
245-
defer func() {
246-
c.mu.Lock()
247-
if !c.closed {
248-
err := shard.Client.Close()
249-
if err != nil {
250-
internal.Logger.Printf(context.Background(), "Failed to close ring shard client %s %s: %v", k, shard.addr, err)
251-
}
252-
}
253-
c.mu.Unlock()
254-
}()
251+
unusedShards[k] = shard
255252
}
256253
}
257254

258-
newShardNames := make([]string, 0)
259255
for k, addr := range addrs {
260256
if shard, ok := c.shards[k]; !ok || shard.addr != addr {
261257
shards[k] = newRingShard(c.opt, k, addr)
262-
newShardNames = append(newShardNames, k)
263258
}
264259
}
265260

@@ -269,15 +264,16 @@ func (c *ringShards) SetAddrs(addrs map[string]string) {
269264
}
270265

271266
c.mu.Lock()
272-
if c.closed {
273-
for _, k := range newShardNames {
274-
shards[k].Client.Close()
267+
c.shards = shards
268+
c.list = list
269+
c.mu.Unlock()
270+
271+
for k, shard := range unusedShards {
272+
err := shard.Client.Close()
273+
if err != nil {
274+
internal.Logger.Printf(context.Background(), "Failed to close ring shard client %s %s: %v", k, shard.addr, err)
275275
}
276-
} else {
277-
c.shards = shards
278-
c.list = list
279276
}
280-
c.mu.Unlock()
281277

282278
c.rebalance()
283279
}
@@ -404,6 +400,8 @@ func (c *ringShards) Len() int {
404400
}
405401

406402
func (c *ringShards) Close() error {
403+
c.muClose.Lock()
404+
defer c.muClose.Unlock()
407405
c.mu.Lock()
408406
defer c.mu.Unlock()
409407

0 commit comments

Comments
 (0)