Skip to content

Commit cb2df0f

Browse files
committed
update tests and migrate stop condition too
1 parent d7337a6 commit cb2df0f

File tree

6 files changed

+87
-68
lines changed

6 files changed

+87
-68
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,13 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
516516
raise ValueError(
517517
"Invalid state as stream slices that are emitted should refer to an existing cursor"
518518
)
519+
520+
if self._use_global_cursor:
521+
return self._create_cursor(
522+
self._global_cursor,
523+
self._lookback_window if self._global_cursor else 0,
524+
)
525+
519526
partition_key = self._to_partition_key(record.associated_slice.partition)
520527
if (
521528
partition_key not in self._cursor_per_partition

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@
603603
WeekClampingStrategy,
604604
Weekday,
605605
)
606-
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
606+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, Cursor
607607
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
608608
CustomFormatConcurrentStreamStateConverter,
609609
DateTimeStreamStateConverter,
@@ -1932,17 +1932,17 @@ def create_declarative_stream(
19321932
and hasattr(model.incremental_sync, "is_data_feed")
19331933
and model.incremental_sync.is_data_feed
19341934
)
1935-
client_side_incremental_sync = None
1936-
if (
1935+
client_side_filetering_enabled = (
19371936
model.incremental_sync
19381937
and hasattr(model.incremental_sync, "is_client_side_incremental")
19391938
and model.incremental_sync.is_client_side_incremental
1940-
):
1939+
)
1940+
concurrent_cursor = None
1941+
if stop_condition_on_cursor or client_side_filetering_enabled:
19411942
stream_slicer = self._build_stream_slicer_from_partition_router(
19421943
model.retriever, config, stream_name=model.name
19431944
)
1944-
cursor = self._build_concurrent_cursor(model, stream_slicer, config)
1945-
client_side_incremental_sync = {"cursor": cursor}
1945+
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)
19461946

19471947
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
19481948
cursor_model = model.incremental_sync
@@ -2017,8 +2017,10 @@ def create_declarative_stream(
20172017
primary_key=primary_key,
20182018
stream_slicer=combined_slicers,
20192019
request_options_provider=request_options_provider,
2020-
stop_condition_on_cursor=stop_condition_on_cursor,
2021-
client_side_incremental_sync=client_side_incremental_sync,
2020+
stop_condition_cursor=concurrent_cursor,
2021+
client_side_incremental_sync={"cursor": concurrent_cursor}
2022+
if client_side_filetering_enabled
2023+
else None,
20222024
transformations=transformations,
20232025
file_uploader=file_uploader,
20242026
incremental_sync=model.incremental_sync,
@@ -3194,7 +3196,7 @@ def create_simple_retriever(
31943196
primary_key: Optional[Union[str, List[str], List[List[str]]]],
31953197
stream_slicer: Optional[StreamSlicer],
31963198
request_options_provider: Optional[RequestOptionsProvider] = None,
3197-
stop_condition_on_cursor: bool = False,
3199+
stop_condition_cursor: Optional[Cursor] = None,
31983200
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
31993201
transformations: List[RecordTransformation],
32003202
file_uploader: Optional[DefaultFileUploader] = None,
@@ -3325,7 +3327,7 @@ def _get_url() -> str:
33253327
),
33263328
)
33273329

3328-
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
3330+
cursor_used_for_stop_condition = cursor if stop_condition_cursor else None
33293331
paginator = (
33303332
self._create_component_from_model(
33313333
model=model.paginator,

airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@
77

88
import requests
99

10-
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
1110
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
1211
PaginationStrategy,
1312
)
14-
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
13+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1514
from airbyte_cdk.sources.types import Record
1615

1716

@@ -29,8 +28,7 @@ def is_met(self, record: Record) -> bool:
2928
class CursorStopCondition(PaginationStopCondition):
3029
def __init__(
3130
self,
32-
cursor: DeclarativeCursor
33-
| ConcurrentCursor, # migrate to use both old and concurrent versions
31+
cursor: Cursor,
3432
):
3533
self._cursor = cursor
3634

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ def should_be_synced(self, record: Record) -> bool:
492492
except ValueError:
493493
self._log_for_record_without_cursor_value()
494494
return True
495-
return self.start <= record_cursor_value
495+
return self.start <= record_cursor_value <= self._end_provider()
496496

497497
def _log_for_record_without_cursor_value(self) -> None:
498498
if not self._should_be_synced_logger_triggered:

unit_tests/sources/declarative/extractors/test_record_filter.py

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
from datetime import datetime, timedelta, timezone
45
from typing import List, Mapping, Optional
6+
from unittest.mock import Mock
57

68
import pytest
79

@@ -11,11 +13,10 @@
1113
RecordFilter,
1214
)
1315
from airbyte_cdk.sources.declarative.incremental import (
14-
CursorFactory,
15-
DatetimeBasedCursor,
16-
GlobalSubstreamCursor,
17-
PerPartitionWithGlobalCursor,
16+
ConcurrentPerPartitionCursor,
17+
ConcurrentCursorFactory,
1818
)
19+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
1920
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
2021
from airbyte_cdk.sources.declarative.models import (
2122
CustomRetriever,
@@ -24,7 +25,11 @@
2425
)
2526
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
2627
from airbyte_cdk.sources.declarative.types import StreamSlice
28+
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
29+
CustomFormatConcurrentStreamStateConverter,
30+
)
2731
from airbyte_cdk.sources.types import Record
32+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse, ab_datetime_now
2833

