From f359981de1f7c4bfdab52c7c36be455e4cab2035 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 15 May 2025 13:52:06 -0600 Subject: [PATCH 1/2] fix legacy migration when component is not in correct format --- ...legacy_to_per_partition_state_migration.py | 3 ++ .../parsers/model_to_component_factory.py | 16 +++++- .../test_legacy_to_per_partition_migration.py | 6 ++- ..._aync_retriever_with_partition_router.yaml | 54 ++++++++++++++++--- .../test_model_to_component_factory.py | 47 +++++++++++----- 5 files changed, 105 insertions(+), 21 deletions(-) diff --git a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index 830646fe9..21615f289 100644 --- a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -88,6 +88,9 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: if keys[0] != self._cursor_field: # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field} return False + # it is expected the internal value to be a dictionary according to docstring + else: + return False return True def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 148b290ce..e7cdb6683 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -928,9 +928,9 @@ def create_legacy_to_per_partition_state_migration( declarative_stream: DeclarativeStreamModel, ) -> LegacyToPerPartitionStateMigration: retriever = declarative_stream.retriever - if not isinstance(retriever, SimpleRetrieverModel): + if not isinstance(retriever, (SimpleRetrieverModel, AsyncRetrieverModel)): raise ValueError( - f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got {type(retriever)}" + f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got {type(retriever)}" ) partition_router = retriever.partition_router if not isinstance( @@ -1999,6 +1999,17 @@ def _build_incremental_cursor( stream_state = self._connector_state_manager.get_stream_state( stream_name, stream_namespace ) + state_transformations = ( + [ + self._create_component_from_model( + state_migration, config, declarative_stream=model + ) + for state_migration in model.state_migrations + ] + if model.state_migrations + else [] + ) + return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing state_manager=self._connector_state_manager, model_type=DatetimeBasedCursorModel, @@ -2007,6 +2018,7 @@ def _build_incremental_cursor( stream_namespace=stream_namespace, config=config or {}, stream_state=stream_state, + stream_state_migrations=state_transformations, partition_router=stream_slicer, ) diff --git a/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py b/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py index 8e03d2268..fbf8615ad 100644 --- a/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py +++ b/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py @@ -189,6 +189,10 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): }, id="test_should_not_migrate_if_the_partitioned_state_key_is_not_the_cursor_field", ), + pytest.param( + {"last_changed": "2022-12-27T08:34:39+00:00"}, + id="test_should_not_migrate_if_the_partitioned_state_is_not_in_correct_format", + ), ], ) def test_should_not_migrate(input_state): @@ -277,7 +281,7 @@ def _migrator_with_multiple_parent_streams(): CustomPartitionRouter, True, ValueError, - "LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got ", + "LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got ", ), ( SimpleRetriever, diff --git a/unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml b/unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml index ef2160308..5564af11b 100644 --- a/unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml +++ b/unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml @@ -56,18 +56,60 @@ list_stream: cursor_field: TimePeriod cursor_datetime_formats: - "%Y-%m-%dT%H:%M:%S%z" + state_migrations: + - type: LegacyToPerPartitionStateMigration retriever: type: AsyncRetriever name: "{{ parameters['name'] }}" decoder: $ref: "#/decoder" partition_router: - type: ListPartitionRouter - values: "{{config['repos']}}" - cursor_field: a_key - request_option: - inject_into: header - field_name: a_key + type: SubstreamPartitionRouter + parent_stream_configs: + - type: ParentStreamConfig + parent_key: Id + partition_field: account_id + stream: + type: DeclarativeStream + name: lists_parent + schema_loader: + type: JsonFileSchemaLoader + file_path: "./source_sendgrid/schemas/{{ parameters.name }}.json" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + decoder: + $ref: "#/decoder" + partition_router: + type: ListPartitionRouter + values: "{{config['repos']}}" + cursor_field: a_key + request_option: + inject_into: header + field_name: a_key + paginator: + type: DefaultPaginator + page_size_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + type: RequestPath + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + page_size: 10 + requester: + $ref: "#/requester" + path: "{{ next_page_token['next_page_url'] }}" + record_selector: + $ref: "#/selector" + $parameters: + name: "lists_parent" + primary_key: "id" + extractor: + $ref: "#/extractor" + field_path: [ "{{ parameters['name'] }}" ] status_mapping: failed: - Error diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index fb337b8c0..0b97ee453 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -944,8 +944,23 @@ def test_stream_with_incremental_and_retriever_with_partition_router(): assert list_stream_slicer._cursor_field.string == "a_key" +@pytest.mark.parametrize( + "use_legacy_state", + [ + False, + True, + ], + ids=[ + "running_with_newest_state", + "running_with_legacy_state", + ], +) @freezegun.freeze_time("2025-05-14") -def test_stream_with_incremental_and_async_retriever_with_partition_router(): +def test_stream_with_incremental_and_async_retriever_with_partition_router(use_legacy_state): + """ + This test is to check the behavior of the stream with async retriever and partition router + when the state is in the legacy format or the newest format. + """ content = read_yaml_file( "resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml" ) @@ -956,26 +971,34 @@ def test_stream_with_incremental_and_async_retriever_with_partition_router(): ) cursor_time_period_value = "2025-05-06T12:00:00+0000" cursor_field_key = "TimePeriod" - parent_user_id = "102023653" - per_partition_key = { - "account_id": 999999999, - "parent_slice": {"parent_slice": {}, "user_id": parent_user_id}, - } + account_id = 999999999 + per_partition_key = {"account_id": account_id} + + legacy_stream_state = {account_id: {cursor_field_key: cursor_time_period_value}} + states = [ + {"partition": per_partition_key, "cursor": {cursor_field_key: cursor_time_period_value}} + ] + stream_state = { "use_global_cursor": False, - "states": [ - {"partition": per_partition_key, "cursor": {cursor_field_key: cursor_time_period_value}} - ], - "state": {cursor_field_key: "2025-05-12T12:00:00+0000"}, - "lookback_window": 46, + "states": states, + "lookback_window": 0, } + if not use_legacy_state: + # to check it keeps other data in the newest state format + stream_state["state"] = {cursor_field_key: "2025-05-12T12:00:00+0000"} + stream_state["lookback_window"] = 46 + stream_state["use_global_cursor"] = False + per_partition_key["parent_slice"] = {"parent_slice": {}, "user_id": "102023653"} + + state_to_test = legacy_stream_state if use_legacy_state else stream_state connector_state_manager = ConnectorStateManager( state=[ AirbyteStateMessage( type=AirbyteStateType.STREAM, stream=AirbyteStreamState( stream_descriptor=StreamDescriptor(name="lists"), - stream_state=AirbyteStateBlob(stream_state), + stream_state=AirbyteStateBlob(state_to_test), ), ) ] From 9d45658916654c89bbe6bde67dc7f6ec80513216 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 15 May 2025 15:30:32 -0600 Subject: [PATCH 2/2] handle case when no state is passed --- ...legacy_to_per_partition_state_migration.py | 27 ++++++++++--------- .../test_legacy_to_per_partition_migration.py | 4 +++ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index 21615f289..c8e93743d 100644 --- a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -78,19 +78,20 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: "" : "" } """ - if stream_state: - for key, value in stream_state.items(): - if isinstance(value, dict): - keys = list(value.keys()) - if len(keys) != 1: - # The input partitioned state should only have one key - return False - if keys[0] != self._cursor_field: - # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field} - return False - # it is expected the internal value to be a dictionary according to docstring - else: - return False + if not stream_state: + return False + for key, value in stream_state.items(): + # it is expected the internal value to be a dictionary according to docstring + if not isinstance(value, dict): + return False + keys = list(value.keys()) + if len(keys) != 1: + # The input partitioned state should only have one key + return False + if keys[0] != self._cursor_field: + # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field} + return False + return True def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: diff --git a/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py b/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py index fbf8615ad..11be8231a 100644 --- a/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py +++ b/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py @@ -193,6 +193,10 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): {"last_changed": "2022-12-27T08:34:39+00:00"}, id="test_should_not_migrate_if_the_partitioned_state_is_not_in_correct_format", ), + pytest.param( + {}, + id="test_should_not_migrate_if_not_state_is_passed", + ), ], ) def test_should_not_migrate(input_state):