Skip to content

Commit 2aa7b58

Browse files
authored
fix: parameters propagation issues for concurrent cursors (#726)
1 parent 4475c57 commit 2aa7b58

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,12 @@ def create_concurrent_cursor_from_datetime_based_cursor(
12781278
f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
12791279
)
12801280

1281+
# FIXME: the concurrent cursor interfaces are awkward because they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models ([see here for an example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91)). As a result, there are two cases to handle:
1282+
# * The ComponentDefinition comes from model.__dict__ in which case we have `parameters`
1283+
# * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters`
1284+
# We should change those interfaces to use the model once we clean up the code in the ConcurrentDeclarativeSource (CDS), at which point the parameter propagation should happen as part of the ModelToComponentFactory.
1285+
if "$parameters" not in component_definition and "parameters" in component_definition:
1286+
component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict
12811287
datetime_based_cursor_model = model_type.parse_obj(component_definition)
12821288

12831289
if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel):
@@ -1582,6 +1588,12 @@ def create_concurrent_cursor_from_perpartition_cursor(
15821588
f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
15831589
)
15841590

1591+
# FIXME: the concurrent cursor interfaces are awkward because they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models ([see here for an example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91)). As a result, there are two cases to handle:
1592+
# * The ComponentDefinition comes from model.__dict__ in which case we have `parameters`
1593+
# * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters`
1594+
# We should change those interfaces to use the model once we clean up the code in the ConcurrentDeclarativeSource (CDS), at which point the parameter propagation should happen as part of the ModelToComponentFactory.
1595+
if "$parameters" not in component_definition and "parameters" in component_definition:
1596+
component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict
15851597
datetime_based_cursor_model = model_type.parse_obj(component_definition)
15861598

15871599
if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel):

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4649,3 +4649,93 @@ def test_given_invalid_config_streams_validates_config_and_raises():
46494649

46504650
with pytest.raises(ValueError):
46514651
source.streams(input_config)
4652+
4653+
4654+
def test_parameter_propagation_for_concurrent_cursor():
    """Check that stream-level `$parameters` reach the concurrent cursor.

    The cursor's `cursor_field` is an interpolated string that falls back to
    `updated_at` when no `cursor_field` parameter is available. The stream's
    `$parameters` provide `cursor_field`, so the cursor created for the stream is
    expected to expose that value.
    """
    expected_cursor_field = "created_at"

    def request_parameter_option(field_name):
        # A new dict is built on every call so no object is shared inside the manifest.
        return {
            "type": "RequestOption",
            "field_name": field_name,
            "inject_into": "request_parameter",
        }

    record_selector = {
        "type": "RecordSelector",
        "extractor": {"type": "DpathExtractor", "field_path": []},
    }
    http_requester = {
        "type": "HttpRequester",
        "url_base": "https://persona.metaverse.com",
        "http_method": "GET",
    }
    simple_retriever = {
        "type": "SimpleRetriever",
        "record_selector": {"$ref": "#/definitions/selector"},
        "paginator": {"type": "NoPagination"},
        "requester": {"$ref": "#/definitions/requester"},
    }
    datetime_cursor = {
        "type": "DatetimeBasedCursor",
        "start_datetime": {"datetime": "2024-01-01"},
        "end_datetime": "2024-12-31",
        "datetime_format": "%Y-%m-%d",
        "cursor_datetime_formats": ["%Y-%m-%d"],
        "cursor_granularity": "P1D",
        "step": "P400D",
        # Resolved from the parameters; `updated_at` is only the fallback value.
        "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
        "start_time_option": request_parameter_option("start"),
        "end_time_option": request_parameter_option("end"),
    }
    inline_schema_loader = {
        "type": "InlineSchemaLoader",
        "schema": {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "id": {
                    "description": "The identifier",
                    "type": ["null", "string"],
                },
            },
        },
    }
    incremental_stream = {
        "retriever": {
            "$ref": "#/definitions/retriever",
            "requester": {"$ref": "#/definitions/requester"},
        },
        "incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
        # The parameters under test: `cursor_field` must end up on the cursor.
        "$parameters": {
            "name": "stream_name",
            "primary_key": "id",
            "path": "/path",
            "cursor_field": expected_cursor_field,
        },
        "schema_loader": inline_schema_loader,
    }

    manifest_under_test = {
        "version": "5.0.0",
        "definitions": {
            "selector": record_selector,
            "requester": http_requester,
            "retriever": simple_retriever,
            "incremental_cursor": datetime_cursor,
            "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
            "incremental_stream": incremental_stream,
        },
        "streams": [
            "#/definitions/incremental_stream",
        ],
        "check": {"stream_names": ["stream_name"]},
        "concurrency_level": {
            "type": "ConcurrencyLevel",
            "default_concurrency": "{{ config['num_workers'] or 10 }}",
            "max_concurrency": 25,
        },
    }

    declarative_source = ConcurrentDeclarativeSource(
        source_config=manifest_under_test,
        config={},
        catalog=create_catalog("stream_name"),
        state=None,
    )
    resolved_streams = declarative_source.streams({})

    actual_cursor_field = resolved_streams[0].cursor.cursor_field.cursor_field_key
    assert actual_cursor_field == expected_cursor_field

0 commit comments

Comments
 (0)