From bfe0067f5560173ab16b76fdcac7eab71ea37382 Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Tue, 24 Jun 2025 13:09:56 -0400 Subject: [PATCH 1/9] add TransientTransactionError label to poolClearedError --- x/mongo/driver/topology/pool.go | 9 +++++---- x/mongo/driver/topology/pool_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 162bb9c1af..0192909c0d 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -50,8 +50,9 @@ func (pe PoolError) Error() string { return string(pe) } // poolClearedError is an error returned when the connection pool is cleared or currently paused. It // is a retryable error. type poolClearedError struct { - err error - address address.Address + err error + address address.Address + errorLabels []string } func (pce poolClearedError) Error() string { @@ -503,7 +504,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { } return nil, ErrPoolClosed case poolPaused: - err := poolClearedError{err: p.lastClearErr, address: p.address} + err := poolClearedError{err: p.lastClearErr, address: p.address, errorLabels: []string{"TransientTransactionError"}} p.stateMu.RUnlock() duration := time.Since(start) @@ -1049,7 +1050,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec } if serviceID == nil { - pcErr := poolClearedError{err: err, address: p.address} + pcErr := poolClearedError{err: err, address: p.address, errorLabels: []string{"TransientTransactionError"}} // Clear the idle connections wait queue. p.idleMu.Lock() diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go index f58e1cf204..76a915d436 100644 --- a/x/mongo/driver/topology/pool_test.go +++ b/x/mongo/driver/topology/pool_test.go @@ -1584,3 +1584,29 @@ func TestPool_PoolMonitor(t *testing.T) { "expected ConnectionCheckOutFailed Duration to be set") }) } + +func TestPool_Error(t *testing.T) { + + t.Parallel() + + t.Run("should have TransientTransactionError", func(t *testing.T) { + t.Parallel() + + p := newPool(poolConfig{}) + assert.Equalf(t, poolPaused, p.getState(), "expected new pool to be paused") + + // Since new pool is paused, checkout should throw poolClearedError. + _, err := p.checkOut(context.Background()) + + var pce poolClearedError + if errors.As(err, &pce) { + expectedLabel := "TransientTransactionError" + assert.Contains(t, pce.errorLabels, expectedLabel, `expected error to include the "TransientTransactionError" label`) + } else { + t.Errorf("expected poolClearedError, got %v", err) + } + + p.close(context.Background()) + + }) +} From 06308d28242520d31ec089d683dde98a1d62c36e Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Tue, 24 Jun 2025 13:14:19 -0400 Subject: [PATCH 2/9] cleanup --- x/mongo/driver/topology/pool_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go index 76a915d436..230405c1de 100644 --- a/x/mongo/driver/topology/pool_test.go +++ b/x/mongo/driver/topology/pool_test.go @@ -1586,7 +1586,6 @@ func TestPool_PoolMonitor(t *testing.T) { } func TestPool_Error(t *testing.T) { - t.Parallel() t.Run("should have TransientTransactionError", func(t *testing.T) { @@ -1607,6 +1606,5 @@ func TestPool_Error(t *testing.T) { } p.close(context.Background()) - }) } From f4fc9f4ef4cbdfe3bae0b1a675db6e834e030a5c Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Tue, 24 Jun 2025 13:19:29 -0400 Subject: [PATCH 3/9] cleanup --- x/mongo/driver/topology/pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go index 230405c1de..2a7db8b3e7 100644 --- a/x/mongo/driver/topology/pool_test.go +++ b/x/mongo/driver/topology/pool_test.go @@ -1594,7 +1594,7 @@ func TestPool_Error(t *testing.T) { p := newPool(poolConfig{}) assert.Equalf(t, poolPaused, p.getState(), "expected new pool to be paused") - // Since new pool is paused, checkout should throw poolClearedError. + // Since new pool is paused, checkout should throw PoolClearedError. _, err := p.checkOut(context.Background()) var pce poolClearedError From 2bf70b3861c09b4b3eef888a7d0bd03ac1876256 Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Tue, 24 Jun 2025 14:53:01 -0400 Subject: [PATCH 4/9] update to driver.TransientTransactionError --- x/mongo/driver/topology/pool.go | 4 ++-- x/mongo/driver/topology/pool_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 0192909c0d..1172483596 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -504,7 +504,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { } return nil, ErrPoolClosed case poolPaused: - err := poolClearedError{err: p.lastClearErr, address: p.address, errorLabels: []string{"TransientTransactionError"}} + err := poolClearedError{err: p.lastClearErr, address: p.address, errorLabels: []string{driver.TransientTransactionError}} p.stateMu.RUnlock() duration := time.Since(start) @@ -1050,7 +1050,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec } if serviceID == nil { - pcErr := poolClearedError{err: err, address: p.address, errorLabels: []string{"TransientTransactionError"}} + pcErr := poolClearedError{err: err, address: p.address, errorLabels: []string{driver.TransientTransactionError}} // Clear the idle connections wait queue. p.idleMu.Lock() diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go index 2a7db8b3e7..d3c1a2d58a 100644 --- a/x/mongo/driver/topology/pool_test.go +++ b/x/mongo/driver/topology/pool_test.go @@ -21,6 +21,7 @@ import ( "go.mongodb.org/mongo-driver/v2/internal/eventtest" "go.mongodb.org/mongo-driver/v2/internal/require" "go.mongodb.org/mongo-driver/v2/mongo/address" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" ) @@ -1599,8 +1600,7 @@ func TestPool_Error(t *testing.T) { var pce poolClearedError if errors.As(err, &pce) { - expectedLabel := "TransientTransactionError" - assert.Contains(t, pce.errorLabels, expectedLabel, `expected error to include the "TransientTransactionError" label`) + assert.Contains(t, pce.errorLabels, driver.TransientTransactionError, `expected error to include the "TransientTransactionError" label`) } else { t.Errorf("expected poolClearedError, got %v", err) } From 38d2589b5e3204785e17bd48ea3d50eceb51d73a Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Tue, 24 Jun 2025 17:36:19 -0400 Subject: [PATCH 5/9] wrap error instead of adding errorLabels --- x/mongo/driver/topology/pool.go | 17 ++++++++--------- x/mongo/driver/topology/pool_test.go | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 1172483596..d37fb06525 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -50,16 +50,15 @@ func (pe PoolError) Error() string { return string(pe) } // poolClearedError is an error returned when the connection pool is cleared or currently paused. It // is a retryable error. type poolClearedError struct { - err error - address address.Address - errorLabels []string + err error + address address.Address } func (pce poolClearedError) Error() string { - return fmt.Sprintf( - "connection pool for %v was cleared because another operation failed with: %v", - pce.address, - pce.err) + wrappedErr := fmt.Errorf( + "%v: connection pool for %v was cleared because another operation failed with: %v %w", + driver.TransientTransactionError, pce.address, pce.err, pce) + return wrappedErr.Error() } // Retryable returns true. All poolClearedErrors are retryable. @@ -504,7 +503,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { } return nil, ErrPoolClosed case poolPaused: - err := poolClearedError{err: p.lastClearErr, address: p.address, errorLabels: []string{driver.TransientTransactionError}} + err := poolClearedError{err: p.lastClearErr, address: p.address} p.stateMu.RUnlock() duration := time.Since(start) @@ -1050,7 +1049,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec } if serviceID == nil { - pcErr := poolClearedError{err: err, address: p.address, errorLabels: []string{driver.TransientTransactionError}} + pcErr := poolClearedError{err: err, address: p.address} // Clear the idle connections wait queue. p.idleMu.Lock() diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go index d3c1a2d58a..cd26e80b39 100644 --- a/x/mongo/driver/topology/pool_test.go +++ b/x/mongo/driver/topology/pool_test.go @@ -1600,7 +1600,7 @@ func TestPool_Error(t *testing.T) { var pce poolClearedError if errors.As(err, &pce) { - assert.Contains(t, pce.errorLabels, driver.TransientTransactionError, `expected error to include the "TransientTransactionError" label`) + assert.Contains(t, pce, driver.TransientTransactionError, `expected error to include the "TransientTransactionError" label`) } else { t.Errorf("expected poolClearedError, got %v", err) } From d15e4c6b936347868202728f17add0944a75b01d Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Wed, 25 Jun 2025 11:36:49 -0400 Subject: [PATCH 6/9] wrap error in driver.error --- x/mongo/driver/topology/pool.go | 15 ++++++++++----- x/mongo/driver/topology/pool_test.go | 10 +++++----- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index d37fb06525..d701230114 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -55,10 +55,10 @@ type poolClearedError struct { } func (pce poolClearedError) Error() string { - wrappedErr := fmt.Errorf( - "%v: connection pool for %v was cleared because another operation failed with: %v %w", - driver.TransientTransactionError, pce.address, pce.err, pce) - return wrappedErr.Error() + return fmt.Sprintf( + "connection pool for %v was cleared because another operation failed with: %v", + pce.address, + pce.err) } // Retryable returns true. All poolClearedErrors are retryable. @@ -503,7 +503,12 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { } return nil, ErrPoolClosed case poolPaused: - err := poolClearedError{err: p.lastClearErr, address: p.address} + pcErr := poolClearedError{err: p.lastClearErr, address: p.address} + err := driver.Error{ + Message: pcErr.Error(), + Labels: []string{driver.TransientTransactionError}, + Wrapped: pcErr, + } p.stateMu.RUnlock() duration := time.Since(start) diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go index cd26e80b39..17e803ea49 100644 --- a/x/mongo/driver/topology/pool_test.go +++ b/x/mongo/driver/topology/pool_test.go @@ -1597,12 +1597,12 @@ func TestPool_Error(t *testing.T) { // Since new pool is paused, checkout should throw PoolClearedError. _, err := p.checkOut(context.Background()) - - var pce poolClearedError - if errors.As(err, &pce) { - assert.Contains(t, pce, driver.TransientTransactionError, `expected error to include the "TransientTransactionError" label`) + var le driver.Error + if errors.As(err, &le) { + assert.ErrorIs(t, poolClearedError{}, le.Unwrap(), "expect error to be PoolClearedError") + assert.True(t, le.HasErrorLabel(driver.TransientTransactionError), `expected error to include the "TransientTransactionError" label`) } else { - t.Errorf("expected poolClearedError, got %v", err) + t.Errorf("expected labeled error, got %v", err) } p.close(context.Background()) From 46de325504aba4b7b2bca34e69ffa428750298f0 Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Wed, 25 Jun 2025 13:05:51 -0400 Subject: [PATCH 7/9] wrap all instances of pce --- x/mongo/driver/topology/pool.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index d701230114..e174cfc61a 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -1055,7 +1055,11 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec if serviceID == nil { pcErr := poolClearedError{err: err, address: p.address} - + labeledErr := driver.Error{ + Message: pcErr.Error(), + Labels: []string{driver.TransientTransactionError}, + Wrapped: pcErr, + } // Clear the idle connections wait queue. p.idleMu.Lock() for { @@ -1063,7 +1067,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec if w == nil { break } - w.tryDeliver(nil, pcErr) + w.tryDeliver(nil, labeledErr) } p.idleMu.Unlock() @@ -1076,7 +1080,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec if w == nil { break } - w.tryDeliver(nil, pcErr) + w.tryDeliver(nil, labeledErr) } p.createConnectionsCond.L.Unlock() } From 7f6ed4d4cfb92c4ea9ff4dfbfffd0fa6841be751 Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Wed, 25 Jun 2025 13:26:02 -0400 Subject: [PATCH 8/9] pull back idle connections pcerr --- x/mongo/driver/topology/pool.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index e174cfc61a..d701230114 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -1055,11 +1055,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec if serviceID == nil { pcErr := poolClearedError{err: err, address: p.address} - labeledErr := driver.Error{ - Message: pcErr.Error(), - Labels: []string{driver.TransientTransactionError}, - Wrapped: pcErr, - } + // Clear the idle connections wait queue. p.idleMu.Lock() for { @@ -1067,7 +1063,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec if w == nil { break } - w.tryDeliver(nil, labeledErr) + w.tryDeliver(nil, pcErr) } p.idleMu.Unlock() @@ -1080,7 +1076,7 @@ func (p *pool) clearImpl(err error, serviceID *bson.ObjectID, interruptAllConnec if w == nil { break } - w.tryDeliver(nil, labeledErr) + w.tryDeliver(nil, pcErr) } p.createConnectionsCond.L.Unlock() } From 9696918c6ef4f520abee851c0236701b05df925a Mon Sep 17 00:00:00 2001 From: Selena Zhou Date: Mon, 30 Jun 2025 11:08:39 -0400 Subject: [PATCH 9/9] add recommended comment --- x/mongo/driver/topology/pool.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index d701230114..1dd236dfdc 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -503,6 +503,13 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { } return nil, ErrPoolClosed case poolPaused: + // Wrap poolCleared in a driver.Error so we can add the + // "TransientTransactionError" label. This will add + // "TransientTransactionError" to all poolClearedError instances, not + // just those that happened during transactions. While that behavior is + // different than other places we add "TransientTransactionError", it is + // consistent with the Transactions specification and simplifies the + // code. pcErr := poolClearedError{err: p.lastClearErr, address: p.address} err := driver.Error{ Message: pcErr.Error(),