Merged
Commits
31 commits
707a6c6
Add API Budget
tolik0 Feb 5, 2025
b6bcdd7
Refactor to move api_budget to root level
tolik0 Feb 6, 2025
040ff9e
Format
tolik0 Feb 6, 2025
824d2c6
Merge branch 'main' into tolik0/add-api-budget
tolik0 Feb 7, 2025
15f830c
Update for backward compatibility
tolik0 Feb 7, 2025
1285668
Add unit tests
tolik0 Feb 9, 2025
7be9842
Add FixedWindowCallRatePolicy unit test
tolik0 Feb 9, 2025
8d3bfce
Change the partitions limit to 1000
tolik0 Feb 10, 2025
509ea05
Refactored switching logic
tolik0 Feb 10, 2025
8d44150
Increase the limit for number of partitions in memory
tolik0 Feb 10, 2025
b3f9897
Merge branch 'tolik0/add-api-budget-limit-1000' into tolik0/refactor-…
tolik0 Feb 11, 2025
342375c
Refactor ConcurrentPerPartitionCursor to not use ConcurrentCursor wit…
tolik0 Feb 12, 2025
05f4db7
Delete code from another branch
tolik0 Feb 12, 2025
c0bc645
Fix cursor value from record
tolik0 Feb 12, 2025
52b95e3
Add throttling for state emitting in ConcurrentPerPartitionCursor
tolik0 Feb 13, 2025
1166a7a
Fix unit tests
tolik0 Feb 17, 2025
4a7d9ec
Move switching to global logic
tolik0 Feb 17, 2025
19ad269
Revert test limits
tolik0 Feb 17, 2025
667700f
Merge branch 'main' into tolik0/refactor-concurrent-global-cursor
tolik0 Feb 17, 2025
6498528
Fix format
tolik0 Feb 17, 2025
d3e7fe2
Add parent state updates
tolik0 Feb 17, 2025
7b4964e
Move acquiring the semaphore
tolik0 Feb 17, 2025
8617cc8
Merge branch 'tolik0/refactor-concurrent-global-cursor' into tolik0/c…
tolik0 Feb 17, 2025
a8db6b6
Merge branch 'main' into tolik0/concurrent-perpartition-add-parent-st…
tolik0 Feb 18, 2025
203c131
Refactor to store only unique states
tolik0 Feb 18, 2025
671fab4
Add intermediate states validation to unit tests
tolik0 Feb 18, 2025
a1d98fb
Fix format
tolik0 Feb 18, 2025
eff25ec
Add unit tests
tolik0 Feb 19, 2025
c51f840
Update unit tests
tolik0 Feb 21, 2025
4a18954
Add deleting finished semaphores
tolik0 Feb 21, 2025
a7ece97
Delete testing prints
tolik0 Feb 21, 2025
@@ -95,6 +95,10 @@ def __init__(
# the oldest partitions can be efficiently removed, maintaining the most recent partitions.
self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

# Parent-state tracking: store each partition’s parent state in creation order
self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict()

self._finished_partitions: set[str] = set()
self._lock = threading.Lock()
self._timer = Timer()
@@ -155,7 +159,45 @@ def close_partition(self, partition: Partition) -> None:
and self._semaphore_per_partition[partition_key]._value == 0
):
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
self._emit_state_message()

self._check_and_update_parent_state()

self._emit_state_message()

def _check_and_update_parent_state(self) -> None:
"""
Pop the leftmost partition state from _partition_parent_state_map only if
*all partitions* up to (and including) that partition key in _semaphore_per_partition
are fully finished (i.e. in _finished_partitions and semaphore._value == 0).
"""
last_closed_state = None

while self._partition_parent_state_map:
# Look at the earliest partition key in creation order
earliest_key = next(iter(self._partition_parent_state_map))

# Verify ALL partitions from the left up to earliest_key are finished
all_left_finished = True
for p_key, sem in self._semaphore_per_partition.items():
# If any earlier partition is still not finished, we must stop
if p_key not in self._finished_partitions or sem._value != 0:
all_left_finished = False
break
# Once we've reached earliest_key in the semaphore order, we can stop checking
if p_key == earliest_key:
break

# If the partitions up to earliest_key are not all finished, break the while-loop
if not all_left_finished:
break

# Otherwise, pop the leftmost entry from parent-state map
_, closed_parent_state = self._partition_parent_state_map.popitem(last=False)
last_closed_state = closed_parent_state

# Update _parent_state if we actually popped at least one partition
if last_closed_state is not None:
self._parent_state = last_closed_state

def ensure_at_least_one_state_emitted(self) -> None:
"""
@@ -201,13 +243,19 @@ def stream_slices(self) -> Iterable[StreamSlice]:

slices = self._partition_router.stream_slices()
self._timer.start()
for partition in slices:
yield from self._generate_slices_from_partition(partition)
for partition, last, parent_state in iterate_with_last_flag_and_state(
slices, self._partition_router.get_stream_state
):
yield from self._generate_slices_from_partition(partition, parent_state)

def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
def _generate_slices_from_partition(
self, partition: StreamSlice, parent_state: Mapping[str, Any]
) -> Iterable[StreamSlice]:
# Ensure the maximum number of partitions is not exceeded
self._ensure_partition_limit()

partition_key = self._to_partition_key(partition.partition)

cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
cursor = self._create_cursor(
@@ -216,18 +264,26 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
)
with self._lock:
self._number_of_partitions += 1
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
self._semaphore_per_partition[self._to_partition_key(partition.partition)] = (
threading.Semaphore(0)
)
self._cursor_per_partition[partition_key] = cursor
self._semaphore_per_partition[partition_key] = threading.Semaphore(0)

with self._lock:
if (
len(self._partition_parent_state_map) == 0
or self._partition_parent_state_map[
next(reversed(self._partition_parent_state_map))
]
!= parent_state
):
self._partition_parent_state_map[partition_key] = deepcopy(parent_state)

for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
cursor.stream_slices(),
lambda: None,
):
self._semaphore_per_partition[self._to_partition_key(partition.partition)].release()
self._semaphore_per_partition[partition_key].release()
if is_last_slice:
self._finished_partitions.add(self._to_partition_key(partition.partition))
self._finished_partitions.add(partition_key)
yield StreamSlice(
partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
)
@@ -338,9 +394,6 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
self._create_cursor(state["cursor"])
)
self._semaphore_per_partition[self._to_partition_key(state["partition"])] = (
threading.Semaphore(0)
)

# set default state for missing partitions if it is per partition with fallback to global
if self._GLOBAL_STATE_KEY in stream_state:
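To make the gating rule in `_check_and_update_parent_state` easier to follow, here is a minimal standalone sketch of the same ordering idea, detached from the CDK classes. The names `pop_safe_parent_state`, `parents`, `semaphores`, and `finished` are hypothetical stand-ins for `_partition_parent_state_map`, `_semaphore_per_partition`, and `_finished_partitions`; this is an illustration under those assumptions, not the connector's actual API.

```python
import threading
from collections import OrderedDict
from typing import Any, Mapping, Optional


def pop_safe_parent_state(
    parents: "OrderedDict[str, Mapping[str, Any]]",
    semaphores: "OrderedDict[str, threading.Semaphore]",
    finished: "set[str]",
) -> Optional[Mapping[str, Any]]:
    """Pop parent states from the left while every partition up to (and
    including) the leftmost tracked key is finished and fully drained;
    return the last state popped, or None if nothing could be popped."""
    last_closed = None
    while parents:
        earliest_key = next(iter(parents))
        blocked = False
        for key, sem in semaphores.items():
            # A partition blocks the pop if it has not emitted its last
            # slice yet or its semaphore still holds unprocessed releases.
            if key not in finished or sem._value != 0:
                blocked = True
                break
            if key == earliest_key:
                break
        if blocked:
            break
        _, last_closed = parents.popitem(last=False)
    return last_closed


# Example: partition "p1" is done, "p2" is still in flight, so only the
# parent state recorded for "p1" is safe to promote.
parents = OrderedDict(
    [
        ("p1", {"posts": {"updated_at": "2024-01-01"}}),
        ("p2", {"posts": {"updated_at": "2024-01-02"}}),
    ]
)
semaphores = OrderedDict([("p1", threading.Emaphore(0) if False else threading.Semaphore(0)), ("p2", threading.Semaphore(0))])
finished = {"p1"}
assert pop_safe_parent_state(parents, semaphores, finished) == {"posts": {"updated_at": "2024-01-01"}}
```

The design choice mirrored here is that a parent state is only promoted once every partition created at or before it has fully drained, which keeps checkpointed parent state monotonic even when later partitions happen to finish first.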
@@ -2027,6 +2027,8 @@ def test_incremental_parent_state_no_records(
"cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1},
}
],
"state": {},
"use_global_cursor": False,
"parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}},
}
},
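For reference, a hedged sketch of the overall connector-state shape the updated assertion now covers, with placeholder values rather than the fixture constants (PARENT_COMMENT_CURSOR_PARTITION_1, PARENT_POSTS_CURSOR): the added "state" key carries the global cursor value, and "use_global_cursor" records whether the cursor has fallen back to global tracking.

```python
# Illustrative only: the shape of the expected connector state after the
# change, not the exact values asserted in the test fixture.
expected_state = {
    "states": [
        {
            "partition": {"id": "10", "parent_slice": {}},
            "cursor": {"updated_at": "2024-01-02T00:00:00Z"},
        }
    ],
    "state": {},                 # global cursor value; empty when none has been set
    "use_global_cursor": False,  # still operating in per-partition mode
    "parent_state": {"posts": {"updated_at": "2024-01-03T00:00:00Z"}},
}
```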