@@ -167,7 +167,7 @@ def test_try_next_runs_one_getmore(self):
167
167
client = rs_or_single_client (event_listeners = [listener ])
168
168
# Connect to the cluster.
169
169
client .admin .command ("ping" )
170
- listener .results . clear ()
170
+ listener .reset ()
171
171
# ChangeStreams only read majority committed data so use w:majority.
172
172
coll = self .watched_collection ().with_options (write_concern = WriteConcern ("majority" ))
173
173
coll .drop ()
@@ -177,33 +177,33 @@ def test_try_next_runs_one_getmore(self):
177
177
self .addCleanup (coll .drop )
178
178
with self .change_stream_with_client (client , max_await_time_ms = 250 ) as stream :
179
179
self .assertEqual (listener .started_command_names (), ["aggregate" ])
180
- listener .results . clear ()
180
+ listener .reset ()
181
181
182
182
# Confirm that only a single getMore is run even when no documents
183
183
# are returned.
184
184
self .assertIsNone (stream .try_next ())
185
185
self .assertEqual (listener .started_command_names (), ["getMore" ])
186
- listener .results . clear ()
186
+ listener .reset ()
187
187
self .assertIsNone (stream .try_next ())
188
188
self .assertEqual (listener .started_command_names (), ["getMore" ])
189
- listener .results . clear ()
189
+ listener .reset ()
190
190
191
191
# Get at least one change before resuming.
192
192
coll .insert_one ({"_id" : 2 })
193
193
wait_until (lambda : stream .try_next () is not None , "get change from try_next" )
194
- listener .results . clear ()
194
+ listener .reset ()
195
195
196
196
# Cause the next request to initiate the resume process.
197
197
self .kill_change_stream_cursor (stream )
198
- listener .results . clear ()
198
+ listener .reset ()
199
199
200
200
# The sequence should be:
201
201
# - getMore, fail
202
202
# - resume with aggregate command
203
203
# - no results, return immediately without another getMore
204
204
self .assertIsNone (stream .try_next ())
205
205
self .assertEqual (listener .started_command_names (), ["getMore" , "aggregate" ])
206
- listener .results . clear ()
206
+ listener .reset ()
207
207
208
208
# Stream still works after a resume.
209
209
coll .insert_one ({"_id" : 3 })
@@ -217,7 +217,7 @@ def test_batch_size_is_honored(self):
217
217
client = rs_or_single_client (event_listeners = [listener ])
218
218
# Connect to the cluster.
219
219
client .admin .command ("ping" )
220
- listener .results . clear ()
220
+ listener .reset ()
221
221
# ChangeStreams only read majority committed data so use w:majority.
222
222
coll = self .watched_collection ().with_options (write_concern = WriteConcern ("majority" ))
223
223
coll .drop ()
@@ -229,12 +229,12 @@ def test_batch_size_is_honored(self):
229
229
expected = {"batchSize" : 23 }
230
230
with self .change_stream_with_client (client , max_await_time_ms = 250 , batch_size = 23 ) as stream :
231
231
# Confirm that batchSize is honored for initial batch.
232
- cmd = listener .results [ "started" ] [0 ].command
232
+ cmd = listener .started_events [0 ].command
233
233
self .assertEqual (cmd ["cursor" ], expected )
234
- listener .results . clear ()
234
+ listener .reset ()
235
235
# Confirm that batchSize is honored by getMores.
236
236
self .assertIsNone (stream .try_next ())
237
- cmd = listener .results [ "started" ] [0 ].command
237
+ cmd = listener .started_events [0 ].command
238
238
key = next (iter (expected ))
239
239
self .assertEqual (expected [key ], cmd [key ])
240
240
@@ -255,12 +255,11 @@ def test_start_at_operation_time(self):
255
255
@no_type_check
256
256
def _test_full_pipeline (self , expected_cs_stage ):
257
257
client , listener = self .client_with_listener ("aggregate" )
258
- results = listener .results
259
258
with self .change_stream_with_client (client , [{"$project" : {"foo" : 0 }}]) as _ :
260
259
pass
261
260
262
- self .assertEqual (1 , len (results [ "started" ] ))
263
- command = results [ "started" ] [0 ]
261
+ self .assertEqual (1 , len (listener . started_events ))
262
+ command = listener . started_events [0 ]
264
263
self .assertEqual ("aggregate" , command .command_name )
265
264
self .assertEqual (
266
265
[{"$changeStream" : expected_cs_stage }, {"$project" : {"foo" : 0 }}],
@@ -464,7 +463,7 @@ def _get_expected_resume_token_legacy(self, stream, listener, previous_change=No
464
463
versions that don't support postBatchResumeToken. Assumes the stream
465
464
has never returned any changes if previous_change is None."""
466
465
if previous_change is None :
467
- agg_cmd = listener .results [ "started" ] [0 ]
466
+ agg_cmd = listener .started_events [0 ]
468
467
stage = agg_cmd .command ["pipeline" ][0 ]["$changeStream" ]
469
468
return stage .get ("resumeAfter" ) or stage .get ("startAfter" )
470
469
@@ -481,7 +480,7 @@ def _get_expected_resume_token(self, stream, listener, previous_change=None):
481
480
if token is not None :
482
481
return token
483
482
484
- response = listener .results [ "succeeded" ] [- 1 ].reply
483
+ response = listener .succeeded_events [- 1 ].reply
485
484
return response ["cursor" ]["postBatchResumeToken" ]
486
485
487
486
@no_type_check
@@ -558,8 +557,8 @@ def test_no_resume_attempt_if_aggregate_command_fails(self):
558
557
pass
559
558
560
559
# Driver should have attempted aggregate command only once.
561
- self .assertEqual (len (listener .results [ "started" ] ), 1 )
562
- self .assertEqual (listener .results [ "started" ] [0 ].command_name , "aggregate" )
560
+ self .assertEqual (len (listener .started_events ), 1 )
561
+ self .assertEqual (listener .started_events [0 ].command_name , "aggregate" )
563
562
564
563
# Prose test no. 5 - REMOVED
565
564
# Prose test no. 6 - SKIPPED
@@ -603,20 +602,20 @@ def test_start_at_operation_time_caching(self):
603
602
with self .change_stream_with_client (client ) as cs :
604
603
self .kill_change_stream_cursor (cs )
605
604
cs .try_next ()
606
- cmd = listener .results [ "started" ] [- 1 ].command
605
+ cmd = listener .started_events [- 1 ].command
607
606
self .assertIsNotNone (cmd ["pipeline" ][0 ]["$changeStream" ].get ("startAtOperationTime" ))
608
607
609
608
# Case 2: change stream started with startAtOperationTime
610
- listener .results . clear ()
609
+ listener .reset ()
611
610
optime = self .get_start_at_operation_time ()
612
611
with self .change_stream_with_client (client , start_at_operation_time = optime ) as cs :
613
612
self .kill_change_stream_cursor (cs )
614
613
cs .try_next ()
615
- cmd = listener .results [ "started" ] [- 1 ].command
614
+ cmd = listener .started_events [- 1 ].command
616
615
self .assertEqual (
617
616
cmd ["pipeline" ][0 ]["$changeStream" ].get ("startAtOperationTime" ),
618
617
optime ,
619
- str ([k .command for k in listener .results [ "started" ] ]),
618
+ str ([k .command for k in listener .started_events ]),
620
619
)
621
620
622
621
# Prose test no. 10 - SKIPPED
@@ -631,7 +630,7 @@ def test_resumetoken_empty_batch(self):
631
630
self .assertIsNone (change_stream .try_next ())
632
631
resume_token = change_stream .resume_token
633
632
634
- response = listener .results [ "succeeded" ] [0 ].reply
633
+ response = listener .succeeded_events [0 ].reply
635
634
self .assertEqual (resume_token , response ["cursor" ]["postBatchResumeToken" ])
636
635
637
636
# Prose test no. 11
@@ -643,7 +642,7 @@ def test_resumetoken_exhausted_batch(self):
643
642
self ._populate_and_exhaust_change_stream (change_stream )
644
643
resume_token = change_stream .resume_token
645
644
646
- response = listener .results [ "succeeded" ] [- 1 ].reply
645
+ response = listener .succeeded_events [- 1 ].reply
647
646
self .assertEqual (resume_token , response ["cursor" ]["postBatchResumeToken" ])
648
647
649
648
# Prose test no. 12
@@ -737,7 +736,7 @@ def test_startafter_resume_uses_startafter_after_empty_getMore(self):
737
736
self .kill_change_stream_cursor (change_stream )
738
737
change_stream .try_next () # Resume attempt
739
738
740
- response = listener .results [ "started" ] [- 1 ]
739
+ response = listener .started_events [- 1 ]
741
740
self .assertIsNone (response .command ["pipeline" ][0 ]["$changeStream" ].get ("resumeAfter" ))
742
741
self .assertIsNotNone (response .command ["pipeline" ][0 ]["$changeStream" ].get ("startAfter" ))
743
742
@@ -756,7 +755,7 @@ def test_startafter_resume_uses_resumeafter_after_nonempty_getMore(self):
756
755
self .kill_change_stream_cursor (change_stream )
757
756
change_stream .try_next () # Resume attempt
758
757
759
- response = listener .results [ "started" ] [- 1 ]
758
+ response = listener .started_events [- 1 ]
760
759
self .assertIsNotNone (response .command ["pipeline" ][0 ]["$changeStream" ].get ("resumeAfter" ))
761
760
self .assertIsNone (response .command ["pipeline" ][0 ]["$changeStream" ].get ("startAfter" ))
762
761
@@ -1056,7 +1055,7 @@ def tearDownClass(cls):
1056
1055
1057
1056
def setUp (self ):
1058
1057
super (TestAllLegacyScenarios , self ).setUp ()
1059
- self .listener .results . clear ()
1058
+ self .listener .reset ()
1060
1059
1061
1060
def setUpCluster (self , scenario_dict ):
1062
1061
assets = [
@@ -1128,7 +1127,7 @@ def check_event(self, event, expectation_dict):
1128
1127
self .assertEqual (getattr (event , key ), value )
1129
1128
1130
1129
def tearDown (self ):
1131
- self .listener .results . clear ()
1130
+ self .listener .reset ()
1132
1131
1133
1132
1134
1133
_TEST_PATH = os .path .join (os .path .dirname (os .path .realpath (__file__ )), "change_streams" )
0 commit comments