Skip to content

Commit dd49c3d

Browse files
committed
Add unit test for duplicate partition after cursor cleanup
1 parent 262370e commit dd49c3d

File tree

1 file changed

+71
-2
lines changed

1 file changed

+71
-2
lines changed

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3793,14 +3793,14 @@ def test_given_global_state_when_read_then_state_is_not_per_partition() -> None:
37933793
def _make_inner_cursor(ts: str) -> MagicMock:
37943794
"""Return an inner cursor that yields exactly one slice and has a proper state."""
37953795
inner = MagicMock()
3796-
inner.stream_slices.return_value = iter([{"dummy": "slice"}])
3796+
inner.stream_slices.side_effect = lambda: iter([{"dummy": "slice"}])
37973797
inner.state = {"updated_at": ts}
37983798
inner.close_partition.return_value = None
37993799
inner.observe.return_value = None
38003800
return inner
38013801

38023802

3803-
def test_duplicate_partition_after_cleanup():
3803+
def test_duplicate_partition_after_closing_partition_cursor_deleted():
38043804
inner_cursors = [
38053805
_make_inner_cursor("2024-01-01T00:00:00Z"), # for first "1"
38063806
_make_inner_cursor("2024-01-02T00:00:00Z"), # for "2"
@@ -3861,6 +3861,74 @@ def test_duplicate_partition_after_cleanup():
38613861
assert len(cursor._semaphore_per_partition) == 0
38623862
assert len(cursor._processing_partitions_indexes) == 0
38633863
assert len(cursor._partition_key_to_index) == 0
3864+
assert len(cursor._partitions_done_generating_stream_slices) == 0
3865+
3866+
3867+
def test_duplicate_partition_after_closing_partition_cursor_exists():
3868+
inner_cursors = [
3869+
_make_inner_cursor("2024-01-01T00:00:00Z"), # for first "1"
3870+
_make_inner_cursor("2024-01-02T00:00:00Z"), # for "2"
3871+
_make_inner_cursor("2024-01-03T00:00:00Z"), # for second "1"
3872+
]
3873+
cursor_factory_mock = MagicMock()
3874+
cursor_factory_mock.create.side_effect = inner_cursors
3875+
3876+
converter = CustomFormatConcurrentStreamStateConverter(
3877+
datetime_format="%Y-%m-%dT%H:%M:%SZ",
3878+
input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"],
3879+
is_sequential_state=True,
3880+
cursor_granularity=timedelta(0),
3881+
)
3882+
3883+
cursor = ConcurrentPerPartitionCursor(
3884+
cursor_factory=cursor_factory_mock,
3885+
partition_router=MagicMock(),
3886+
stream_name="dup_stream",
3887+
stream_namespace=None,
3888+
stream_state={},
3889+
message_repository=MagicMock(),
3890+
connector_state_manager=MagicMock(),
3891+
connector_state_converter=converter,
3892+
cursor_field=CursorField(cursor_field_key="updated_at"),
3893+
)
3894+
3895+
# ── Partition sequence: 1 → 2 → 1 ──────────────────────────────────
3896+
partitions = [
3897+
StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}),
3898+
StreamSlice(partition={"id": "2"}, cursor_slice={}, extra_fields={}),
3899+
StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}),
3900+
]
3901+
pr = cursor._partition_router
3902+
pr.stream_slices.return_value = iter(partitions)
3903+
pr.get_stream_state.return_value = {}
3904+
3905+
# Iterate lazily so that the first "1" gets cleaned before
3906+
# the second "1" arrives.
3907+
slice_gen = cursor.stream_slices()
3908+
3909+
first_1 = next(slice_gen)
3910+
cursor.close_partition(
3911+
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), first_1)
3912+
)
3913+
3914+
two = next(slice_gen)
3915+
cursor.close_partition(DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), two))
3916+
3917+
# Second “1” should appear because the semaphore was cleaned up
3918+
second_1 = next(slice_gen)
3919+
cursor.close_partition(
3920+
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), second_1)
3921+
)
3922+
3923+
with pytest.raises(StopIteration):
3924+
next(slice_gen)
3925+
3926+
assert cursor._IS_PARTITION_DUPLICATION_LOGGED is False # no duplicate warning
3927+
assert len(cursor._cursor_per_partition) == 2 # only “1” & “2” kept
3928+
assert len(cursor._semaphore_per_partition) == 0 # all semaphores cleaned
3929+
assert len(cursor._processing_partitions_indexes) == 0
3930+
assert len(cursor._partition_key_to_index) == 0
3931+
assert len(cursor._partitions_done_generating_stream_slices) == 0
38643932

38653933

38663934
def test_duplicate_partition_while_processing():
@@ -3912,3 +3980,4 @@ def test_duplicate_partition_while_processing():
39123980
assert len(cursor._semaphore_per_partition) == 0
39133981
assert len(cursor._processing_partitions_indexes) == 0
39143982
assert len(cursor._partition_key_to_index) == 0
3983+
assert len(cursor._partitions_done_generating_stream_slices) == 0

0 commit comments

Comments
 (0)