
Commit c26a3cb
Committed Feb 20, 2025

(POC) GODRIVER-3419 Use one goroutine per connection, guard maxConnecting with a chan semaphore.

1 parent a3ad820 · commit c26a3cb
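The core idea is sketched below as a standalone program (the names maxConnecting and sem are illustrative, not the driver's API): a buffered channel acts as a counting semaphore, so every checkout request can get its own goroutine while at most maxConnecting of them establish connections at any moment. Sending into the channel acquires a slot and blocks while the buffer is full; receiving from it releases the slot.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxConnecting = 2
	// Buffered channel used as a counting semaphore: the capacity is the
	// maximum number of goroutines allowed to "connect" concurrently.
	sem := make(chan struct{}, maxConnecting)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while all slots are taken
			defer func() { <-sem }() // release the slot when done
			fmt.Printf("goroutine %d holds a connect slot (max %d concurrent)\n", id, maxConnecting)
		}(i)
	}
	wg.Wait()
}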

File tree: 1 file changed, 283 additions, 141 deletions
x/mongo/driver/topology/pool.go

Lines changed: 283 additions & 141 deletions
@@ -119,10 +119,14 @@ type pool struct {
 	// loop runs or waits. Its lock guards cancelBackgroundCtx, conns, and newConnWait. Any changes
 	// to the state of the guarded values must be made while holding the lock to prevent undefined
 	// behavior in the createConnections() waiting logic.
-	createConnectionsCond *sync.Cond
-	cancelBackgroundCtx   context.CancelFunc    // cancelBackgroundCtx is called to signal background goroutines to stop.
-	conns                 map[int64]*connection // conns holds all currently open connections.
-	newConnWait           wantConnQueue         // newConnWait holds all wantConn requests for new connections.
+	// createConnectionsCond *sync.Cond
+	createConnectionSem chan struct{}
+
+	connsMu             sync.Mutex
+	backgroundCtx       context.Context
+	cancelBackgroundCtx context.CancelFunc    // cancelBackgroundCtx is called to signal background goroutines to stop.
+	conns               map[int64]*connection // conns holds all currently open connections.
+	newConnWait         wantConnQueue         // newConnWait holds all wantConn requests for new connections.
 
 	idleMu    sync.Mutex    // idleMu guards idleConns, idleConnWait
 	idleConns []*connection // idleConns holds all idle connections.
@@ -210,24 +214,25 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
 	}
 
 	pool := &pool{
-		address:               config.Address,
-		minSize:               config.MinPoolSize,
-		maxSize:               config.MaxPoolSize,
-		maxConnecting:         maxConnecting,
-		loadBalanced:          config.LoadBalanced,
-		monitor:               config.PoolMonitor,
-		logger:                config.Logger,
-		handshakeErrFn:        config.handshakeErrFn,
-		connOpts:              connOpts,
-		generation:            newPoolGenerationMap(),
-		state:                 poolPaused,
-		maintainInterval:      maintainInterval,
-		maintainReady:         make(chan struct{}, 1),
-		backgroundDone:        &sync.WaitGroup{},
-		createConnectionsCond: sync.NewCond(&sync.Mutex{}),
-		conns:                 make(map[int64]*connection, config.MaxPoolSize),
-		idleConns:             make([]*connection, 0, config.MaxPoolSize),
-		connectTimeout:        config.ConnectTimeout,
+		address:             config.Address,
+		minSize:             config.MinPoolSize,
+		maxSize:             config.MaxPoolSize,
+		maxConnecting:       maxConnecting,
+		loadBalanced:        config.LoadBalanced,
+		monitor:             config.PoolMonitor,
+		logger:              config.Logger,
+		handshakeErrFn:      config.handshakeErrFn,
+		connOpts:            connOpts,
+		generation:          newPoolGenerationMap(),
+		state:               poolPaused,
+		maintainInterval:    maintainInterval,
+		maintainReady:       make(chan struct{}, 1),
+		backgroundDone:      &sync.WaitGroup{},
+		createConnectionSem: make(chan struct{}, maxConnecting),
+		// createConnectionsCond: sync.NewCond(&sync.Mutex{}),
+		conns:               make(map[int64]*connection, config.MaxPoolSize),
+		idleConns:           make([]*connection, 0, config.MaxPoolSize),
+		connectTimeout:      config.ConnectTimeout,
 	}
 	// minSize must not exceed maxSize if maxSize is not 0
 	if pool.maxSize != 0 && pool.minSize > pool.maxSize {
@@ -240,19 +245,18 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
 	// Create a Context with cancellation that's used to signal the createConnections() and
 	// maintain() background goroutines to stop. Also create a "backgroundDone" WaitGroup that is
 	// used to wait for the background goroutines to return.
-	var ctx context.Context
-	ctx, pool.cancelBackgroundCtx = context.WithCancel(context.Background())
+	pool.backgroundCtx, pool.cancelBackgroundCtx = context.WithCancel(context.Background())
 
-	for i := 0; i < int(pool.maxConnecting); i++ {
-		pool.backgroundDone.Add(1)
-		go pool.createConnections(ctx, pool.backgroundDone)
-	}
+	// for i := 0; i < int(pool.maxConnecting); i++ {
+	// 	pool.backgroundDone.Add(1)
+	// 	go pool.createConnections(ctx, pool.backgroundDone)
+	// }
 
 	// If maintainInterval is not positive, don't start the maintain() goroutine. Expect that
 	// negative values are only used in testing; this config value is not user-configurable.
 	if maintainInterval > 0 {
 		pool.backgroundDone.Add(1)
-		go pool.maintain(ctx, pool.backgroundDone)
+		go pool.maintain(pool.backgroundCtx, pool.backgroundDone)
 	}
 
 	if mustLogPoolMessage(pool) {
@@ -343,10 +347,9 @@ func (p *pool) close(ctx context.Context) {
 	// condition by cancelling the "background goroutine" Context, even tho cancelling the Context
 	// is also synchronized by a lock. Otherwise, we run into an intermittent bug that prevents the
 	// createConnections() goroutines from exiting.
-	p.createConnectionsCond.L.Lock()
+	p.connsMu.Lock()
 	p.cancelBackgroundCtx()
-	p.createConnectionsCond.Broadcast()
-	p.createConnectionsCond.L.Unlock()
+	p.connsMu.Unlock()
 
 	// Wait for all background goroutines to exit.
 	p.backgroundDone.Wait()
@@ -402,7 +405,7 @@ func (p *pool) close(ctx context.Context) {
 	// Collect all conns from the pool and try to deliver ErrPoolClosed to any waiting wantConns
 	// from newConnWait while holding the createConnectionsCond lock. We can't call removeConnection
 	// on the connections while holding any locks, so do that after we release the lock.
-	p.createConnectionsCond.L.Lock()
+	p.connsMu.Lock()
 	conns := make([]*connection, 0, len(p.conns))
 	for _, conn := range p.conns {
 		conns = append(conns, conn)
@@ -414,7 +417,7 @@
 		}
 		w.tryDeliver(nil, ErrPoolClosed)
 	}
-	p.createConnectionsCond.L.Unlock()
+	p.connsMu.Unlock()
 
 	if mustLogPoolMessage(p) {
 		logPoolMessage(p, logger.ConnectionPoolClosed)
@@ -724,19 +727,19 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) error {
 		return ErrWrongPool
 	}
 
-	p.createConnectionsCond.L.Lock()
+	p.connsMu.Lock()
 	_, ok := p.conns[conn.driverConnectionID]
 	if !ok {
 		// If the connection has been removed from the pool already, exit without doing any
 		// additional state changes.
-		p.createConnectionsCond.L.Unlock()
+		p.connsMu.Unlock()
 		return nil
 	}
 	delete(p.conns, conn.driverConnectionID)
 	// Signal the createConnectionsCond so any goroutines waiting for a new connection slot in the
 	// pool will proceed.
-	p.createConnectionsCond.Signal()
-	p.createConnectionsCond.L.Unlock()
+	// p.createConnectionsCond.Signal() // TODO: What do we do with this?
+	p.connsMu.Unlock()
 
 	// Only update the generation numbers map if the connection has retrieved its generation number.
 	// Otherwise, we'd decrement the count for the generation even though it had never been
@@ -1029,7 +1032,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnections bool) {
 
 	p.removePerishedConns()
 	if interruptAllConnections {
-		p.createConnectionsCond.L.Lock()
+		p.connsMu.Lock()
 		p.idleMu.Lock()
 
 		idleConns := make(map[*connection]bool, len(p.idleConns))
@@ -1045,7 +1048,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnections bool) {
 		}
 
 		p.idleMu.Unlock()
-		p.createConnectionsCond.L.Unlock()
+		p.connsMu.Unlock()
 
 		p.interruptConnections(conns)
 	}
@@ -1067,15 +1070,15 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnections bool) {
 		// Clear the new connections wait queue. This effectively pauses the createConnections()
 		// background goroutine because newConnWait is empty and checkOut() won't insert any more
 		// wantConns into newConnWait until the pool is marked "ready" again.
-		p.createConnectionsCond.L.Lock()
+		p.connsMu.Lock()
 		for {
 			w := p.newConnWait.popFront()
 			if w == nil {
 				break
 			}
 			w.tryDeliver(nil, pcErr)
 		}
-		p.createConnectionsCond.L.Unlock()
+		p.connsMu.Unlock()
 	}
 }
 
@@ -1121,17 +1124,19 @@ func (p *pool) getOrQueueForIdleConn(w *wantConn) bool {
 }
 
 func (p *pool) queueForNewConn(w *wantConn) {
-	p.createConnectionsCond.L.Lock()
-	defer p.createConnectionsCond.L.Unlock()
+	p.connsMu.Lock()
+	defer p.connsMu.Unlock()
 
 	p.newConnWait.cleanFront()
 	p.newConnWait.pushBack(w)
-	p.createConnectionsCond.Signal()
+
+	p.backgroundDone.Add(1)
+	go p.createConnection(p.backgroundCtx, p.backgroundDone)
 }
 
 func (p *pool) totalConnectionCount() int {
-	p.createConnectionsCond.L.Lock()
-	defer p.createConnectionsCond.L.Unlock()
+	p.connsMu.Lock()
+	defer p.connsMu.Unlock()
 
 	return len(p.conns)
 }
@@ -1144,35 +1149,37 @@ func (p *pool) availableConnectionCount() int {
 }
 
 // createConnections creates connections for wantConn requests on the newConnWait queue.
-func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
+func (p *pool) createConnection(ctx context.Context, wg *sync.WaitGroup) {
 	defer wg.Done()
 
-	// condition returns true if the createConnections() loop should continue and false if it should
-	// wait. Note that the condition also listens for Context cancellation, which also causes the
-	// loop to continue, allowing for a subsequent check to return from createConnections().
-	condition := func() bool {
-		checkOutWaiting := p.newConnWait.len() > 0
-		poolHasSpace := p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
-		cancelled := ctx.Err() != nil
-		return (checkOutWaiting && poolHasSpace) || cancelled
-	}
-
-	// wait waits for there to be an available wantConn and for the pool to have space for a new
-	// connection. When the condition becomes true, it creates a new connection and returns the
-	// waiting wantConn and new connection. If the Context is cancelled or there are any
-	// errors, wait returns with "ok = false".
-	wait := func() (*wantConn, *connection, bool) {
-		p.createConnectionsCond.L.Lock()
-		defer p.createConnectionsCond.L.Unlock()
-
-		for !condition() {
-			p.createConnectionsCond.Wait()
-		}
+	// TODO: Do we want to perma-block here? Seems like that could create a
+	// bunch of waiting goroutines that will never complete until the pool
+	// is disconnected.
+	select {
+	case p.createConnectionSem <- struct{}{}:
+		defer func() {
+			<-p.createConnectionSem
+		}()
+	case <-ctx.Done():
+		return
+	case <-time.After(10 * time.Millisecond):
+		return
+	}
 
+	nextWaiter := func() (*wantConn, *connection, bool) {
+		p.connsMu.Lock()
+		defer p.connsMu.Unlock()
+
+		// TODO: Do we need this check here?
 		if ctx.Err() != nil {
 			return nil, nil, false
 		}
 
+		// TODO: Is this condition correct?
+		if p.maxSize > 0 && uint64(len(p.conns)) >= p.maxSize {
+			return nil, nil, false
+		}
+
 		p.newConnWait.cleanFront()
 		w := p.newConnWait.popFront()
 		if w == nil {
@@ -1187,99 +1194,234 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
 		return w, conn, true
 	}
 
-	for ctx.Err() == nil {
-		w, conn, ok := wait()
-		if !ok {
-			continue
-		}
-
-		if mustLogPoolMessage(p) {
-			keysAndValues := logger.KeyValues{
-				logger.KeyDriverConnectionID, conn.driverConnectionID,
-			}
-
-			logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
-		}
+	w, conn, ok := nextWaiter()
+	if !ok {
+		return
+	}
 
-		if p.monitor != nil {
-			p.monitor.Event(&event.PoolEvent{
-				Type:         event.ConnectionCreated,
-				Address:      p.address.String(),
-				ConnectionID: conn.driverConnectionID,
-			})
+	if mustLogPoolMessage(p) {
+		keysAndValues := logger.KeyValues{
+			logger.KeyDriverConnectionID, conn.driverConnectionID,
 		}
 
-		start := time.Now()
-		// Pass the createConnections context to connect to allow pool close to
-		// cancel connection establishment so shutdown doesn't block indefinitely if
-		// connectTimeout=0.
-		//
-		// Per the specifications, an explicit value of connectTimeout=0 means the
-		// timeout is "infinite".
+		logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
+	}
 
-		var cancel context.CancelFunc
+	if p.monitor != nil {
+		p.monitor.Event(&event.PoolEvent{
+			Type:         event.ConnectionCreated,
+			Address:      p.address.String(),
+			ConnectionID: conn.driverConnectionID,
+		})
+	}
 
-		connctx := context.Background()
-		if p.connectTimeout != 0 {
-			connctx, cancel = context.WithTimeout(ctx, p.connectTimeout)
-		}
+	start := time.Now()
+	// Pass the createConnections context to connect to allow pool close to
+	// cancel connection establishment so shutdown doesn't block indefinitely if
+	// connectTimeout=0.
+	//
+	// Per the specifications, an explicit value of connectTimeout=0 means the
+	// timeout is "infinite".
 
-		err := conn.connect(connctx)
+	var cancel context.CancelFunc
 
-		if cancel != nil {
-			cancel()
-		}
+	connctx := context.Background()
+	if p.connectTimeout != 0 {
+		connctx, cancel = context.WithTimeout(ctx, p.connectTimeout)
+	}
 
-		if err != nil {
-			w.tryDeliver(nil, err)
-
-			// If there's an error connecting the new connection, call the handshake error handler
-			// that implements the SDAM handshake error handling logic. This must be called after
-			// delivering the connection error to the waiting wantConn. If it's called before, the
-			// handshake error handler may clear the connection pool, leading to a different error
-			// message being delivered to the same waiting wantConn in idleConnWait when the wait
-			// queues are cleared.
-			if p.handshakeErrFn != nil {
-				p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
-			}
+	err := conn.connect(connctx)
 
-			_ = p.removeConnection(conn, reason{
-				loggerConn: logger.ReasonConnClosedError,
-				event:      event.ReasonError,
-			}, err)
+	if cancel != nil {
+		cancel()
+	}
 
-			_ = p.closeConnection(conn)
+	if err != nil {
+		w.tryDeliver(nil, err)
 
-			continue
+		// If there's an error connecting the new connection, call the handshake error handler
+		// that implements the SDAM handshake error handling logic. This must be called after
+		// delivering the connection error to the waiting wantConn. If it's called before, the
+		// handshake error handler may clear the connection pool, leading to a different error
+		// message being delivered to the same waiting wantConn in idleConnWait when the wait
+		// queues are cleared.
+		if p.handshakeErrFn != nil {
+			p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
 		}
 
-		duration := time.Since(start)
-		if mustLogPoolMessage(p) {
-			keysAndValues := logger.KeyValues{
-				logger.KeyDriverConnectionID, conn.driverConnectionID,
-				logger.KeyDurationMS, duration.Milliseconds(),
-			}
+		_ = p.removeConnection(conn, reason{
+			loggerConn: logger.ReasonConnClosedError,
+			event:      event.ReasonError,
+		}, err)
 
-			logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
-		}
+		_ = p.closeConnection(conn)
 
-		if p.monitor != nil {
-			p.monitor.Event(&event.PoolEvent{
-				Type:         event.ConnectionReady,
-				Address:      p.address.String(),
-				ConnectionID: conn.driverConnectionID,
-				Duration:     duration,
-			})
-		}
+		return
+	}
 
-		if w.tryDeliver(conn, nil) {
-			continue
+	duration := time.Since(start)
+	if mustLogPoolMessage(p) {
+		keysAndValues := logger.KeyValues{
+			logger.KeyDriverConnectionID, conn.driverConnectionID,
+			logger.KeyDurationMS, duration.Milliseconds(),
 		}
 
-		_ = p.checkInNoEvent(conn)
+		logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
 	}
+
+	if p.monitor != nil {
+		p.monitor.Event(&event.PoolEvent{
+			Type:         event.ConnectionReady,
+			Address:      p.address.String(),
+			ConnectionID: conn.driverConnectionID,
+			Duration:     duration,
+		})
+	}
+
+	if w.tryDeliver(conn, nil) {
+		return
+	}
+
+	_ = p.checkInNoEvent(conn)
 }
 
+// createConnections creates connections for wantConn requests on the newConnWait queue.
+// func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
+// 	defer wg.Done()
+
+// 	// condition returns true if the createConnections() loop should continue and false if it should
+// 	// wait. Note that the condition also listens for Context cancellation, which also causes the
+// 	// loop to continue, allowing for a subsequent check to return from createConnections().
+// 	condition := func() bool {
+// 		checkOutWaiting := p.newConnWait.len() > 0
+// 		poolHasSpace := p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
+// 		cancelled := ctx.Err() != nil
+// 		return (checkOutWaiting && poolHasSpace) || cancelled
+// 	}
+
+// 	// wait waits for there to be an available wantConn and for the pool to have space for a new
+// 	// connection. When the condition becomes true, it creates a new connection and returns the
+// 	// waiting wantConn and new connection. If the Context is cancelled or there are any
+// 	// errors, wait returns with "ok = false".
+// 	wait := func() (*wantConn, *connection, bool) {
+// 		p.createConnectionsCond.L.Lock()
+// 		defer p.createConnectionsCond.L.Unlock()

+// 		for !condition() {
+// 			p.createConnectionsCond.Wait()
+// 		}

+// 		if ctx.Err() != nil {
+// 			return nil, nil, false
+// 		}

+// 		p.newConnWait.cleanFront()
+// 		w := p.newConnWait.popFront()
+// 		if w == nil {
+// 			return nil, nil, false
+// 		}

+// 		conn := newConnection(p.address, p.connOpts...)
+// 		conn.pool = p
+// 		conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1)
+// 		p.conns[conn.driverConnectionID] = conn

+// 		return w, conn, true
+// 	}

+// 	for ctx.Err() == nil {
+// 		w, conn, ok := wait()
+// 		if !ok {
+// 			continue
+// 		}

+// 		if mustLogPoolMessage(p) {
+// 			keysAndValues := logger.KeyValues{
+// 				logger.KeyDriverConnectionID, conn.driverConnectionID,
+// 			}

+// 			logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
+// 		}

+// 		if p.monitor != nil {
+// 			p.monitor.Event(&event.PoolEvent{
+// 				Type:         event.ConnectionCreated,
+// 				Address:      p.address.String(),
+// 				ConnectionID: conn.driverConnectionID,
+// 			})
+// 		}

+// 		start := time.Now()
+// 		// Pass the createConnections context to connect to allow pool close to
+// 		// cancel connection establishment so shutdown doesn't block indefinitely if
+// 		// connectTimeout=0.
+// 		//
+// 		// Per the specifications, an explicit value of connectTimeout=0 means the
+// 		// timeout is "infinite".

+// 		var cancel context.CancelFunc

+// 		connctx := context.Background()
+// 		if p.connectTimeout != 0 {
+// 			connctx, cancel = context.WithTimeout(ctx, p.connectTimeout)
+// 		}

+// 		err := conn.connect(connctx)

+// 		if cancel != nil {
+// 			cancel()
+// 		}

+// 		if err != nil {
+// 			w.tryDeliver(nil, err)

+// 			// If there's an error connecting the new connection, call the handshake error handler
+// 			// that implements the SDAM handshake error handling logic. This must be called after
+// 			// delivering the connection error to the waiting wantConn. If it's called before, the
+// 			// handshake error handler may clear the connection pool, leading to a different error
+// 			// message being delivered to the same waiting wantConn in idleConnWait when the wait
+// 			// queues are cleared.
+// 			if p.handshakeErrFn != nil {
+// 				p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
+// 			}

+// 			_ = p.removeConnection(conn, reason{
+// 				loggerConn: logger.ReasonConnClosedError,
+// 				event:      event.ReasonError,
+// 			}, err)

+// 			_ = p.closeConnection(conn)

+// 			continue
+// 		}

+// 		duration := time.Since(start)
+// 		if mustLogPoolMessage(p) {
+// 			keysAndValues := logger.KeyValues{
+// 				logger.KeyDriverConnectionID, conn.driverConnectionID,
+// 				logger.KeyDurationMS, duration.Milliseconds(),
+// 			}

+// 			logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
+// 		}

+// 		if p.monitor != nil {
+// 			p.monitor.Event(&event.PoolEvent{
+// 				Type:         event.ConnectionReady,
+// 				Address:      p.address.String(),
+// 				ConnectionID: conn.driverConnectionID,
+// 				Duration:     duration,
+// 			})
+// 		}

+// 		if w.tryDeliver(conn, nil) {
+// 			continue
+// 		}

+// 		_ = p.checkInNoEvent(conn)
+// 	}
+// }
+
 func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) {
 	defer wg.Done()
 
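The acquire in the new createConnection above is deliberately non-blocking: instead of parking forever on the semaphore, the goroutine gives up when the pool's background context is cancelled or after a short deadline, which is what the TODO about perma-blocking waiters is getting at. A simplified standalone sketch of that shape follows (tryAcquire is an illustrative name, and the 10ms deadline mirrors the POC; neither is the driver's API):

package main

import (
	"context"
	"fmt"
	"time"
)

// tryAcquire attempts to take a slot from sem, giving up if ctx is
// cancelled or a short deadline passes rather than blocking forever.
// On success it returns a release func the caller must invoke when done.
func tryAcquire(ctx context.Context, sem chan struct{}) (release func(), ok bool) {
	select {
	case sem <- struct{}{}: // got a slot
		return func() { <-sem }, true
	case <-ctx.Done(): // shutdown signalled
		return nil, false
	case <-time.After(10 * time.Millisecond): // avoid piling up waiters
		return nil, false
	}
}

func main() {
	sem := make(chan struct{}, 1) // capacity would be maxConnecting
	if release, ok := tryAcquire(context.Background(), sem); ok {
		fmt.Println("slot acquired")
		release()
	}
}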