Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,12 @@ def create_concurrent_cursor_from_datetime_based_cursor(
f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
)

# FIXME: the concurrent cursor interfaces are awkward because they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource (CDS) didn't have access to the models (see for example https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). As a result, there are two cases to handle:
# * The ComponentDefinition comes from model.__dict__, in which case we have `parameters`
# * The ComponentDefinition comes from the manifest as a dict, in which case we have `$parameters`
# We should change those interfaces to take the model once the code in CDS is cleaned up, at which point the parameter propagation should happen as part of the ModelToComponentFactory.
if "$parameters" not in component_definition and "parameters" in component_definition:
component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict
datetime_based_cursor_model = model_type.parse_obj(component_definition)

if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel):
Expand Down Expand Up @@ -1582,6 +1588,12 @@ def create_concurrent_cursor_from_perpartition_cursor(
f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
)

# FIXME: the concurrent cursor interfaces are awkward because they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource (CDS) didn't have access to the models (see for example https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). As a result, there are two cases to handle:
# * The ComponentDefinition comes from model.__dict__, in which case we have `parameters`
# * The ComponentDefinition comes from the manifest as a dict, in which case we have `$parameters`
# We should change those interfaces to take the model once the code in CDS is cleaned up, at which point the parameter propagation should happen as part of the ModelToComponentFactory.
if "$parameters" not in component_definition and "parameters" in component_definition:
component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict
datetime_based_cursor_model = model_type.parse_obj(component_definition)

if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4649,3 +4649,93 @@ def test_given_invalid_config_streams_validates_config_and_raises():

with pytest.raises(ValueError):
source.streams(input_config)


def test_parameter_propagation_for_concurrent_cursor():
    """Check that stream-level `$parameters` reach the concurrent cursor.

    The cursor's `cursor_field` is an interpolated string that falls back to
    'updated_at' when no `cursor_field` parameter is available, so the final
    assertion only holds if the `$parameters` declared on the stream are
    visible to the cursor component.
    """
    expected_cursor_field = "created_at"

    # Shared building blocks, referenced from the stream through `$ref`.
    record_selector = {
        "type": "RecordSelector",
        "extractor": {"type": "DpathExtractor", "field_path": []},
    }
    http_requester = {
        "type": "HttpRequester",
        "url_base": "https://persona.metaverse.com",
        "http_method": "GET",
    }
    simple_retriever = {
        "type": "SimpleRetriever",
        "record_selector": {"$ref": "#/definitions/selector"},
        "paginator": {"type": "NoPagination"},
        "requester": {"$ref": "#/definitions/requester"},
    }

    # The cursor field is resolved from the parameters at build time.
    datetime_cursor = {
        "type": "DatetimeBasedCursor",
        "start_datetime": {"datetime": "2024-01-01"},
        "end_datetime": "2024-12-31",
        "datetime_format": "%Y-%m-%d",
        "cursor_datetime_formats": ["%Y-%m-%d"],
        "cursor_granularity": "P1D",
        "step": "P400D",
        "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
        "start_time_option": {
            "type": "RequestOption",
            "field_name": "start",
            "inject_into": "request_parameter",
        },
        "end_time_option": {
            "type": "RequestOption",
            "field_name": "end",
            "inject_into": "request_parameter",
        },
    }

    inline_schema_loader = {
        "type": "InlineSchemaLoader",
        "schema": {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "id": {
                    "description": "The identifier",
                    "type": ["null", "string"],
                },
            },
        },
    }

    # The stream under test: its `$parameters` carry the cursor field override.
    incremental_stream = {
        "retriever": {
            "$ref": "#/definitions/retriever",
            "requester": {"$ref": "#/definitions/requester"},
        },
        "incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
        "$parameters": {
            "name": "stream_name",
            "primary_key": "id",
            "path": "/path",
            "cursor_field": expected_cursor_field,
        },
        "schema_loader": inline_schema_loader,
    }

    manifest = {
        "version": "5.0.0",
        "definitions": {
            "selector": record_selector,
            "requester": http_requester,
            "retriever": simple_retriever,
            "incremental_cursor": datetime_cursor,
            "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
            "incremental_stream": incremental_stream,
        },
        "streams": [
            "#/definitions/incremental_stream",
        ],
        "check": {"stream_names": ["stream_name"]},
        "concurrency_level": {
            "type": "ConcurrencyLevel",
            "default_concurrency": "{{ config['num_workers'] or 10 }}",
            "max_concurrency": 25,
        },
    }

    source = ConcurrentDeclarativeSource(
        source_config=manifest,
        config={},
        catalog=create_catalog("stream_name"),
        state=None,
    )
    built_streams = source.streams({})
    first_stream = built_streams[0]

    assert first_stream.cursor.cursor_field.cursor_field_key == expected_cursor_field
Loading