2934
DATE_FORMAT = "%Y-%m-%d"
3035
RECORDS_TO_FILTER_DATE_FORMAT = [
@@ -272,25 +277,27 @@ def test_client_side_record_filter_decorator_no_parent_stream(
272277
records_to_filter: List[Mapping],
273278
expected_record_ids: List[int],
274279
):
275-
date_time_based_cursor = DatetimeBasedCursor(
276-
start_datetime=MinMaxDatetime(
277-
datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}
280+
datetime_based_cursor = ConcurrentCursor(
281+
stream_name="any_stream",
282+
stream_namespace=None,
283+
stream_state=stream_state,
284+
message_repository=Mock(),
285+
connector_state_manager=Mock(),
286+
connector_state_converter=CustomFormatConcurrentStreamStateConverter(
287+
datetime_format=datetime_format
278288
),
279-
end_datetime=MinMaxDatetime(datetime=end_datetime, parameters={}) if end_datetime else None,
280-
step="P10Y",
281-
cursor_field=InterpolatedString.create("created_at", parameters={}),
282-
datetime_format=datetime_format,
283-
cursor_granularity="P1D",
284-
config={},
285-
parameters={},
289+
cursor_field=CursorField("created_at"),
290+
slice_boundary_fields=("start", "end"),
291+
start=datetime(2021, 1, 1, tzinfo=timezone.utc),
292+
end_provider=lambda: ab_datetime_parse(end_datetime) if end_datetime else ab_datetime_now(),
293+
slice_range=timedelta(days=365 * 10),
286294
)
287-
date_time_based_cursor.set_initial_state(stream_state)
288295

289296
record_filter_decorator = ClientSideIncrementalRecordFilterDecorator(
290297
config={},
291298
condition=record_filter_expression,
292299
parameters={},
293-
cursor=date_time_based_cursor,
300+
cursor=datetime_based_cursor,
294301
)
295302

296303
filtered_records = list(
@@ -341,7 +348,7 @@ def test_client_side_record_filter_decorator_no_parent_stream(
341348
}
342349
],
343350
},
344-
"per_partition_with_global",
351+
"global_substream",
345352
[2, 3],
346353
),
347354
# Use PerPartitionWithGlobalCursor with partition state missing, global cursor used
@@ -363,23 +370,26 @@ def test_client_side_record_filter_decorator_no_parent_stream(
363370
def test_client_side_record_filter_decorator_with_cursor_types(
364371
stream_state: Optional[Mapping], cursor_type: str, expected_record_ids: List[int]
365372
):
366-
def date_time_based_cursor_factory() -> DatetimeBasedCursor:
367-
return DatetimeBasedCursor(
368-
start_datetime=MinMaxDatetime(
369-
datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}
370-
),
371-
end_datetime=MinMaxDatetime(
372-
datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={}
373+
def date_time_based_cursor_factory(stream_state, runtime_lookback_window) -> ConcurrentCursor:
374+
return ConcurrentCursor(
375+
stream_name="any_stream",
376+
stream_namespace=None,
377+
stream_state=stream_state,
378+
message_repository=Mock(),
379+
connector_state_manager=Mock(),
380+
connector_state_converter=CustomFormatConcurrentStreamStateConverter(
381+
datetime_format=DATE_FORMAT
373382
),
374-
step="P10Y",
375-
cursor_field=InterpolatedString.create("created_at", parameters={}),
376-
datetime_format=DATE_FORMAT,
377-
cursor_granularity="P1D",
378-
config={},
379-
parameters={},
383+
cursor_field=CursorField("created_at"),
384+
slice_boundary_fields=("start", "end"),
385+
start=datetime(2021, 1, 1, tzinfo=timezone.utc),
386+
end_provider=lambda: datetime(2021, 1, 5, tzinfo=timezone.utc),
387+
slice_range=timedelta(days=365 * 10),
388+
cursor_granularity=timedelta(days=1),
389+
lookback_window=runtime_lookback_window,
380390
)
381391

382-
date_time_based_cursor = date_time_based_cursor_factory()
392+
date_time_based_cursor = date_time_based_cursor_factory(stream_state, timedelta(0))
383393

384394
substream_cursor = None
385395
partition_router = SubstreamPartitionRouter(
@@ -401,29 +411,26 @@ def date_time_based_cursor_factory() -> DatetimeBasedCursor:
401411
if cursor_type == "datetime":
402412
# Use only DatetimeBasedCursor
403413
pass # No additional cursor needed
404-
elif cursor_type == "global_substream":
414+
elif cursor_type in ["global_substream", "per_partition_with_global"]:
405415
# Create GlobalSubstreamCursor instance
406-
substream_cursor = GlobalSubstreamCursor(
407-
stream_cursor=date_time_based_cursor,
416+
substream_cursor = ConcurrentPerPartitionCursor(
417+
cursor_factory=ConcurrentCursorFactory(date_time_based_cursor_factory),
408418
partition_router=partition_router,
409-
)
410-
if stream_state:
411-
substream_cursor.set_initial_state(stream_state)
412-
elif cursor_type == "per_partition_with_global":
413-
# Create PerPartitionWithGlobalCursor instance
414-
substream_cursor = PerPartitionWithGlobalCursor(
415-
cursor_factory=CursorFactory(date_time_based_cursor_factory),
416-
partition_router=partition_router,
417-
stream_cursor=date_time_based_cursor,
419+
stream_name="a_stream",
420+
stream_namespace=None,
421+
stream_state=stream_state,
422+
message_repository=Mock(),
423+
connector_state_manager=Mock(),
424+
connector_state_converter=CustomFormatConcurrentStreamStateConverter(
425+
datetime_format=DATE_FORMAT
426+
),
427+
cursor_field=CursorField("created_at"),
428+
use_global_cursor=cursor_type == "global_substream",
429+
attempt_to_create_cursor_if_not_provided=True,
418430
)
419431
else:
420432
raise ValueError(f"Unsupported cursor type: {cursor_type}")
421433

422-
if substream_cursor and stream_state:
423-
substream_cursor.set_initial_state(stream_state)
424-
elif stream_state:
425-
date_time_based_cursor.set_initial_state(stream_state)
426-
427434
# Create the record_filter_decorator with appropriate cursor
428435
record_filter_decorator = ClientSideIncrementalRecordFilterDecorator(
429436
config={},

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,12 @@
188188

189189
transformer = ManifestComponentTransformer()
190190

191-
input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}
191+
input_config = {
192+
"apikey": "verysecrettoken",
193+
"repos": ["airbyte", "airbyte-cloud"],
194+
"start_time": "2024-01-01T00:00:00.000+00:00",
195+
"end_time": "2025-01-01T00:00:00.000+00:00",
196+
}
192197

193198

194199
def get_factory_with_parameters(
@@ -1997,7 +2002,7 @@ def test_create_default_paginator():
19972002
"subcomponent_field_with_hint",
19982003
DpathExtractor(
19992004
field_path=[],
2000-
config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]},
2005+
config=input_config,
20012006
decoder=JsonDecoder(parameters={}),
20022007
parameters={},
20032008
),
@@ -2013,7 +2018,7 @@ def test_create_default_paginator():
20132018
"subcomponent_field_with_hint",
20142019
DpathExtractor(
20152020
field_path=[],
2016-
config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]},
2021+
config=input_config,
20172022
parameters={},
20182023
),
20192024
None,
@@ -2102,11 +2107,11 @@ def test_create_default_paginator():
21022107
pagination_strategy=OffsetIncrement(
21032108
page_size=10,
21042109
extractor=None,
2105-
config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]},
2110+
config=input_config,
21062111
parameters={},
21072112
),
21082113
url_base="https://physical_100.com",
2109-
config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]},
2114+
config=input_config,
21102115
parameters={"decoder": {"type": "JsonDecoder"}},
21112116
),
21122117
None,

0 commit comments

Comments
 (0)