Skip to content

Commit f4ca661

Browse files
pass parameters in ManifestComponentTransformer
1 parent 708ceeb commit f4ca661

File tree

3 files changed

+161
-29
lines changed

3 files changed

+161
-29
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,6 @@ def propagate_types_and_parameters(
120120
if found_type:
121121
propagated_component["type"] = found_type
122122

123-
# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
124-
# When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
125-
# be json_schema are not objects but we believe this is not likely in our case because:
126-
# * records are Mapping so objects hence SchemaLoader root should be an object
127-
# * connection_specification is a Mapping
128-
if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
129-
return propagated_component
130-
131123
# Combines parameters defined at the current level with parameters from parent components. Parameters at the current
132124
# level take precedence
133125
current_parameters = dict(copy.deepcopy(parent_parameters))
@@ -138,6 +130,29 @@ def propagate_types_and_parameters(
138130
else {**current_parameters, **component_parameters}
139131
)
140132

133+
# When processing request parameters which is an object that does not have a type, so $parameters will not be passes to the object.
134+
# But request parameters can have PropertyChunking object that needs to be updated with paranet $parameters.
135+
# When there is a PropertyChunking object _process_property_chunking_property() is called to update PropertyChunking object with $parameters
136+
# and set updated object to propagated_component, then it's returned without propagation.
137+
if "type" not in propagated_component and self._is_property_chunking_component(
138+
propagated_component
139+
):
140+
propagated_component = self._process_property_chunking_property(
141+
propagated_component,
142+
parent_field_identifier,
143+
current_parameters,
144+
use_parent_parameters,
145+
)
146+
147+
# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
148+
# When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
149+
# be json_schema are not objects but we believe this is not likely in our case because:
150+
# * records are Mapping so objects hence SchemaLoader root should be an object
151+
# * connection_specification is a Mapping
152+
153+
if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
154+
return propagated_component
155+
141156
# Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
142157
# both exist
143158
for parameter_key, parameter_value in current_parameters.items():
@@ -182,3 +197,30 @@ def propagate_types_and_parameters(
182197
@staticmethod
183198
def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
184199
return propagated_component.get("type") == "object"
200+
201+
@staticmethod
202+
def _is_property_chunking_component(propagated_component: Mapping[str, Any]) -> bool:
203+
has_property_chunking = False
204+
for k, v in propagated_component.items():
205+
if isinstance(v, dict) and v.get("type") == "QueryProperties":
206+
has_property_chunking = True
207+
return has_property_chunking
208+
209+
def _process_property_chunking_property(
210+
self,
211+
propagated_component: Dict[str, Any],
212+
parent_field_identifier: str,
213+
current_parameters: Mapping[str, Any],
214+
use_parent_parameters: Optional[bool] = None,
215+
) -> Dict[str, Any]:
216+
for k, v in propagated_component.items():
217+
if isinstance(v, dict) and v.get("type") == "QueryProperties":
218+
property_chunking_with_parameters = self.propagate_types_and_parameters(
219+
parent_field_identifier,
220+
v,
221+
current_parameters,
222+
use_parent_parameters=use_parent_parameters,
223+
)
224+
propagated_component[k] = property_chunking_with_parameters
225+
226+
return propagated_component

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2758,11 +2758,6 @@ def create_parent_stream_config(
27582758
def create_properties_from_endpoint(
27592759
self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any
27602760
) -> PropertiesFromEndpoint:
2761-
# CustomRetriever doesn't have requester parameter
2762-
if isinstance(model.retriever, SimpleRetrieverModel):
2763-
parameters = model.retriever.requester.parameters or {}
2764-
parameters.update(model.parameters or {})
2765-
model.retriever.requester.parameters = parameters
27662761
retriever = self._create_component_from_model(
27672762
model=model.retriever,
27682763
config=config,
@@ -2813,14 +2808,8 @@ def create_query_properties(
28132808
if isinstance(model.property_list, list):
28142809
property_list = model.property_list
28152810
else:
2816-
property_list_model = model.property_list
2817-
parameters = (
2818-
property_list_model.parameters if property_list_model.parameters is not None else {}
2819-
)
2820-
parameters.update(model.parameters or {})
2821-
property_list_model.parameters = parameters
28222811
property_list = self._create_component_from_model(
2823-
model=property_list_model, config=config, **kwargs
2812+
model=model.property_list, config=config, **kwargs
28242813
)
28252814

28262815
property_chunking = (
@@ -3057,16 +3046,8 @@ def _get_url() -> str:
30573046
)
30583047

30593048
if len(query_properties_definitions) == 1:
3060-
query_properties_definition_model = query_properties_definitions[0]
3061-
parameters = (
3062-
query_properties_definition_model.parameters
3063-
if query_properties_definition_model.parameters is not None
3064-
else {}
3065-
)
3066-
parameters.update(model.parameters or {})
3067-
query_properties_definition_model.parameters = parameters
30683049
query_properties = self._create_component_from_model(
3069-
model=query_properties_definition_model, config=config
3050+
model=query_properties_definitions[0], config=config
30703051
)
30713052
elif (
30723053
hasattr(model.requester, "fetch_properties_from_endpoint")

unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,3 +460,112 @@ def test_do_not_propagate_parameters_on_json_schema_object():
460460
actual_component = transformer.propagate_types_and_parameters("", component, {})
461461

462462
assert actual_component == expected_component
463+
464+
465+
def test_propagate_property_chunking():
466+
component = {
467+
"type": "DeclarativeStream",
468+
"streams": [
469+
{
470+
"type": "DeclarativeStream",
471+
"retriever": {
472+
"type": "SimpleRetriever",
473+
"requester": {
474+
"type": "HttpRequester",
475+
"url_base": "https://test.com",
476+
"request_parameters": {
477+
"properties": {
478+
"type": "QueryProperties",
479+
"property_list": {
480+
"type": "PropertiesFromEndpoint",
481+
"property_field_path": ["name"],
482+
"retriever": {
483+
"type": "SimpleRetriever",
484+
"requester": {
485+
"type": "HttpRequester",
486+
"url_base": "https://test.com",
487+
"authenticator": {
488+
"$ref": "#/definitions/authenticator"
489+
},
490+
"path": "/properties/{{ parameters.entity }}/properties",
491+
"http_method": "GET",
492+
"request_headers": {"Content-Type": "application/json"},
493+
},
494+
},
495+
},
496+
"property_chunking": {
497+
"type": "PropertyChunking",
498+
"property_limit_type": "characters",
499+
"property_limit": 15000,
500+
},
501+
}
502+
},
503+
},
504+
},
505+
"$parameters": {"entity": "test_entity"},
506+
}
507+
],
508+
}
509+
expected_component = {
510+
"streams": [
511+
{
512+
"$parameters": {"entity": "test_entity"},
513+
"entity": "test_entity",
514+
"retriever": {
515+
"$parameters": {"entity": "test_entity"},
516+
"entity": "test_entity",
517+
"requester": {
518+
"$parameters": {"entity": "test_entity"},
519+
"entity": "test_entity",
520+
"request_parameters": {
521+
"properties": {
522+
"$parameters": {"entity": "test_entity"},
523+
"entity": "test_entity",
524+
"property_chunking": {
525+
"$parameters": {"entity": "test_entity"},
526+
"entity": "test_entity",
527+
"property_limit": 15000,
528+
"property_limit_type": "characters",
529+
"type": "PropertyChunking",
530+
},
531+
"property_list": {
532+
"$parameters": {"entity": "test_entity"},
533+
"entity": "test_entity",
534+
"property_field_path": ["name"],
535+
"retriever": {
536+
"$parameters": {"entity": "test_entity"},
537+
"entity": "test_entity",
538+
"requester": {
539+
"$parameters": {"entity": "test_entity"},
540+
"authenticator": {
541+
"$ref": "#/definitions/authenticator"
542+
},
543+
"entity": "test_entity",
544+
"http_method": "GET",
545+
"path": "/properties/{{ "
546+
"parameters.entity "
547+
"}}/properties",
548+
"request_headers": {"Content-Type": "application/json"},
549+
"type": "HttpRequester",
550+
"url_base": "https://test.com",
551+
},
552+
"type": "SimpleRetriever",
553+
},
554+
"type": "PropertiesFromEndpoint",
555+
},
556+
"type": "QueryProperties",
557+
}
558+
},
559+
"type": "HttpRequester",
560+
"url_base": "https://test.com",
561+
},
562+
"type": "SimpleRetriever",
563+
},
564+
"type": "DeclarativeStream",
565+
}
566+
],
567+
"type": "DeclarativeStream",
568+
}
569+
transformer = ManifestComponentTransformer()
570+
actual_component = transformer.propagate_types_and_parameters("", component, {})
571+
assert actual_component == expected_component

0 commit comments

Comments
 (0)