diff --git a/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py b/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py index 8ef1c89a4..cb39f56ba 100644 --- a/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py @@ -21,6 +21,7 @@ ) from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState +from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths @dataclass @@ -122,6 +123,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: if not self.cursor_datetime_formats: self.cursor_datetime_formats = [self.datetime_format] + _validate_component_request_option_paths( + self.config, self.start_time_option, self.end_time_option + ) + def get_stream_state(self) -> StreamState: return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {} # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py b/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py index 6fb412cd9..bd640ad19 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py @@ -23,6 +23,9 @@ ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState +from airbyte_cdk.utils.mapping_helpers import ( + _validate_component_request_option_paths, +) @dataclass @@ -113,6 +116,13 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: if isinstance(self.url_base, str): self.url_base = InterpolatedString(string=self.url_base, parameters=parameters) + if self.page_token_option and not isinstance(self.page_token_option, RequestPath): + _validate_component_request_option_paths( + self.config, + self.page_size_option, + self.page_token_option, + ) + def get_initial_token(self) -> Optional[Any]: """ Return the page token that should be used for the first request of a stream diff --git a/airbyte_cdk/utils/mapping_helpers.py b/airbyte_cdk/utils/mapping_helpers.py index c5682c288..bce3a849b 100644 --- a/airbyte_cdk/utils/mapping_helpers.py +++ b/airbyte_cdk/utils/mapping_helpers.py @@ -6,6 +6,12 @@ import copy from typing import Any, Dict, List, Mapping, Optional, Union +from airbyte_cdk.sources.declarative.requesters.request_option import ( + RequestOption, + RequestOptionType, +) +from airbyte_cdk.sources.types import Config + def _merge_mappings( target: Dict[str, Any], @@ -33,13 +39,17 @@ def _merge_mappings( if isinstance(target_value, dict) and isinstance(source_value, dict): # Only body_json supports nested_structures if not allow_same_value_merge: - raise ValueError(f"Duplicate keys found: {'.'.join(current_path)}") + raise ValueError( + f"Request body collision, duplicate keys detected at key path: {'.'.join(current_path)}. Please ensure that all keys in the request are unique." + ) # If both are dictionaries, recursively merge them _merge_mappings(target_value, source_value, current_path, allow_same_value_merge) elif not allow_same_value_merge or target_value != source_value: # If same key has different values, that's a conflict - raise ValueError(f"Duplicate keys found: {'.'.join(current_path)}") + raise ValueError( + f"Request body collision, duplicate keys detected at key path: {'.'.join(current_path)}. Please ensure that all keys in the request are unique." + ) else: # No conflict, just copy the value (using deepcopy for nested structures) target[key] = copy.deepcopy(source_value) @@ -102,3 +112,34 @@ def combine_mappings( _merge_mappings(result, mapping, allow_same_value_merge=allow_same_value_merge) return result + + +def _validate_component_request_option_paths( + config: Config, *request_options: Optional[RequestOption] +) -> None: + """ + Validates that a component with multiple request options does not have conflicting paths. + Uses dummy values for validation since actual values might not be available at init time. + """ + grouped_options: Dict[RequestOptionType, List[RequestOption]] = {} + for option in request_options: + if option: + grouped_options.setdefault(option.inject_into, []).append(option) + + for inject_type, options in grouped_options.items(): + if len(options) <= 1: + continue + + option_dicts: List[Optional[Union[Mapping[str, Any], str]]] = [] + for i, option in enumerate(options): + option_dict: Dict[str, Any] = {} + # Use indexed dummy values to ensure we catch conflicts + option.inject_into_request(option_dict, f"dummy_value_{i}", config) + option_dicts.append(option_dict) + + try: + combine_mappings( + option_dicts, allow_same_value_merge=(inject_type == RequestOptionType.body_json) + ) + except ValueError as error: + raise ValueError(error) diff --git a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py index 57b6d9d34..944a0eda9 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py @@ -447,3 +447,29 @@ def test_paginator_with_page_option_no_page_size(): parameters={}, ), ) + + +def test_request_option_mapping_validator(): + pagination_strategy = PageIncrement( + config={}, page_size=1, start_from_page=0, parameters={}, inject_on_first_request=True + ) + + with pytest.raises(ValueError): + ( + DefaultPaginator( + page_size_option=RequestOption( + field_path=["variables", "limit"], + inject_into=RequestOptionType.body_json, + parameters={}, + ), + page_token_option=RequestOption( + field_path=["variables", "limit"], + inject_into=RequestOptionType.body_json, + parameters={}, + ), + pagination_strategy=pagination_strategy, + config=MagicMock(), + url_base=MagicMock(), + parameters={}, + ), + ) diff --git a/unit_tests/utils/test_mapping_helpers.py b/unit_tests/utils/test_mapping_helpers.py index 124bf4565..f0370ba86 100644 --- a/unit_tests/utils/test_mapping_helpers.py +++ b/unit_tests/utils/test_mapping_helpers.py @@ -1,6 +1,11 @@ import pytest -from airbyte_cdk.utils.mapping_helpers import combine_mappings +from airbyte_cdk.utils.mapping_helpers import ( + RequestOption, + RequestOptionType, + _validate_component_request_option_paths, + combine_mappings, +) @pytest.mark.parametrize( @@ -46,14 +51,14 @@ def test_string_handling(test_name, mappings, expected_result, expected_error): @pytest.mark.parametrize( "test_name, mappings, expected_error", [ - ("duplicate_keys_same_value", [{"a": 1}, {"a": 1}], "Duplicate keys found"), - ("duplicate_keys_different_value", [{"a": 1}, {"a": 2}], "Duplicate keys found"), + ("duplicate_keys_same_value", [{"a": 1}, {"a": 1}], "duplicate keys detected"), + ("duplicate_keys_different_value", [{"a": 1}, {"a": 2}], "duplicate keys detected"), ( "nested_structure_not_allowed", [{"a": {"b": 1}}, {"a": {"c": 2}}], - "Duplicate keys found", + "duplicate keys detected", ), - ("any_nesting_not_allowed", [{"a": {"b": 1}}, {"a": {"d": 2}}], "Duplicate keys found"), + ("any_nesting_not_allowed", [{"a": {"b": 1}}, {"a": {"d": 2}}], "duplicate keys detected"), ], ) def test_non_body_json_requests(test_name, mappings, expected_error): @@ -96,13 +101,13 @@ def test_non_body_json_requests(test_name, mappings, expected_error): "nested_conflict", [{"a": {"b": 1}}, {"a": {"b": 2}}], None, - "Duplicate keys found", + "duplicate keys detected", ), ( "type_conflict", [{"a": 1}, {"a": {"b": 2}}], None, - "Duplicate keys found", + "duplicate keys detected", ), ], ) @@ -113,3 +118,104 @@ def test_body_json_requests(test_name, mappings, expected_result, expected_error combine_mappings(mappings, allow_same_value_merge=True) else: assert combine_mappings(mappings, allow_same_value_merge=True) == expected_result + + +@pytest.fixture +def mock_config() -> dict[str, str]: + return {"test": "config"} + + +@pytest.mark.parametrize( + "test_name, option1, option2, should_raise", + [ + ( + "different_fields", + RequestOption( + field_name="field1", inject_into=RequestOptionType.body_json, parameters={} + ), + RequestOption( + field_name="field2", inject_into=RequestOptionType.body_json, parameters={} + ), + False, + ), + ( + "same_field_name_header", + RequestOption(field_name="field", inject_into=RequestOptionType.header, parameters={}), + RequestOption(field_name="field", inject_into=RequestOptionType.header, parameters={}), + True, + ), + ( + "different_nested_paths", + RequestOption( + field_path=["data", "query1", "limit"], + inject_into=RequestOptionType.body_json, + parameters={}, + ), + RequestOption( + field_path=["data", "query2", "limit"], + inject_into=RequestOptionType.body_json, + parameters={}, + ), + False, + ), + ( + "same_nested_paths", + RequestOption( + field_path=["data", "query", "limit"], + inject_into=RequestOptionType.body_json, + parameters={}, + ), + RequestOption( + field_path=["data", "query", "limit"], + inject_into=RequestOptionType.body_json, + parameters={}, + ), + True, + ), + ( + "different_inject_types", + RequestOption(field_name="field", inject_into=RequestOptionType.header, parameters={}), + RequestOption( + field_name="field", inject_into=RequestOptionType.body_json, parameters={} + ), + False, + ), + ], +) +def test_request_option_validation(test_name, option1, option2, should_raise, mock_config): + """Test various combinations of request option validation""" + if should_raise: + with pytest.raises(ValueError, match="duplicate keys detected"): + _validate_component_request_option_paths(mock_config, option1, option2) + else: + _validate_component_request_option_paths(mock_config, option1, option2) + + +@pytest.mark.parametrize( + "test_name, options", + [ + ( + "none_options", + [ + None, + RequestOption( + field_name="field", inject_into=RequestOptionType.header, parameters={} + ), + None, + ], + ), + ( + "single_option", + [ + RequestOption( + field_name="field", inject_into=RequestOptionType.header, parameters={} + ) + ], + ), + ("all_none", [None, None, None]), + ("empty_list", []), + ], +) +def test_edge_cases(test_name, options, mock_config): + """Test edge cases like None values and single options""" + _validate_component_request_option_paths(mock_config, *options)