Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,20 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
"<cursor_field>" : "<cursor_value>"
}
"""
if stream_state:
for key, value in stream_state.items():
if isinstance(value, dict):
keys = list(value.keys())
if len(keys) != 1:
# The input partitioned state should only have one key
return False
if keys[0] != self._cursor_field:
# Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
return False
if not stream_state:
return False
for key, value in stream_state.items():
# Per the docstring, each top-level value is expected to be a dictionary.
if not isinstance(value, dict):
return False
keys = list(value.keys())
if len(keys) != 1:
# The input partitioned state should only have one key
return False
if keys[0] != self._cursor_field:
# Unexpected key: found keys[0], expected self._cursor_field.
return False

return True

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,9 +928,9 @@ def create_legacy_to_per_partition_state_migration(
declarative_stream: DeclarativeStreamModel,
) -> LegacyToPerPartitionStateMigration:
retriever = declarative_stream.retriever
if not isinstance(retriever, SimpleRetrieverModel):
if not isinstance(retriever, (SimpleRetrieverModel, AsyncRetrieverModel)):
raise ValueError(
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got {type(retriever)}"
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got {type(retriever)}"
)
partition_router = retriever.partition_router
if not isinstance(
Expand Down Expand Up @@ -1999,6 +1999,17 @@ def _build_incremental_cursor(
stream_state = self._connector_state_manager.get_stream_state(
stream_name, stream_namespace
)
state_transformations = (
[
self._create_component_from_model(
state_migration, config, declarative_stream=model
)
for state_migration in model.state_migrations
]
if model.state_migrations
else []
)

return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand All @@ -2007,6 +2018,7 @@ def _build_incremental_cursor(
stream_namespace=stream_namespace,
config=config or {},
stream_state=stream_state,
stream_state_migrations=state_transformations,
partition_router=stream_slicer,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ def test_migrate_a_valid_legacy_state_to_per_partition():
},
id="test_should_not_migrate_if_the_partitioned_state_key_is_not_the_cursor_field",
),
pytest.param(
{"last_changed": "2022-12-27T08:34:39+00:00"},
id="test_should_not_migrate_if_the_partitioned_state_is_not_in_correct_format",
),
pytest.param(
{},
id="test_should_not_migrate_if_not_state_is_passed",
),
],
)
def test_should_not_migrate(input_state):
Expand Down Expand Up @@ -277,7 +285,7 @@ def _migrator_with_multiple_parent_streams():
CustomPartitionRouter,
True,
ValueError,
"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got <class 'unittest.mock.MagicMock'>",
"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got <class 'unittest.mock.MagicMock'>",
),
(
SimpleRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,60 @@ list_stream:
cursor_field: TimePeriod
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S%z"
state_migrations:
- type: LegacyToPerPartitionStateMigration
retriever:
type: AsyncRetriever
name: "{{ parameters['name'] }}"
decoder:
$ref: "#/decoder"
partition_router:
type: ListPartitionRouter
values: "{{config['repos']}}"
cursor_field: a_key
request_option:
inject_into: header
field_name: a_key
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: Id
partition_field: account_id
stream:
type: DeclarativeStream
name: lists_parent
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_sendgrid/schemas/{{ parameters.name }}.json"
retriever:
type: SimpleRetriever
name: "{{ parameters['name'] }}"
decoder:
$ref: "#/decoder"
partition_router:
type: ListPartitionRouter
values: "{{config['repos']}}"
cursor_field: a_key
request_option:
inject_into: header
field_name: a_key
paginator:
type: DefaultPaginator
page_size_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
type: RequestPath
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
page_size: 10
requester:
$ref: "#/requester"
path: "{{ next_page_token['next_page_url'] }}"
record_selector:
$ref: "#/selector"
$parameters:
name: "lists_parent"
primary_key: "id"
extractor:
$ref: "#/extractor"
field_path: [ "{{ parameters['name'] }}" ]
status_mapping:
failed:
- Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -944,8 +944,23 @@ def test_stream_with_incremental_and_retriever_with_partition_router():
assert list_stream_slicer._cursor_field.string == "a_key"


@pytest.mark.parametrize(
"use_legacy_state",
[
False,
True,
],
ids=[
"running_with_newest_state",
"running_with_legacy_state",
],
)
@freezegun.freeze_time("2025-05-14")
def test_stream_with_incremental_and_async_retriever_with_partition_router():
def test_stream_with_incremental_and_async_retriever_with_partition_router(use_legacy_state):
"""
This test is to check the behavior of the stream with async retriever and partition router
when the state is in the legacy format or the newest format.
"""
content = read_yaml_file(
"resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml"
)
Expand All @@ -956,26 +971,34 @@ def test_stream_with_incremental_and_async_retriever_with_partition_router():
)
cursor_time_period_value = "2025-05-06T12:00:00+0000"
cursor_field_key = "TimePeriod"
parent_user_id = "102023653"
per_partition_key = {
"account_id": 999999999,
"parent_slice": {"parent_slice": {}, "user_id": parent_user_id},
}
account_id = 999999999
per_partition_key = {"account_id": account_id}

legacy_stream_state = {account_id: {cursor_field_key: cursor_time_period_value}}
states = [
{"partition": per_partition_key, "cursor": {cursor_field_key: cursor_time_period_value}}
]

stream_state = {
"use_global_cursor": False,
"states": [
{"partition": per_partition_key, "cursor": {cursor_field_key: cursor_time_period_value}}
],
"state": {cursor_field_key: "2025-05-12T12:00:00+0000"},
"lookback_window": 46,
"states": states,
"lookback_window": 0,
}
if not use_legacy_state:
# Add the extra fields to check that other data in the newest state format is kept.
stream_state["state"] = {cursor_field_key: "2025-05-12T12:00:00+0000"}
stream_state["lookback_window"] = 46
stream_state["use_global_cursor"] = False
per_partition_key["parent_slice"] = {"parent_slice": {}, "user_id": "102023653"}

state_to_test = legacy_stream_state if use_legacy_state else stream_state
connector_state_manager = ConnectorStateManager(
state=[
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="lists"),
stream_state=AirbyteStateBlob(stream_state),
stream_state=AirbyteStateBlob(state_to_test),
),
)
]
Expand Down
Loading