@@ -295,20 +295,16 @@ def _generate_slices_from_partition(
295
295
):
296
296
self ._partition_parent_state_map [partition_key ] = (deepcopy (parent_state ), seq )
297
297
298
- try :
299
- for cursor_slice , is_last_slice , _ in iterate_with_last_flag_and_state (
300
- cursor .stream_slices (),
301
- lambda : None ,
302
- ):
303
- self ._semaphore_per_partition [partition_key ].release ()
304
- if is_last_slice :
305
- self ._partitions_done_generating_stream_slices .add (partition_key )
306
- yield StreamSlice (
307
- partition = partition , cursor_slice = cursor_slice , extra_fields = partition .extra_fields
308
- )
309
- finally :
310
- del cursor
311
- del partition
298
+ for cursor_slice , is_last_slice , _ in iterate_with_last_flag_and_state (
299
+ cursor .stream_slices (),
300
+ lambda : None ,
301
+ ):
302
+ self ._semaphore_per_partition [partition_key ].release ()
303
+ if is_last_slice :
304
+ self ._partitions_done_generating_stream_slices .add (partition_key )
305
+ yield StreamSlice (
306
+ partition = partition , cursor_slice = cursor_slice , extra_fields = partition .extra_fields
307
+ )
312
308
313
309
def _ensure_partition_limit (self ) -> None :
314
310
"""
@@ -496,10 +492,11 @@ def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
496
492
def _create_cursor (
497
493
self , cursor_state : Any , runtime_lookback_window : int = 0
498
494
) -> ConcurrentCursor :
499
- return self ._cursor_factory .create (
495
+ cursor = self ._cursor_factory .create (
500
496
stream_state = deepcopy (cursor_state ),
501
497
runtime_lookback_window = timedelta (seconds = runtime_lookback_window ),
502
498
)
499
+ return cursor
503
500
504
501
def should_be_synced (self , record : Record ) -> bool :
505
502
return self ._get_cursor (record ).should_be_synced (record )
@@ -514,7 +511,8 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
514
511
raise ValueError (
515
512
"Invalid state as stream slices that are emitted should refer to an existing cursor"
516
513
)
517
- return self ._cursor_per_partition [partition_key ]
514
+ cursor = self ._cursor_per_partition [partition_key ]
515
+ return cursor
518
516
519
517
def limit_reached (self ) -> bool :
520
518
return self ._number_of_partitions > self .SWITCH_TO_GLOBAL_LIMIT
0 commit comments