Skip to content

Commit c827d82

Browse files
committed
Fix issues with error handling, refactor tests
1 parent daa6873 commit c827d82

File tree

3 files changed

+784
-679
lines changed

3 files changed

+784
-679
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ def __init__(self, create_function: Callable[..., ConcurrentCursor]):
3232
self._create_function = create_function
3333

3434
def create(
35-
self, stream_state: Mapping[str, Any], runtime_lookback_window: Any
35+
self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
3636
) -> ConcurrentCursor:
3737
return self._create_function(
3838
stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
@@ -187,7 +187,7 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
187187
if not cursor:
188188
cursor = self._create_cursor(
189189
self._global_cursor,
190-
self._lookback_window if self._global_cursor else self._NO_CURSOR_STATE,
190+
self._lookback_window if self._global_cursor else 0,
191191
)
192192
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
193193
self._semaphore_per_partition[self._to_partition_key(partition.partition)] = (
@@ -218,9 +218,6 @@ def _ensure_partition_limit(self) -> None:
218218
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
219219
)
220220

221-
def limit_reached(self) -> bool:
222-
return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
223-
224221
def _set_initial_state(self, stream_state: StreamState) -> None:
225222
"""
226223
Initialize the cursor's state using the provided `stream_state`.
@@ -286,6 +283,10 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
286283
self._global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
287284
self._new_global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
288285

286+
# Set initial parent state
287+
if stream_state.get("parent_state"):
288+
self._parent_state = stream_state["parent_state"]
289+
289290
# Set parent state for partition routers based on parent streams
290291
self._partition_router.set_initial_state(stream_state)
291292

@@ -305,12 +306,11 @@ def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
305306
return self._partition_serializer.to_partition(partition_key)
306307

307308
def _create_cursor(
308-
self, cursor_state: Any, runtime_lookback_window: Any = None
309+
self, cursor_state: Any, runtime_lookback_window: int = 0
309310
) -> ConcurrentCursor:
310-
if runtime_lookback_window:
311-
runtime_lookback_window = timedelta(seconds=runtime_lookback_window)
312311
cursor = self._cursor_factory.create(
313-
stream_state=deepcopy(cursor_state), runtime_lookback_window=runtime_lookback_window
312+
stream_state=deepcopy(cursor_state),
313+
runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
314314
)
315315
return cursor
316316

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -875,7 +875,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
875875
config: Config,
876876
stream_state: MutableMapping[str, Any],
877877
message_repository: Optional[MessageRepository] = None,
878-
runtime_lookback_window: Optional[int] = None,
878+
runtime_lookback_window: Optional[datetime.timedelta] = None,
879879
**kwargs: Any,
880880
) -> ConcurrentCursor:
881881
component_type = component_definition.get("type")
@@ -933,11 +933,6 @@ def create_concurrent_cursor_from_datetime_based_cursor(
933933
if evaluated_lookback_window:
934934
lookback_window = parse_duration(evaluated_lookback_window)
935935

936-
if runtime_lookback_window and lookback_window:
937-
lookback_window = max(lookback_window, runtime_lookback_window)
938-
elif runtime_lookback_window:
939-
lookback_window = runtime_lookback_window
940-
941936
connector_state_converter: DateTimeStreamStateConverter
942937
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
943938
datetime_format=datetime_format,
@@ -946,6 +941,18 @@ def create_concurrent_cursor_from_datetime_based_cursor(
946941
cursor_granularity=cursor_granularity,
947942
)
948943

944+
# Adjusts the stream state by applying the runtime lookback window.
945+
# This is used to ensure correct state handling in case of failed partitions.
946+
stream_state_value = stream_state.get(cursor_field.cursor_field_key)
947+
if runtime_lookback_window and stream_state_value:
948+
new_stream_state = (
949+
connector_state_converter.parse_timestamp(stream_state_value)
950+
- runtime_lookback_window
951+
)
952+
stream_state[cursor_field.cursor_field_key] = connector_state_converter.output_format(
953+
new_stream_state
954+
)
955+
949956
start_date_runtime_value: Union[InterpolatedString, str, MinMaxDatetime]
950957
if isinstance(datetime_based_cursor_model.start_datetime, MinMaxDatetimeModel):
951958
start_date_runtime_value = self.create_min_max_datetime(

0 commit comments

Comments
 (0)