From 6d6bbedabd8ab6191f00c76d192bb94505be02d8 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Tue, 26 Aug 2025 15:15:42 -0400 Subject: [PATCH 1/5] fix parameters propagation issues for concurrent cursors --- .../parsers/model_to_component_factory.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 777ee1ccf..cfa6fafdc 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1278,6 +1278,12 @@ def create_concurrent_cursor_from_datetime_based_cursor( f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" ) + # FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases: + # * The ComponentDefinition comes from model.__dict__ in which case we have `parameters` + # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` + # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. + if "$parameters" not in component_definition and "parameters" in component_definition: + component_definition["$parameters"] = component_definition.get("parameters") datetime_based_cursor_model = model_type.parse_obj(component_definition) if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): @@ -1582,6 +1588,12 @@ def create_concurrent_cursor_from_perpartition_cursor( f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" ) + # FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases: + # * The ComponentDefinition comes from model.__dict__ in which case we have `parameters` + # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` + # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. + if "$parameters" not in component_definition and "parameters" in component_definition: + component_definition["$parameters"] = component_definition.get("parameters") datetime_based_cursor_model = model_type.parse_obj(component_definition) if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): From 1cd879fedbe1a824e73da1947d40ea9f8b03b9e3 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Tue, 26 Aug 2025 15:29:02 -0400 Subject: [PATCH 2/5] add unit test --- .../test_concurrent_declarative_source.py | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 9f96ee50f..fe2beb5c6 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -4649,3 +4649,90 @@ def test_given_invalid_config_streams_validates_config_and_raises(): with pytest.raises(ValueError): source.streams(input_config) + + +def test_parameter_propagation_for_concurrent_cursor(): + cursor_field_parameter_override = "created_at" + manifest = { + "version": "5.0.0", + "definitions": { + "selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "requester": { + "type": "HttpRequester", + "url_base": "https://persona.metaverse.com", + "http_method": "GET", + }, + "retriever": { + "type": "SimpleRetriever", + "record_selector": {"$ref": "#/definitions/selector"}, + "paginator": {"type": "NoPagination"}, + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_cursor": { + "type": "DatetimeBasedCursor", + "start_datetime": { + "datetime": "2024-01-01" + }, + "end_datetime": "2024-12-31", + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d"], + "cursor_granularity": "P1D", + "step": "P400D", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_time_option": { + "type": "RequestOption", + "field_name": "start", + "inject_into": "request_parameter", + }, + "end_time_option": { + "type": "RequestOption", + "field_name": "end", + "inject_into": "request_parameter", + }, + }, + "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, + "incremental_stream": { + "retriever": { + "$ref": "#/definitions/retriever", + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, + "$parameters": {"name": "stream_name", "primary_key": "id", "path": "/path", "cursor_field": cursor_field_parameter_override}, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + }, + }, + }, + }, + }, + "streams": [ + "#/definitions/incremental_stream", + ], + "check": {"stream_names": ["stream_name"]}, + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, + } + + source = ConcurrentDeclarativeSource( + source_config=manifest, + config={}, + catalog=create_catalog("stream_name"), + state=None, + ) + streams = source.streams({}) + + assert streams[0].cursor.cursor_field.cursor_field_key == cursor_field_parameter_override From d739e47f373d2e46fada81ec83e69be56736f569 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Tue, 26 Aug 2025 15:30:13 -0400 Subject: [PATCH 3/5] fix mypy --- .../sources/declarative/parsers/model_to_component_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index cfa6fafdc..649e66b44 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1283,7 +1283,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. if "$parameters" not in component_definition and "parameters" in component_definition: - component_definition["$parameters"] = component_definition.get("parameters") + component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict datetime_based_cursor_model = model_type.parse_obj(component_definition) if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): @@ -1593,7 +1593,7 @@ def create_concurrent_cursor_from_perpartition_cursor( # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. if "$parameters" not in component_definition and "parameters" in component_definition: - component_definition["$parameters"] = component_definition.get("parameters") + component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict datetime_based_cursor_model = model_type.parse_obj(component_definition) if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): From c0e8b860bb95f07cf54f44cf1f8895104535a7fa Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Tue, 26 Aug 2025 15:33:03 -0400 Subject: [PATCH 4/5] format --- .../declarative/test_concurrent_declarative_source.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index fe2beb5c6..14d52f832 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -4673,9 +4673,7 @@ def test_parameter_propagation_for_concurrent_cursor(): }, "incremental_cursor": { "type": "DatetimeBasedCursor", - "start_datetime": { - "datetime": "2024-01-01" - }, + "start_datetime": {"datetime": "2024-01-01"}, "end_datetime": "2024-12-31", "datetime_format": "%Y-%m-%d", "cursor_datetime_formats": ["%Y-%m-%d"], @@ -4700,7 +4698,12 @@ def test_parameter_propagation_for_concurrent_cursor(): "requester": {"$ref": "#/definitions/requester"}, }, "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, - "$parameters": {"name": "stream_name", "primary_key": "id", "path": "/path", "cursor_field": cursor_field_parameter_override}, + "$parameters": { + "name": "stream_name", + "primary_key": "id", + "path": "/path", + "cursor_field": cursor_field_parameter_override, + }, "schema_loader": { "type": "InlineSchemaLoader", "schema": { From 183262c459aec395bcfc46ee4c549eb1e715264d Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Tue, 26 Aug 2025 15:47:38 -0400 Subject: [PATCH 5/5] retrigger CI