diff --git a/airbyte_cdk/sources/declarative/checks/__init__.py b/airbyte_cdk/sources/declarative/checks/__init__.py index 6362e0613..87bcaa24d 100644 --- a/airbyte_cdk/sources/declarative/checks/__init__.py +++ b/airbyte_cdk/sources/declarative/checks/__init__.py @@ -7,7 +7,10 @@ from pydantic.v1 import BaseModel from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream -from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream +from airbyte_cdk.sources.declarative.checks.check_stream import ( + CheckStream, + DynamicStreamCheckConfig, +) from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.models import ( CheckDynamicStream as CheckDynamicStreamModel, @@ -21,4 +24,4 @@ "CheckDynamicStream": CheckDynamicStreamModel, } -__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"] +__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"] diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index c45159ec9..1123349cb 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -5,13 +5,23 @@ import logging import traceback from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Tuple +from typing import Any, Dict, List, Mapping, Optional, Tuple from airbyte_cdk import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy +@dataclass(frozen=True) +class DynamicStreamCheckConfig: + """Defines the configuration for dynamic stream during connection checking. This class specifies + what dynamic streams in the stream template should be updated with value, supporting dynamic interpolation + and type enforcement.""" + + dynamic_stream_name: str + stream_count: int = 0 + + @dataclass class CheckStream(ConnectionChecker): """ @@ -23,34 +33,126 @@ class CheckStream(ConnectionChecker): stream_names: List[str] parameters: InitVar[Mapping[str, Any]] + dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters + if self.dynamic_streams_check_configs is None: + self.dynamic_streams_check_configs = [] + + def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]: + """Logs an error and returns a formatted error message.""" + error_message = f"Encountered an error while {action}. Error: {error}" + logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True) + return False, error_message def check_connection( self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] ) -> Tuple[bool, Any]: - streams = source.streams(config=config) + """Checks the connection to the source and its streams.""" + try: + streams = source.streams(config=config) + if not streams: + return False, f"No streams to connect to from source {source}" + except Exception as error: + return self._log_error(logger, "discovering streams", error) + stream_name_to_stream = {s.name: s for s in streams} - if len(streams) == 0: - return False, f"No streams to connect to from source {source}" for stream_name in self.stream_names: - if stream_name not in stream_name_to_stream.keys(): + if stream_name not in stream_name_to_stream: raise ValueError( - f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}." + f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}." ) + stream_availability, message = self._check_stream_availability( + stream_name_to_stream, stream_name, logger + ) + if not stream_availability: + return stream_availability, message + + should_check_dynamic_streams = ( + hasattr(source, "resolved_manifest") + and hasattr(source, "dynamic_streams") + and self.dynamic_streams_check_configs + ) + + if should_check_dynamic_streams: + return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger) + + return True, None + + def _check_stream_availability( + self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger + ) -> Tuple[bool, Any]: + """Checks if streams are available.""" + availability_strategy = HttpAvailabilityStrategy() + try: stream = stream_name_to_stream[stream_name] - availability_strategy = HttpAvailabilityStrategy() + stream_is_available, reason = availability_strategy.check_availability(stream, logger) + if not stream_is_available: + message = f"Stream {stream_name} is not available: {reason}" + logger.warning(message) + return stream_is_available, message + except Exception as error: + return self._log_error(logger, f"checking availability of stream {stream_name}", error) + return True, None + + def _check_dynamic_streams_availability( + self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger + ) -> Tuple[bool, Any]: + """Checks the availability of dynamic streams.""" + dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method + dynamic_stream_name_to_dynamic_stream = { + ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams) + } + generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method + + for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__ + if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream: + return ( + False, + f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.", + ) + + generated = generated_streams.get(check_config.dynamic_stream_name, []) + stream_availability, message = self._check_generated_streams_availability( + generated, stream_name_to_stream, logger, check_config.stream_count + ) + if not stream_availability: + return stream_availability, message + + return True, None + + def _map_generated_streams( + self, dynamic_streams: List[Dict[str, Any]] + ) -> Dict[str, List[Dict[str, Any]]]: + """Maps dynamic stream names to their corresponding generated streams.""" + mapped_streams: Dict[str, List[Dict[str, Any]]] = {} + for stream in dynamic_streams: + mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream) + return mapped_streams + + def _check_generated_streams_availability( + self, + generated_streams: List[Dict[str, Any]], + stream_name_to_stream: Dict[str, Any], + logger: logging.Logger, + max_count: int, + ) -> Tuple[bool, Any]: + """Checks availability of generated dynamic streams.""" + availability_strategy = HttpAvailabilityStrategy() + for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]: + stream = stream_name_to_stream[declarative_stream["name"]] try: stream_is_available, reason = availability_strategy.check_availability( stream, logger ) if not stream_is_available: - return False, reason + message = f"Dynamic Stream {stream.name} is not available: {reason}" + logger.warning(message) + return False, message except Exception as error: - logger.error( - f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}" + return self._log_error( + logger, f"checking availability of dynamic stream {stream.name}", error ) - return False, f"Unable to connect to stream {stream_name} - {error}" return True, None diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 2573e8b8a..46a202b19 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -316,7 +316,6 @@ definitions: type: object required: - type - - stream_names properties: type: type: string @@ -330,6 +329,28 @@ definitions: examples: - ["users"] - ["users", "contacts"] + dynamic_streams_check_configs: + type: array + items: + "$ref": "#/definitions/DynamicStreamCheckConfig" + DynamicStreamCheckConfig: + type: object + required: + - type + - dynamic_stream_name + properties: + type: + type: string + enum: [ DynamicStreamCheckConfig ] + dynamic_stream_name: + title: Dynamic Stream Name + description: The dynamic stream name. + type: string + stream_count: + title: Stream Count + description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used. + type: integer + default: 0 CheckDynamicStream: title: Dynamic Streams to Check description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 021a99729..c67cd958b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -42,13 +42,15 @@ class BearerAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class CheckStream(BaseModel): - type: Literal["CheckStream"] - stream_names: List[str] = Field( - ..., - description="Names of the streams to try reading from when running a check operation.", - examples=[["users"], ["users", "contacts"]], - title="Stream Names", +class DynamicStreamCheckConfig(BaseModel): + type: Literal["DynamicStreamCheckConfig"] + dynamic_stream_name: str = Field( + ..., description="The dynamic stream name.", title="Dynamic Stream Name" + ) + stream_count: Optional[int] = Field( + 0, + description="Numbers of the streams to try reading from when running a check operation.", + title="Stream Count", ) @@ -1523,6 +1525,17 @@ class AuthFlow(BaseModel): oauth_config_specification: Optional[OAuthConfigSpecification] = None +class CheckStream(BaseModel): + type: Literal["CheckStream"] + stream_names: Optional[List[str]] = Field( + None, + description="Names of the streams to try reading from when running a check operation.", + examples=[["users"], ["users", "contacts"]], + title="Stream Names", + ) + dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None + + class IncrementingCountCursor(BaseModel): type: Literal["IncrementingCountCursor"] cursor_field: str = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 22ef82112..4f4638190 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -54,7 +54,11 @@ SessionTokenProvider, TokenProvider, ) -from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream +from airbyte_cdk.sources.declarative.checks import ( + CheckDynamicStream, + CheckStream, + DynamicStreamCheckConfig, +) from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream @@ -218,6 +222,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DynamicSchemaLoader as DynamicSchemaLoaderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DynamicStreamCheckConfig as DynamicStreamCheckConfigModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) @@ -559,6 +566,7 @@ def _init_mappings(self) -> None: BasicHttpAuthenticatorModel: self.create_basic_http_authenticator, BearerAuthenticatorModel: self.create_bearer_authenticator, CheckStreamModel: self.create_check_stream, + DynamicStreamCheckConfigModel: self.create_dynamic_stream_check_config, CheckDynamicStreamModel: self.create_check_dynamic_stream, CompositeErrorHandlerModel: self.create_composite_error_handler, ConcurrencyLevelModel: self.create_concurrency_level, @@ -936,8 +944,36 @@ def create_bearer_authenticator( ) @staticmethod - def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream: - return CheckStream(stream_names=model.stream_names, parameters={}) + def create_dynamic_stream_check_config( + model: DynamicStreamCheckConfigModel, config: Config, **kwargs: Any + ) -> DynamicStreamCheckConfig: + return DynamicStreamCheckConfig( + dynamic_stream_name=model.dynamic_stream_name, + stream_count=model.stream_count or 0, + ) + + def create_check_stream( + self, model: CheckStreamModel, config: Config, **kwargs: Any + ) -> CheckStream: + if model.dynamic_streams_check_configs is None and model.stream_names is None: + raise ValueError( + "Expected either stream_names or dynamic_streams_check_configs to be set for CheckStream" + ) + + dynamic_streams_check_configs = ( + [ + self._create_component_from_model(model=dynamic_stream_check_config, config=config) + for dynamic_stream_check_config in model.dynamic_streams_check_configs + ] + if model.dynamic_streams_check_configs + else [] + ) + + return CheckStream( + stream_names=model.stream_names or [], + dynamic_streams_check_configs=dynamic_streams_check_configs, + parameters={}, + ) @staticmethod def create_check_dynamic_stream( diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index b5367b2b0..3cbaf8fd8 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -2,15 +2,22 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import json import logging +from copy import deepcopy from typing import Any, Iterable, Mapping, Optional from unittest.mock import MagicMock import pytest import requests +from jsonschema.exceptions import ValidationError from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse logger = logging.getLogger("test") config = dict() @@ -157,3 +164,529 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp assert stream_is_available == available_expectation for message in expected_messages: assert message in reason + + +_CONFIG = { + "start_date": "2024-07-01T00:00:00.000Z", + "custom_streams": [ + {"id": 3, "name": "item_3"}, + {"id": 4, "name": "item_4"}, + ], +} + +_MANIFEST_WITHOUT_CHECK_COMPONENT = { + "version": "6.7.0", + "type": "DeclarativeSource", + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "name": "http_dynamic_stream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + }, + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "ConfigComponentsResolver", + "stream_config": { + "type": "StreamConfig", + "configs_pointer": ["custom_streams"], + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + }, + ], + "streams": [ + { + "type": "DeclarativeStream", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/static", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "name": "static_stream", + "primary_key": "id", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + }, + "type": "object", + }, + }, + } + ], +} + + +@pytest.mark.parametrize( + "check_component, expected_result, expectation, response_code, expected_messages, request_count", + [ + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + True, + False, + 200, + [{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}], + 0, + id="test_check_only_static_streams", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 1, + } + ], + } + }, + True, + False, + 200, + [], + 0, + id="test_check_static_streams_and_http_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + } + ], + } + }, + True, + False, + 200, + [], + 0, + id="test_check_static_streams_and_config_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + True, + False, + 200, + [], + 0, + id="test_check_http_dynamic_stream_and_config_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + True, + False, + 200, + [], + 0, + id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 1000, + }, + ], + } + }, + True, + False, + 200, + [], + 1, + id="test_stream_count_gt_generated_streams", + ), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}}, + False, + True, + 200, + [], + 0, + id="test_non_existent_static_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "unknown_dynamic_stream", + "stream_count": 1, + } + ], + } + }, + False, + False, + 200, + [], + 0, + id="test_non_existent_dynamic_stream", + ), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + False, + False, + 404, + ["Not found. The requested resource was not found on the server."], + 0, + id="test_stream_unavailable_unhandled_error", + ), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + False, + False, + 403, + ["Forbidden. You don't have permission to access this resource."], + 0, + id="test_stream_unavailable_handled_error", + ), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + False, + False, + 401, + ["Unauthorized. Please ensure you are authenticated correctly."], + 0, + id="test_stream_unauthorized_error", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + False, + False, + 404, + ["Not found. The requested resource was not found on the server."], + 0, + id="test_dynamic_stream_unavailable_unhandled_error", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + False, + False, + 403, + ["Forbidden. You don't have permission to access this resource."], + 0, + id="test_dynamic_stream_unavailable_handled_error", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + False, + False, + 401, + ["Unauthorized. Please ensure you are authenticated correctly."], + 0, + id="test_dynamic_stream_unauthorized_error", + ), + ], +) +def test_check_stream1( + check_component, expected_result, expectation, response_code, expected_messages, request_count +): + manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **check_component} + + with HttpMocker() as http_mocker: + static_stream_request = HttpRequest(url="https://api.test.com/static") + static_stream_response = HttpResponse( + body=json.dumps(expected_messages), status_code=response_code + ) + http_mocker.get(static_stream_request, static_stream_response) + + items_request = HttpRequest(url="https://api.test.com/items") + items_response = HttpResponse( + body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}]) + ) + http_mocker.get(items_request, items_response) + + item_request_1 = HttpRequest(url="https://api.test.com/items/1") + item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) + http_mocker.get(item_request_1, item_response) + + item_request_2 = HttpRequest(url="https://api.test.com/items/2") + item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) + http_mocker.get(item_request_2, item_response) + + item_request_3 = HttpRequest(url="https://api.test.com/items/3") + item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) + http_mocker.get(item_request_3, item_response) + + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=_CONFIG, + catalog=None, + state=None, + ) + if expectation: + with pytest.raises(ValueError): + source.check_connection(logger, _CONFIG) + else: + stream_is_available, reason = source.check_connection(logger, _CONFIG) + http_mocker.assert_number_of_calls(item_request_2, request_count) + assert stream_is_available == expected_result + + +def test_check_stream_missing_fields(): + """Test if ValueError is raised when dynamic_streams_check_configs is missing required fields.""" + manifest = { + **deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), + **{ + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig"}], + } + }, + } + with pytest.raises(ValidationError): + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=_CONFIG, + catalog=None, + state=None, + ) + + +def test_check_stream_only_type_provided(): + manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}} + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=_CONFIG, + catalog=None, + state=None, + ) + with pytest.raises(ValueError): + source.check_connection(logger, _CONFIG)