Skip to content

Commit afafeda

Browse files
authored
after the connection pool is closed, no new connections should be added (#1863)
* after the connection pool is closed, no new connections should be added Signed-off-by: monkey92t <[email protected]> * remove runGoroutine Signed-off-by: monkey92t <[email protected]> * pool.popIdle add p.closed check Signed-off-by: monkey92t <[email protected]> * upgrade golangci-lint v1.42.0 Signed-off-by: monkey92t <[email protected]>
1 parent bc9d5c8 commit afafeda

File tree

4 files changed

+71
-9
lines changed

4 files changed

+71
-9
lines changed

internal/pool/export_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pool
22

3-
import "time"
3+
import (
4+
"time"
5+
)
46

57
func (cn *Conn) SetCreatedAt(tm time.Time) {
68
cn.createdAt = tm

internal/pool/pool.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,10 @@ func (p *ConnPool) checkMinIdleConns() {
121121
for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
122122
p.poolSize++
123123
p.idleConnsLen++
124+
124125
go func() {
125126
err := p.addIdleConn()
126-
if err != nil {
127+
if err != nil && err != ErrClosed {
127128
p.connsMu.Lock()
128129
p.poolSize--
129130
p.idleConnsLen--
@@ -140,9 +141,16 @@ func (p *ConnPool) addIdleConn() error {
140141
}
141142

142143
p.connsMu.Lock()
144+
defer p.connsMu.Unlock()
145+
146+
// It is not allowed to add new connections to the closed connection pool.
147+
if p.closed() {
148+
_ = cn.Close()
149+
return ErrClosed
150+
}
151+
143152
p.conns = append(p.conns, cn)
144153
p.idleConns = append(p.idleConns, cn)
145-
p.connsMu.Unlock()
146154
return nil
147155
}
148156

@@ -157,6 +165,14 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
157165
}
158166

159167
p.connsMu.Lock()
168+
defer p.connsMu.Unlock()
169+
170+
// It is not allowed to add new connections to the closed connection pool.
171+
if p.closed() {
172+
_ = cn.Close()
173+
return nil, ErrClosed
174+
}
175+
160176
p.conns = append(p.conns, cn)
161177
if pooled {
162178
// If pool is full remove the cn on next Put.
@@ -166,7 +182,6 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
166182
p.poolSize++
167183
}
168184
}
169-
p.connsMu.Unlock()
170185

171186
return cn, nil
172187
}
@@ -237,9 +252,13 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
237252

238253
for {
239254
p.connsMu.Lock()
240-
cn := p.popIdle()
255+
cn, err := p.popIdle()
241256
p.connsMu.Unlock()
242257

258+
if err != nil {
259+
return nil, err
260+
}
261+
243262
if cn == nil {
244263
break
245264
}
@@ -308,10 +327,13 @@ func (p *ConnPool) freeTurn() {
308327
<-p.queue
309328
}
310329

311-
func (p *ConnPool) popIdle() *Conn {
330+
func (p *ConnPool) popIdle() (*Conn, error) {
331+
if p.closed() {
332+
return nil, ErrClosed
333+
}
312334
n := len(p.idleConns)
313335
if n == 0 {
314-
return nil
336+
return nil, nil
315337
}
316338

317339
var cn *Conn
@@ -326,7 +348,7 @@ func (p *ConnPool) popIdle() *Conn {
326348
}
327349
p.idleConnsLen--
328350
p.checkMinIdleConns()
329-
return cn
351+
return cn, nil
330352
}
331353

332354
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {

internal/pool/pool_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool_test
22

33
import (
44
"context"
5+
"net"
56
"sync"
67
"testing"
78
"time"
@@ -30,6 +31,43 @@ var _ = Describe("ConnPool", func() {
3031
connPool.Close()
3132
})
3233

34+
It("should safe close", func() {
35+
const minIdleConns = 10
36+
37+
var (
38+
wg sync.WaitGroup
39+
closedChan = make(chan struct{})
40+
)
41+
wg.Add(minIdleConns)
42+
connPool = pool.NewConnPool(&pool.Options{
43+
Dialer: func(ctx context.Context) (net.Conn, error) {
44+
wg.Done()
45+
<-closedChan
46+
return &net.TCPConn{}, nil
47+
},
48+
PoolSize: 10,
49+
PoolTimeout: time.Hour,
50+
IdleTimeout: time.Millisecond,
51+
IdleCheckFrequency: time.Millisecond,
52+
MinIdleConns: minIdleConns,
53+
})
54+
wg.Wait()
55+
Expect(connPool.Close()).NotTo(HaveOccurred())
56+
close(closedChan)
57+
58+
// We wait for 1 second and believe that checkMinIdleConns has been executed.
59+
time.Sleep(time.Second)
60+
61+
Expect(connPool.Stats()).To(Equal(&pool.Stats{
62+
Hits: 0,
63+
Misses: 0,
64+
Timeouts: 0,
65+
TotalConns: 0,
66+
IdleConns: 0,
67+
StaleConns: 0,
68+
}))
69+
})
70+
3371
It("should unblock client when conn is removed", func() {
3472
// Reserve one connection.
3573
cn, err := connPool.Get(ctx)

internal/proto/reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919

2020
//------------------------------------------------------------------------------
2121

22-
const Nil = RedisError("redis: nil")
22+
const Nil = RedisError("redis: nil") // nolint:errname
2323

2424
type RedisError string
2525

0 commit comments

Comments
 (0)