Skip to content

Commit d6a087c

Browse files
fix(cdk): pass transformations to concurrent cursor and fix transformation (#546)
1 parent db07eff commit d6a087c

File tree

5 files changed

+120
-31
lines changed

5 files changed

+120
-31
lines changed

airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,20 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
7878
"<cursor_field>" : "<cursor_value>"
7979
}
8080
"""
81-
if stream_state:
82-
for key, value in stream_state.items():
83-
if isinstance(value, dict):
84-
keys = list(value.keys())
85-
if len(keys) != 1:
86-
# The input partitioned state should only have one key
87-
return False
88-
if keys[0] != self._cursor_field:
89-
# Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
90-
return False
81+
if not stream_state:
82+
return False
83+
for key, value in stream_state.items():
84+
# per the docstring, each top-level value is expected to be a dictionary
85+
if not isinstance(value, dict):
86+
return False
87+
keys = list(value.keys())
88+
if len(keys) != 1:
89+
# The input partitioned state should only have one key
90+
return False
91+
if keys[0] != self._cursor_field:
92+
# Unexpected key. Found {keys[0]}. Expected {self._cursor_field}
93+
return False
94+
9195
return True
9296

9397
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -928,9 +928,9 @@ def create_legacy_to_per_partition_state_migration(
928928
declarative_stream: DeclarativeStreamModel,
929929
) -> LegacyToPerPartitionStateMigration:
930930
retriever = declarative_stream.retriever
931-
if not isinstance(retriever, SimpleRetrieverModel):
931+
if not isinstance(retriever, (SimpleRetrieverModel, AsyncRetrieverModel)):
932932
raise ValueError(
933-
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got {type(retriever)}"
933+
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got {type(retriever)}"
934934
)
935935
partition_router = retriever.partition_router
936936
if not isinstance(
@@ -1999,6 +1999,17 @@ def _build_incremental_cursor(
19991999
stream_state = self._connector_state_manager.get_stream_state(
20002000
stream_name, stream_namespace
20012001
)
2002+
state_transformations = (
2003+
[
2004+
self._create_component_from_model(
2005+
state_migration, config, declarative_stream=model
2006+
)
2007+
for state_migration in model.state_migrations
2008+
]
2009+
if model.state_migrations
2010+
else []
2011+
)
2012+
20022013
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
20032014
state_manager=self._connector_state_manager,
20042015
model_type=DatetimeBasedCursorModel,
@@ -2007,6 +2018,7 @@ def _build_incremental_cursor(
20072018
stream_namespace=stream_namespace,
20082019
config=config or {},
20092020
stream_state=stream_state,
2021+
stream_state_migrations=state_transformations,
20102022
partition_router=stream_slicer,
20112023
)
20122024

unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ def test_migrate_a_valid_legacy_state_to_per_partition():
189189
},
190190
id="test_should_not_migrate_if_the_partitioned_state_key_is_not_the_cursor_field",
191191
),
192+
pytest.param(
193+
{"last_changed": "2022-12-27T08:34:39+00:00"},
194+
id="test_should_not_migrate_if_the_partitioned_state_is_not_in_correct_format",
195+
),
196+
pytest.param(
197+
{},
198+
id="test_should_not_migrate_if_not_state_is_passed",
199+
),
192200
],
193201
)
194202
def test_should_not_migrate(input_state):
@@ -277,7 +285,7 @@ def _migrator_with_multiple_parent_streams():
277285
CustomPartitionRouter,
278286
True,
279287
ValueError,
280-
"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got <class 'unittest.mock.MagicMock'>",
288+
"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got <class 'unittest.mock.MagicMock'>",
281289
),
282290
(
283291
SimpleRetriever,

unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,60 @@ list_stream:
5656
cursor_field: TimePeriod
5757
cursor_datetime_formats:
5858
- "%Y-%m-%dT%H:%M:%S%z"
59+
state_migrations:
60+
- type: LegacyToPerPartitionStateMigration
5961
retriever:
6062
type: AsyncRetriever
6163
name: "{{ parameters['name'] }}"
6264
decoder:
6365
$ref: "#/decoder"
6466
partition_router:
65-
type: ListPartitionRouter
66-
values: "{{config['repos']}}"
67-
cursor_field: a_key
68-
request_option:
69-
inject_into: header
70-
field_name: a_key
67+
type: SubstreamPartitionRouter
68+
parent_stream_configs:
69+
- type: ParentStreamConfig
70+
parent_key: Id
71+
partition_field: account_id
72+
stream:
73+
type: DeclarativeStream
74+
name: lists_parent
75+
schema_loader:
76+
type: JsonFileSchemaLoader
77+
file_path: "./source_sendgrid/schemas/{{ parameters.name }}.json"
78+
retriever:
79+
type: SimpleRetriever
80+
name: "{{ parameters['name'] }}"
81+
decoder:
82+
$ref: "#/decoder"
83+
partition_router:
84+
type: ListPartitionRouter
85+
values: "{{config['repos']}}"
86+
cursor_field: a_key
87+
request_option:
88+
inject_into: header
89+
field_name: a_key
90+
paginator:
91+
type: DefaultPaginator
92+
page_size_option:
93+
inject_into: request_parameter
94+
field_name: page_size
95+
page_token_option:
96+
inject_into: path
97+
type: RequestPath
98+
pagination_strategy:
99+
type: "CursorPagination"
100+
cursor_value: "{{ response._metadata.next }}"
101+
page_size: 10
102+
requester:
103+
$ref: "#/requester"
104+
path: "{{ next_page_token['next_page_url'] }}"
105+
record_selector:
106+
$ref: "#/selector"
107+
$parameters:
108+
name: "lists_parent"
109+
primary_key: "id"
110+
extractor:
111+
$ref: "#/extractor"
112+
field_path: [ "{{ parameters['name'] }}" ]
71113
status_mapping:
72114
failed:
73115
- Error

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -944,8 +944,23 @@ def test_stream_with_incremental_and_retriever_with_partition_router():
944944
assert list_stream_slicer._cursor_field.string == "a_key"
945945

946946

947+
@pytest.mark.parametrize(
948+
"use_legacy_state",
949+
[
950+
False,
951+
True,
952+
],
953+
ids=[
954+
"running_with_newest_state",
955+
"running_with_legacy_state",
956+
],
957+
)
947958
@freezegun.freeze_time("2025-05-14")
948-
def test_stream_with_incremental_and_async_retriever_with_partition_router():
959+
def test_stream_with_incremental_and_async_retriever_with_partition_router(use_legacy_state):
960+
"""
961+
This test is to check the behavior of the stream with async retriever and partition router
962+
when the state is in the legacy format or the newest format.
963+
"""
949964
content = read_yaml_file(
950965
"resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml"
951966
)
@@ -956,26 +971,34 @@ def test_stream_with_incremental_and_async_retriever_with_partition_router():
956971
)
957972
cursor_time_period_value = "2025-05-06T12:00:00+0000"
958973
cursor_field_key = "TimePeriod"
959-
parent_user_id = "102023653"
960-
per_partition_key = {
961-
"account_id": 999999999,
962-
"parent_slice": {"parent_slice": {}, "user_id": parent_user_id},
963-
}
974+
account_id = 999999999
975+
per_partition_key = {"account_id": account_id}
976+
977+
legacy_stream_state = {account_id: {cursor_field_key: cursor_time_period_value}}
978+
states = [
979+
{"partition": per_partition_key, "cursor": {cursor_field_key: cursor_time_period_value}}
980+
]
981+
964982
stream_state = {
965983
"use_global_cursor": False,
966-
"states": [
967-
{"partition": per_partition_key, "cursor": {cursor_field_key: cursor_time_period_value}}
968-
],
969-
"state": {cursor_field_key: "2025-05-12T12:00:00+0000"},
970-
"lookback_window": 46,
984+
"states": states,
985+
"lookback_window": 0,
971986
}
987+
if not use_legacy_state:
988+
# verify that the additional fields of the newest state format are preserved
989+
stream_state["state"] = {cursor_field_key: "2025-05-12T12:00:00+0000"}
990+
stream_state["lookback_window"] = 46
991+
stream_state["use_global_cursor"] = False
992+
per_partition_key["parent_slice"] = {"parent_slice": {}, "user_id": "102023653"}
993+
994+
state_to_test = legacy_stream_state if use_legacy_state else stream_state
972995
connector_state_manager = ConnectorStateManager(
973996
state=[
974997
AirbyteStateMessage(
975998
type=AirbyteStateType.STREAM,
976999
stream=AirbyteStreamState(
9771000
stream_descriptor=StreamDescriptor(name="lists"),
978-
stream_state=AirbyteStateBlob(stream_state),
1001+
stream_state=AirbyteStateBlob(state_to_test),
9791002
),
9801003
)
9811004
]

0 commit comments

Comments
 (0)