diff --git a/airbyte_cdk/cli/source_declarative_manifest/_run.py b/airbyte_cdk/cli/source_declarative_manifest/_run.py index df36b3df1..b2c24fe5c 100644 --- a/airbyte_cdk/cli/source_declarative_manifest/_run.py +++ b/airbyte_cdk/cli/source_declarative_manifest/_run.py @@ -58,6 +58,7 @@ def __init__( catalog: ConfiguredAirbyteCatalog | None, config: MutableMapping[str, Any] | None, state: TState, + config_path: str | None = None, **kwargs: Any, ) -> None: """ @@ -76,6 +77,7 @@ def __init__( config=config, state=state, # type: ignore [arg-type] path_to_yaml="manifest.yaml", + config_path=config_path, ) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 572e487dc..ba08f8b94 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -74,6 +74,7 @@ def __init__( debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[ModelToComponentFactory] = None, + config_path: Optional[str] = None, **kwargs: Any, ) -> None: # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source @@ -96,6 +97,7 @@ def __init__( debug=debug, emit_connector_builder_messages=emit_connector_builder_messages, component_factory=component_factory, + config_path=config_path, ) concurrency_level_from_manifest = self._source_config.get("concurrency_level") diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index c98372be7..70c417054 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import json @@ -8,13 +8,15 @@ from copy import deepcopy from importlib import metadata from types import ModuleType -from typing import Any, Dict, Iterator, List, Mapping, Optional, Set +from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set +import orjson import yaml from jsonschema.exceptions import ValidationError from jsonschema.validators import validate from packaging.version import InvalidVersion, Version +from airbyte_cdk.config_observation import create_connector_config_control_message from airbyte_cdk.connector_builder.models import ( LogMessage as ConnectorBuilderLogMessage, ) @@ -29,6 +31,7 @@ ConnectorSpecification, FailureType, ) +from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource @@ -57,9 +60,10 @@ ModelToComponentFactory, ) from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING +from airbyte_cdk.sources.declarative.spec.spec import Spec from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.core import Stream -from airbyte_cdk.sources.types import ConnectionDefinition +from airbyte_cdk.sources.types import Config, ConnectionDefinition from airbyte_cdk.sources.utils.slice_logger import ( AlwaysLogSliceLogger, DebugSliceLogger, @@ -99,6 +103,7 @@ def __init__( component_factory: Optional[ModelToComponentFactory] = None, migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False, + config_path: Optional[str] = None, ) -> None: """ Args: @@ -108,6 +113,7 @@ def __init__( emit_connector_builder_messages: True if messages should be emitted to the connector builder. component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. normalize_manifest: Optional flag to indicate if the manifest should be normalized. + config_path: Optional path to the config file. """ self.logger = logging.getLogger(f"airbyte.{self.name}") self._should_normalize = normalize_manifest @@ -130,7 +136,6 @@ def __init__( self._slice_logger: SliceLogger = ( AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() ) - self._config = config or {} # resolve all components in the manifest self._source_config = self._pre_process_manifest(dict(source_config)) @@ -139,6 +144,12 @@ def __init__( # apply additional post-processing to the manifest self._post_process_manifest() + spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") + self._spec_component: Optional[Spec] = ( + self._constructor.create_component(SpecModel, spec, dict()) if spec else None + ) + self._config = self._migrate_and_transform_config(config_path, config) or {} + @property def resolved_manifest(self) -> Mapping[str, Any]: """ @@ -199,6 +210,30 @@ def _normalize_manifest(self) -> None: normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema) self._source_config = normalizer.normalize() + def _migrate_and_transform_config( + self, + config_path: Optional[str], + config: Optional[Config], + ) -> Optional[Config]: + if not config: + return None + if not self._spec_component: + return config + mutable_config = dict(config) + self._spec_component.migrate_config(mutable_config) + if mutable_config != config: + if config_path: + with open(config_path, "w") as f: + json.dump(mutable_config, f) + self.message_repository.emit_message( + create_connector_config_control_message(mutable_config) + ) + # We have no mechanism for consuming the queue, so we print the messages to stdout + for message in self.message_repository.consume_queue(): + print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode()) + self._spec_component.transform_config(mutable_config) + return mutable_config + def _migrate_manifest(self) -> None: """ This method is used to migrate the manifest. It should be called after the manifest has been validated. @@ -255,6 +290,9 @@ def connection_checker(self) -> ConnectionChecker: ) def streams(self, config: Mapping[str, Any]) -> List[Stream]: + if self._spec_component: + self._spec_component.validate_config(config) + self._emit_manifest_debug_message( extra_args={ "source_name": self.name, @@ -355,14 +393,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: } ) - spec = self._source_config.get("spec") - if spec: - if "type" not in spec: - spec["type"] = "Spec" - spec_component = self._constructor.create_component(SpecModel, spec, dict()) - return spec_component.generate_spec() - else: - return super().spec(logger) + return ( + self._spec_component.generate_spec() if self._spec_component else super().spec(logger) + ) def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: self._configure_logger_level(logger) diff --git a/airbyte_cdk/sources/declarative/spec/spec.py b/airbyte_cdk/sources/declarative/spec/spec.py index 945289686..20fb3c5c9 100644 --- a/airbyte_cdk/sources/declarative/spec/spec.py +++ b/airbyte_cdk/sources/declarative/spec/spec.py @@ -1,28 +1,21 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # -import json from dataclasses import InitVar, dataclass, field from typing import Any, List, Mapping, MutableMapping, Optional -import orjson - -from airbyte_cdk.config_observation import create_connector_config_control_message -from airbyte_cdk.entrypoint import AirbyteEntrypoint from airbyte_cdk.models import ( AdvancedAuth, ConnectorSpecification, ConnectorSpecificationSerializer, ) -from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer from airbyte_cdk.sources.declarative.models.declarative_component_schema import AuthFlow from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import ( ConfigTransformation, ) from airbyte_cdk.sources.declarative.validators.validator import Validator from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository -from airbyte_cdk.sources.source import Source @dataclass @@ -48,7 +41,6 @@ class Spec: config_migrations: List[ConfigMigration] = field(default_factory=list) config_transformations: List[ConfigTransformation] = field(default_factory=list) config_validations: List[Validator] = field(default_factory=list) - message_repository: MessageRepository = InMemoryMessageRepository() def generate_spec(self) -> ConnectorSpecification: """ @@ -69,34 +61,15 @@ def generate_spec(self) -> ConnectorSpecification: # We remap these keys to camel case because that's the existing format expected by the rest of the platform return ConnectorSpecificationSerializer.load(obj) - def migrate_config( - self, args: List[str], source: Source, config: MutableMapping[str, Any] - ) -> None: + def migrate_config(self, config: MutableMapping[str, Any]) -> None: """ - Apply all specified config transformations to the provided config and save the modified config to the given path and emit a control message. + Apply all specified config transformations to the provided config and emit a control message. - :param args: Command line arguments - :param source: Source instance :param config: The user-provided config to migrate """ - config_path = AirbyteEntrypoint(source).extract_config(args) - - if not config_path: - return - - mutable_config = dict(config) for migration in self.config_migrations: for transformation in migration.transformations: - transformation.transform(mutable_config) - - if mutable_config != config: - with open(config_path, "w") as f: - json.dump(mutable_config, f) - self.message_repository.emit_message( - create_connector_config_control_message(mutable_config) - ) - for message in self.message_repository.consume_queue(): - print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode()) + transformation.transform(config) def transform_config(self, config: MutableMapping[str, Any]) -> None: """ @@ -107,7 +80,7 @@ def transform_config(self, config: MutableMapping[str, Any]) -> None: for transformation in self.config_transformations: transformation.transform(config) - def validate_config(self, config: MutableMapping[str, Any]) -> None: + def validate_config(self, config: Mapping[str, Any]) -> None: """ Apply all config validations to the provided config. diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 93bdc55e9..003578738 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -24,6 +24,7 @@ def __init__( catalog: Optional[ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[AirbyteStateMessage]] = None, + config_path: Optional[str] = None, ) -> None: """ :param path_to_yaml: Path to the yaml file describing the source @@ -36,6 +37,7 @@ def __init__( config=config or {}, state=state or [], source_config=source_config, + config_path=config_path, ) def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition: diff --git a/unit_tests/sources/declarative/spec/test_spec.py b/unit_tests/sources/declarative/spec/test_spec.py index e25e692a9..75287082c 100644 --- a/unit_tests/sources/declarative/spec/test_spec.py +++ b/unit_tests/sources/declarative/spec/test_spec.py @@ -2,8 +2,6 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # -from unittest.mock import Mock, mock_open - import pytest from airbyte_cdk.models import ( @@ -161,108 +159,6 @@ def test_spec(spec, expected_connection_specification) -> None: assert spec.generate_spec() == expected_connection_specification -@pytest.fixture -def migration_mocks(monkeypatch): - mock_message_repository = Mock() - mock_message_repository.consume_queue.return_value = [Mock()] - - mock_source = Mock() - mock_entrypoint = Mock() - mock_entrypoint.extract_config.return_value = "/fake/config/path" - monkeypatch.setattr( - "airbyte_cdk.sources.declarative.spec.spec.AirbyteEntrypoint", lambda _: mock_entrypoint - ) - - _mock_open = mock_open() - mock_json_dump = Mock() - mock_print = Mock() - mock_serializer_dump = Mock() - - mock_decoded_bytes = Mock() - mock_decoded_bytes.decode.return_value = "decoded_message" - mock_orjson_dumps = Mock(return_value=mock_decoded_bytes) - - monkeypatch.setattr("builtins.open", _mock_open) - monkeypatch.setattr("json.dump", mock_json_dump) - monkeypatch.setattr("builtins.print", mock_print) - monkeypatch.setattr( - "airbyte_cdk.models.airbyte_protocol_serializers.AirbyteMessageSerializer.dump", - mock_serializer_dump, - ) - monkeypatch.setattr("airbyte_cdk.sources.declarative.spec.spec.orjson.dumps", mock_orjson_dumps) - - return { - "message_repository": mock_message_repository, - "source": mock_source, - "open": _mock_open, - "json_dump": mock_json_dump, - "print": mock_print, - "serializer_dump": mock_serializer_dump, - "orjson_dumps": mock_orjson_dumps, - "decoded_bytes": mock_decoded_bytes, - } - - -def test_given_unmigrated_config_when_migrating_then_config_is_migrated(migration_mocks) -> None: - input_config = {"planet": "CRSC"} - - spec = component_spec( - connection_specification={}, - parameters={}, - config_migrations=[ - ConfigMigration( - description="Test migration", - transformations=[ - ConfigRemapField( - map={"CRSC": "Coruscant"}, field_path=["planet"], config=input_config - ) - ], - ) - ], - ) - spec.message_repository = migration_mocks["message_repository"] - - spec.migrate_config(["spec"], migration_mocks["source"], input_config) - - migration_mocks["message_repository"].emit_message.assert_called_once() - migration_mocks["open"].assert_called_once_with("/fake/config/path", "w") - migration_mocks["json_dump"].assert_called_once() - migration_mocks["print"].assert_called() - migration_mocks["serializer_dump"].assert_called() - migration_mocks["orjson_dumps"].assert_called() - migration_mocks["decoded_bytes"].decode.assert_called() - - -def test_given_already_migrated_config_no_control_message_is_emitted(migration_mocks) -> None: - input_config = {"planet": "Coruscant"} - - spec = component_spec( - connection_specification={}, - parameters={}, - config_migrations=[ - ConfigMigration( - description="Test migration", - transformations=[ - ConfigRemapField( - map={"CRSC": "Coruscant"}, field_path=["planet"], config=input_config - ) - ], - ) - ], - ) - spec.message_repository = migration_mocks["message_repository"] - - spec.migrate_config(["spec"], migration_mocks["source"], input_config) - - migration_mocks["message_repository"].emit_message.assert_not_called() - migration_mocks["open"].assert_not_called() - migration_mocks["json_dump"].assert_not_called() - migration_mocks["print"].assert_not_called() - migration_mocks["serializer_dump"].assert_not_called() - migration_mocks["orjson_dumps"].assert_not_called() - migration_mocks["decoded_bytes"].decode.assert_not_called() - - def test_given_list_of_transformations_when_transform_config_then_config_is_transformed() -> None: input_config = {"planet_code": "CRSC"} expected_config = { diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index d6de73db8..f48d8fd83 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -9,7 +9,7 @@ from copy import deepcopy from pathlib import Path from typing import Any, List, Mapping -from unittest.mock import call, patch +from unittest.mock import Mock, call, mock_open, patch import pytest import requests @@ -30,6 +30,9 @@ ) from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( + ModelToComponentFactory, +) from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever logger = logging.getLogger("airbyte") @@ -2099,3 +2102,362 @@ def test_slice_checkpoint(test_name, manifest, pages, expected_states_qty): with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages): states = [message.state for message in _run_read(manifest, _stream_name) if message.state] assert len(states) == expected_states_qty + + +@pytest.fixture +def migration_mocks(monkeypatch): + mock_message_repository = Mock() + mock_message_repository.consume_queue.return_value = [Mock()] + + _mock_open = mock_open() + mock_json_dump = Mock() + mock_print = Mock() + mock_serializer_dump = Mock() + + mock_decoded_bytes = Mock() + mock_decoded_bytes.decode.return_value = "decoded_message" + mock_orjson_dumps = Mock(return_value=mock_decoded_bytes) + + monkeypatch.setattr("builtins.open", _mock_open) + monkeypatch.setattr("json.dump", mock_json_dump) + monkeypatch.setattr("builtins.print", mock_print) + monkeypatch.setattr( + "airbyte_cdk.models.airbyte_protocol_serializers.AirbyteMessageSerializer.dump", + mock_serializer_dump, + ) + monkeypatch.setattr( + "airbyte_cdk.sources.declarative.manifest_declarative_source.orjson.dumps", + mock_orjson_dumps, + ) + + return { + "message_repository": mock_message_repository, + "open": _mock_open, + "json_dump": mock_json_dump, + "print": mock_print, + "serializer_dump": mock_serializer_dump, + "orjson_dumps": mock_orjson_dumps, + "decoded_bytes": mock_decoded_bytes, + } + + +def test_given_unmigrated_config_when_migrating_then_config_is_migrated(migration_mocks) -> None: + input_config = {"planet": "CRSC"} + + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + "config_normalization_rules": { + "config_migrations": [ + { + "type": "ConfigMigration", + "description": "Test migration", + "transformations": [ + { + "type": "ConfigRemapField", + "map": {"CRSC": "Coruscant"}, + "field_path": ["planet"], + } + ], + } + ], + }, + }, + } + + ManifestDeclarativeSource( + source_config=manifest, + config=input_config, + config_path="/fake/config/path", + component_factory=ModelToComponentFactory( + message_repository=migration_mocks["message_repository"], + ), + ) + + migration_mocks["message_repository"].emit_message.assert_called_once() + migration_mocks["open"].assert_called_once_with("/fake/config/path", "w") + migration_mocks["json_dump"].assert_called_once() + migration_mocks["print"].assert_called() + migration_mocks["serializer_dump"].assert_called() + migration_mocks["orjson_dumps"].assert_called() + migration_mocks["decoded_bytes"].decode.assert_called() + + +def test_given_already_migrated_config_no_control_message_is_emitted(migration_mocks) -> None: + input_config = {"planet": "Coruscant"} + + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + "config_normalization_rules": { + "config_migrations": [ + { + "type": "ConfigMigration", + "description": "Test migration", + "transformations": [ + { + "type": "ConfigRemapField", + "map": {"CRSC": "Coruscant"}, + "field_path": ["planet"], + } + ], + } + ], + }, + }, + } + + ManifestDeclarativeSource( + source_config=manifest, + config=input_config, + config_path="/fake/config/path", + component_factory=ModelToComponentFactory( + message_repository=migration_mocks["message_repository"], + ), + ) + + migration_mocks["message_repository"].emit_message.assert_not_called() + migration_mocks["open"].assert_not_called() + migration_mocks["json_dump"].assert_not_called() + migration_mocks["print"].assert_not_called() + migration_mocks["serializer_dump"].assert_not_called() + migration_mocks["orjson_dumps"].assert_not_called() + migration_mocks["decoded_bytes"].decode.assert_not_called() + + +def test_given_transformations_config_is_transformed(): + input_config = {"planet": "Coruscant"} + + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + "config_normalization_rules": { + "transformations": [ + { + "type": "ConfigRemapField", + "map": {"CRSC": "Coruscant"}, + "field_path": ["planet"], + } + ], + }, + }, + } + + source = ManifestDeclarativeSource( + source_config=manifest, + config=input_config, + ) + + assert source._config == {"planet": "Coruscant"} + + +def test_given_valid_config_streams_validates_config_and_does_not_raise(): + input_config = {"schema_to_validate": {"planet": "Coruscant"}} + + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + "parameters": {}, + "config_normalization_rules": { + "validations": [ + { + "type": "DpathValidator", + "field_path": ["schema_to_validate"], + "validation_strategy": { + "type": "ValidateAdheresToSchema", + "base_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Test Spec", + "type": "object", + "properties": {"planet": {"type": "string"}}, + "required": ["planet"], + "additionalProperties": False, + }, + }, + } + ], + }, + }, + } + + source = ManifestDeclarativeSource( + source_config=manifest, + ) + + source.streams(input_config) + + +def test_given_invalid_config_streams_validates_config_and_raises(): + input_config = {"schema_to_validate": {"will_fail": "Coruscant"}} + + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + "parameters": {}, + "config_normalization_rules": { + "validations": [ + { + "type": "DpathValidator", + "field_path": ["schema_to_validate"], + "validation_strategy": { + "type": "ValidateAdheresToSchema", + "base_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Test Spec", + "type": "object", + "properties": {"planet": {"type": "string"}}, + "required": ["planet"], + "additionalProperties": False, + }, + }, + } + ], + }, + }, + } + source = ManifestDeclarativeSource( + source_config=manifest, + ) + + with pytest.raises(ValueError): + source.streams(input_config)