Skip to content

Commit 84650e4

Browse files
authored
GODRIVER-3181 Read server responses in the background after op timeout. (#1719)
1 parent ccf35c9 commit 84650e4

File tree

16 files changed

+800
-137
lines changed

16 files changed

+800
-137
lines changed

internal/integration/csot_test.go

Lines changed: 522 additions & 0 deletions
Large diffs are not rendered by default.

internal/integration/mtest/mongotest.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,15 @@ func (t *T) cleanup() {
208208
// Run creates a new T instance for a sub-test and runs the given callback. It also creates a new collection using the
209209
// given name which is available to the callback through the T.Coll variable and is dropped after the callback
210210
// returns.
211-
func (t *T) Run(name string, callback func(*T)) {
211+
func (t *T) Run(name string, callback func(mt *T)) {
212212
t.RunOpts(name, NewOptions(), callback)
213213
}
214214

215215
// RunOpts creates a new T instance for a sub-test with the given options. If the current environment does not satisfy
216216
// constraints specified in the options, the new sub-test will be skipped automatically. If the test is not skipped,
217217
// the callback will be run with the new T instance. RunOpts creates a new collection with the given name which is
218218
// available to the callback through the T.Coll variable and is dropped after the callback returns.
219-
func (t *T) RunOpts(name string, opts *Options, callback func(*T)) {
219+
func (t *T) RunOpts(name string, opts *Options, callback func(mt *T)) {
220220
t.T.Run(name, func(wrapped *testing.T) {
221221
sub := newT(wrapped, t.baseOpts, opts)
222222

internal/integration/unified/unified_spec_runner.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,31 @@ var (
6161
"unpin when a new transaction is started": "Implement GODRIVER-3034",
6262
"unpin when a non-transaction write operation uses a session": "Implement GODRIVER-3034",
6363
"unpin when a non-transaction read operation uses a session": "Implement GODRIVER-3034",
64+
65+
// DRIVERS-2722: Setting "maxTimeMS" on a command that creates a cursor
66+
// also limits the lifetime of the cursor. That may be surprising to
67+
// users, so omit "maxTimeMS" from operations that return user-managed
68+
// cursors.
69+
"timeoutMS can be overridden for a find": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
70+
"timeoutMS can be configured for an operation - find on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
71+
"timeoutMS can be configured for an operation - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
72+
"timeoutMS can be configured for an operation - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
73+
"timeoutMS can be configured on a MongoClient - find on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
74+
"timeoutMS can be configured on a MongoClient - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
75+
"timeoutMS can be configured on a MongoClient - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
76+
"operation is retried multiple times for non-zero timeoutMS - find on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
77+
"operation is retried multiple times for non-zero timeoutMS - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
78+
"operation is retried multiple times for non-zero timeoutMS - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
79+
"timeoutMS applied to find command": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
80+
81+
// DRIVERS-2953: This test requires that the driver sends a "getMore"
82+
// with "maxTimeMS" set. However, "getMore" can only include "maxTimeMS"
83+
// for tailable awaitData cursors. Including "maxTimeMS" on "getMore"
84+
// for any other cursor type results in a server error:
85+
//
86+
// (BadValue) cannot set maxTimeMS on getMore command for a non-awaitData cursor
87+
//
88+
"Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset": "maxTimeMS can't be set on a getMore. See DRIVERS-2953",
6489
}
6590

6691
logMessageValidatorTimeout = 10 * time.Millisecond

mongo/collection.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,12 @@ func aggregate(a aggregateParams, opts ...options.Lister[options.AggregateOption
949949
ServerAPI(a.client.serverAPI).
950950
HasOutputStage(hasOutputStage).
951951
Timeout(a.client.timeout).
952-
Authenticator(a.client.authenticator)
952+
Authenticator(a.client.authenticator).
953+
// Omit "maxTimeMS" from operations that return a user-managed cursor to
954+
// prevent confusing "cursor not found" errors.
955+
//
956+
// See DRIVERS-2722 for more detail.
957+
OmitMaxTimeMS(true)
953958

954959
if args.AllowDiskUse != nil {
955960
op.AllowDiskUse(*args.AllowDiskUse)
@@ -1293,11 +1298,20 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
12931298
if err != nil {
12941299
return nil, err
12951300
}
1296-
return coll.find(ctx, filter, args)
1301+
1302+
// Omit "maxTimeMS" from operations that return a user-managed cursor to
1303+
// prevent confusing "cursor not found" errors.
1304+
//
1305+
// See DRIVERS-2722 for more detail.
1306+
return coll.find(ctx, filter, true, args)
12971307
}
12981308

1299-
func (coll *Collection) find(ctx context.Context, filter interface{},
1300-
args *options.FindOptions) (cur *Cursor, err error) {
1309+
func (coll *Collection) find(
1310+
ctx context.Context,
1311+
filter interface{},
1312+
omitMaxTimeMS bool,
1313+
args *options.FindOptions,
1314+
) (cur *Cursor, err error) {
13011315

13021316
if ctx == nil {
13031317
ctx = context.Background()
@@ -1335,7 +1349,8 @@ func (coll *Collection) find(ctx context.Context, filter interface{},
13351349
CommandMonitor(coll.client.monitor).ServerSelector(selector).
13361350
ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name).
13371351
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI).
1338-
Timeout(coll.client.timeout).Logger(coll.client.logger).Authenticator(coll.client.authenticator)
1352+
Timeout(coll.client.timeout).Logger(coll.client.logger).Authenticator(coll.client.authenticator).
1353+
OmitMaxTimeMS(omitMaxTimeMS)
13391354

13401355
cursorOpts := coll.client.createBaseCursorOptions()
13411356

@@ -1500,7 +1515,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{},
15001515
if err != nil {
15011516
return nil
15021517
}
1503-
cursor, err := coll.find(ctx, filter, newFindArgsFromFindOneArgs(args))
1518+
cursor, err := coll.find(ctx, filter, false, newFindArgsFromFindOneArgs(args))
15041519
return &SingleResult{
15051520
ctx: ctx,
15061521
cur: cursor,

x/mongo/driver/batch_cursor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,12 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
441441
Crypt: bc.crypt,
442442
ServerAPI: bc.serverAPI,
443443

444+
// Omit the automatically-calculated maxTimeMS because setting maxTimeMS
445+
// on a non-awaitData cursor causes a server error. For awaitData
446+
// cursors, maxTimeMS is set when maxAwaitTime is specified by the above
447+
// CommandFn.
448+
OmitMaxTimeMS: true,
449+
444450
// No read preference is passed to the getMore command,
445451
// resulting in the default read preference: "primaryPreferred".
446452
// Since this could be confusing, and there is no requirement

x/mongo/driver/errors.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,14 +509,28 @@ func ExtractErrorFromServerResponse(doc bsoncore.Document) error {
509509
errmsg = "command failed"
510510
}
511511

512-
return Error{
512+
err := Error{
513513
Code: code,
514514
Message: errmsg,
515515
Name: codeName,
516516
Labels: labels,
517517
TopologyVersion: tv,
518518
Raw: doc,
519519
}
520+
521+
// If we get a MaxTimeMSExpired error, assume that the error was caused
522+
// by setting "maxTimeMS" on the command based on the context deadline
523+
// or on "timeoutMS". In that case, make the error wrap
524+
// context.DeadlineExceeded so that users can always check
525+
//
526+
// errors.Is(err, context.DeadlineExceeded)
527+
//
528+
// for either client-side or server-side timeouts.
529+
if err.Code == 50 {
530+
err.Wrapped = context.DeadlineExceeded
531+
}
532+
533+
return err
520534
}
521535

522536
if len(wcError.WriteErrors) > 0 || wcError.WriteConcernError != nil {

x/mongo/driver/integration/aggregate_test.go

Lines changed: 0 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -10,113 +10,21 @@ import (
1010
"bytes"
1111
"context"
1212
"testing"
13-
"time"
1413

1514
"go.mongodb.org/mongo-driver/v2/bson"
16-
"go.mongodb.org/mongo-driver/v2/event"
1715
"go.mongodb.org/mongo-driver/v2/internal/integtest"
18-
"go.mongodb.org/mongo-driver/v2/internal/require"
1916
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
2017
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
2118
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
2219
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
2320
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation"
24-
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/topology"
2521
)
2622

27-
func setUpMonitor() (*event.CommandMonitor, chan *event.CommandStartedEvent, chan *event.CommandSucceededEvent, chan *event.CommandFailedEvent) {
28-
started := make(chan *event.CommandStartedEvent, 1)
29-
succeeded := make(chan *event.CommandSucceededEvent, 1)
30-
failed := make(chan *event.CommandFailedEvent, 1)
31-
32-
return &event.CommandMonitor{
33-
Started: func(_ context.Context, e *event.CommandStartedEvent) {
34-
started <- e
35-
},
36-
Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) {
37-
succeeded <- e
38-
},
39-
Failed: func(_ context.Context, e *event.CommandFailedEvent) {
40-
failed <- e
41-
},
42-
}, started, succeeded, failed
43-
}
44-
45-
func skipIfBelow32(ctx context.Context, t *testing.T, topo *topology.Topology) {
46-
server, err := topo.SelectServer(ctx, &serverselector.Write{})
47-
noerr(t, err)
48-
49-
versionCmd := bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "serverStatus", 1))
50-
serverStatus, err := runCommand(server, dbName, versionCmd)
51-
noerr(t, err)
52-
version, err := serverStatus.LookupErr("version")
53-
noerr(t, err)
54-
55-
if integtest.CompareVersions(t, version.StringValue(), "3.2") < 0 {
56-
t.Skip()
57-
}
58-
}
59-
6023
func TestAggregate(t *testing.T) {
6124
if testing.Short() {
6225
t.Skip("skipping integration test in short mode")
6326
}
6427

65-
t.Run("TestMaxTimeMSInGetMore", func(t *testing.T) {
66-
ctx := context.Background()
67-
monitor, started, succeeded, failed := setUpMonitor()
68-
dbName := "TestAggMaxTimeDB"
69-
collName := "TestAggMaxTimeColl"
70-
top := integtest.MonitoredTopology(t, dbName, monitor)
71-
clearChannels(started, succeeded, failed)
72-
skipIfBelow32(ctx, t, top)
73-
74-
clearChannels(started, succeeded, failed)
75-
err := operation.NewInsert(
76-
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "x", 1)),
77-
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "x", 1)),
78-
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "x", 1)),
79-
).Collection(collName).Database(dbName).
80-
Deployment(top).ServerSelector(&serverselector.Write{}).Execute(context.Background())
81-
noerr(t, err)
82-
83-
clearChannels(started, succeeded, failed)
84-
op := operation.NewAggregate(bsoncore.BuildDocumentFromElements(nil)).
85-
Collection(collName).Database(dbName).Deployment(top).ServerSelector(&serverselector.Write{}).
86-
CommandMonitor(monitor).BatchSize(2)
87-
88-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
89-
defer cancel()
90-
91-
err = op.Execute(ctx)
92-
noerr(t, err)
93-
batchCursor, err := op.Result(driver.CursorOptions{BatchSize: 2, CommandMonitor: monitor})
94-
noerr(t, err)
95-
96-
var e *event.CommandStartedEvent
97-
select {
98-
case e = <-started:
99-
case <-time.After(2000 * time.Millisecond):
100-
t.Fatal("timed out waiting for aggregate")
101-
}
102-
103-
require.Equal(t, "aggregate", e.CommandName)
104-
105-
clearChannels(started, succeeded, failed)
106-
// first Next() should automatically return true
107-
require.True(t, batchCursor.Next(ctx), "expected true from first Next, got false")
108-
clearChannels(started, succeeded, failed)
109-
batchCursor.Next(ctx) // should do getMore
110-
111-
select {
112-
case e = <-started:
113-
case <-time.After(200 * time.Millisecond):
114-
t.Fatal("timed out waiting for getMore")
115-
}
116-
require.Equal(t, "getMore", e.CommandName)
117-
_, err = e.Command.LookupErr("maxTimeMS")
118-
noerr(t, err)
119-
})
12028
t.Run("Multiple Batches", func(t *testing.T) {
12129
ds := []bsoncore.Document{
12230
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "_id", 1)),
@@ -185,15 +93,3 @@ func TestAggregate(t *testing.T) {
18593
})
18694

18795
}
188-
189-
func clearChannels(s chan *event.CommandStartedEvent, succ chan *event.CommandSucceededEvent, f chan *event.CommandFailedEvent) {
190-
for len(s) > 0 {
191-
<-s
192-
}
193-
for len(succ) > 0 {
194-
<-succ
195-
}
196-
for len(f) > 0 {
197-
<-f
198-
}
199-
}

x/mongo/driver/integration/main_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,13 @@ func addCompressorToURI(uri string) string {
119119
return uri + "compressors=" + comp
120120
}
121121

122-
// runCommand runs an arbitrary command on a given database of target server
123-
func runCommand(s driver.Server, db string, cmd bsoncore.Document) (bsoncore.Document, error) {
122+
// runCommand runs an arbitrary command on a given database of the target
123+
// server.
124+
func runCommand(s driver.Server, db string, cmd bsoncore.Document) error {
124125
op := operation.NewCommand(cmd).
125-
Database(db).Deployment(driver.SingleServerDeployment{Server: s})
126-
err := op.Execute(context.Background())
127-
res := op.Result()
128-
return res, err
126+
Database(db).
127+
Deployment(driver.SingleServerDeployment{Server: s})
128+
return op.Execute(context.Background())
129129
}
130130

131131
// dropCollection drops the collection in the test cluster.

x/mongo/driver/integration/scram_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ func runScramAuthTest(t *testing.T, credential options.Credential) error {
147147
noerr(t, err)
148148

149149
cmd := bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "dbstats", 1))
150-
_, err = runCommand(server, integtest.DBName(t), cmd)
151-
return err
150+
return runCommand(server, integtest.DBName(t), cmd)
152151
}
153152

154153
func createScramUsers(t *testing.T, s driver.Server, cases []scramTestCase) error {
@@ -169,7 +168,7 @@ func createScramUsers(t *testing.T, s driver.Server, cases []scramTestCase) erro
169168
)),
170169
bsoncore.AppendArrayElement(nil, "mechanisms", bsoncore.BuildArray(nil, values...)),
171170
)
172-
_, err := runCommand(s, db, newUserCmd)
171+
err := runCommand(s, db, newUserCmd)
173172
if err != nil {
174173
return fmt.Errorf("Couldn't create user '%s' on db '%s': %w", c.username, integtest.DBName(t), err)
175174
}

0 commit comments

Comments
 (0)