Skip to content

Commit 0ccfb19

Browse files
committed
Fix unit tests
1 parent 45d3eaf commit 0ccfb19

File tree

2 files changed

+40
-19
lines changed

2 files changed

+40
-19
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,23 +99,29 @@ def __init__(
9999
self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()
100100

101101
# Parent-state tracking: store each partition’s parent state in creation order
102-
self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = OrderedDict()
102+
self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
103+
OrderedDict()
104+
)
105+
self._parent_state: Optional[StreamState] = None
103106

107+
# Tracks when the last slice for partition is emitted
104108
self._finished_partitions: set[str] = set()
109+
# Used to track the sequence numbers of open partitions
105110
self._open_seqs: deque[int] = deque()
106111
self._next_seq: int = 0
112+
# Dictionary to map partition keys to their sequence numbers
107113
self._seq_by_partition: dict[str, int] = {}
108114

109115
self._lock = threading.Lock()
110-
self._timer = Timer()
111-
self._new_global_cursor: Optional[StreamState] = None
112116
self._lookback_window: int = 0
113-
self._parent_state: Optional[StreamState] = None
117+
self._new_global_cursor: Optional[StreamState] = None
114118
self._number_of_partitions: int = 0
115119
self._use_global_cursor: bool = use_global_cursor
116120
self._partition_serializer = PerPartitionKeySerializer()
121+
117122
# Track the last time a state message was emitted
118123
self._last_emission_time: float = 0.0
124+
self._timer = Timer()
119125

120126
self._set_initial_state(stream_state)
121127

@@ -177,8 +183,9 @@ def _check_and_update_parent_state(self) -> None:
177183
last_closed_state = None
178184

179185
while self._partition_parent_state_map:
180-
earliest_key, (candidate_state, candidate_seq) = \
181-
next(iter(self._partition_parent_state_map.items()))
186+
earliest_key, (candidate_state, candidate_seq) = next(
187+
iter(self._partition_parent_state_map.items())
188+
)
182189

183190
# if any partition that started <= candidate_seq is still open, we must wait
184191
if self._open_seqs and self._open_seqs[0] <= candidate_seq:
@@ -459,10 +466,10 @@ def _cleanup_if_done(self, partition_key: str) -> None:
459466
cursor, semaphore, flag inside `_finished_partitions`
460467
"""
461468
if not (
462-
partition_key in self._finished_partitions
463-
and self._semaphore_per_partition[partition_key]._value == 0
469+
partition_key in self._finished_partitions
470+
and self._semaphore_per_partition[partition_key]._value == 0
464471
):
465-
return
472+
return
466473

467474
self._semaphore_per_partition.pop(partition_key, None)
468475
self._finished_partitions.discard(partition_key)

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3436,8 +3436,12 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update():
34363436
}
34373437
assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition
34383438
assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition
3439-
assert len(cursor._semaphore_per_partition) == 1 # Semaphore cleaned after partiton is completed
3440-
# ToDo: Add check for other interal values
3439+
3440+
assert len(cursor._semaphore_per_partition) == 1
3441+
assert len(cursor._finished_partitions) == 1
3442+
assert len(cursor._open_seqs) == 1
3443+
assert len(cursor._seq_by_partition) == 1
3444+
34413445

34423446
def test_given_unfinished_last_parent_partition_with_partial_parent_state_update():
34433447
# Create two mock cursors with different states for each partition
@@ -3520,8 +3524,11 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update
35203524
}
35213525
assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition
35223526
assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition
3523-
assert len(cursor._semaphore_per_partition) == 0
3524-
# ToDo: Add check for other interal values
3527+
3528+
assert len(cursor._semaphore_per_partition) == 1
3529+
assert len(cursor._finished_partitions) == 1
3530+
assert len(cursor._open_seqs) == 1
3531+
assert len(cursor._seq_by_partition) == 1
35253532

35263533

35273534
def test_given_all_partitions_finished_when_close_partition_then_final_state_emitted():
@@ -3596,7 +3603,12 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi
35963603
assert final_state["lookback_window"] == 1
35973604
assert cursor._message_repository.emit_message.call_count == 2
35983605
assert mock_cursor.stream_slices.call_count == 2 # Called once for each partition
3599-
assert len(cursor._semaphore_per_partition) == 1
3606+
3607+
# Checks that all internal variables are cleaned up
3608+
assert len(cursor._semaphore_per_partition) == 0
3609+
assert len(cursor._finished_partitions) == 0
3610+
assert len(cursor._open_seqs) == 0
3611+
assert len(cursor._seq_by_partition) == 0
36003612

36013613

36023614
def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_global_cursor():
@@ -3715,18 +3727,20 @@ def test_semaphore_cleanup():
37153727
# Verify initial state
37163728
assert len(cursor._semaphore_per_partition) == 2
37173729
assert len(cursor._partition_parent_state_map) == 2
3718-
assert cursor._partition_parent_state_map['{"id":"1"}'] == {"parent": {"state": "state1"}}
3719-
assert cursor._partition_parent_state_map['{"id":"2"}'] == {"parent": {"state": "state2"}}
3730+
assert len(cursor._open_seqs) == 2
3731+
assert len(cursor._seq_by_partition) == 2
3732+
assert cursor._partition_parent_state_map['{"id":"1"}'][0] == {"parent": {"state": "state1"}}
3733+
assert cursor._partition_parent_state_map['{"id":"2"}'][0] == {"parent": {"state": "state2"}}
37203734

37213735
# Close partitions to acquire semaphores (value back to 0)
37223736
for s in generated_slices:
37233737
cursor.close_partition(DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), s))
37243738

37253739
# Check state after closing partitions
3726-
assert len(cursor._finished_partitions) == 2
3740+
assert len(cursor._finished_partitions) == 0
37273741
assert len(cursor._semaphore_per_partition) == 0
3728-
assert '{"id":"1"}' not in cursor._semaphore_per_partition
3729-
assert '{"id":"2"}' not in cursor._semaphore_per_partition
3742+
assert len(cursor._open_seqs) == 0
3743+
assert len(cursor._seq_by_partition) == 0
37303744
assert len(cursor._partition_parent_state_map) == 0 # All parent states should be popped
37313745
assert cursor._parent_state == {"parent": {"state": "state2"}} # Last parent state
37323746

0 commit comments

Comments
 (0)