Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
6694cd1
implement validators and transformer
pnilan May 6, 2025
4dbd363
create config transformations
pnilan May 6, 2025
03e776c
remove unnecessary validation strategies
pnilan May 7, 2025
41f376b
chore: format code
pnilan May 7, 2025
454cb78
add tests for dpath validator
pnilan May 7, 2025
54f9f9f
add predicate validator tests
pnilan May 7, 2025
7881f9f
add tests for RemapField
pnilan May 8, 2025
f8c252b
create tests for ValidateAdheresToSchema
pnilan May 8, 2025
c492b81
chore: type check
pnilan May 8, 2025
48e5ab0
chore: lint
pnilan May 8, 2025
3623325
add test for json strings
pnilan May 8, 2025
dad6100
fix errant inclusion
pnilan May 8, 2025
565b709
add json string parsing to ValidateAdheresToSchema
pnilan May 8, 2025
01415e2
chore: lint
pnilan May 8, 2025
7149962
Merge branch 'main' into pnilan/feat/implement-validators
pnilan May 8, 2025
953ad41
Merge branch 'main' into pnilan/feat/implement-validators
pnilan May 8, 2025
53a5724
fix anyio issue
pnilan May 8, 2025
d3287a1
add declarative component schema
pnilan May 8, 2025
f72efc0
fix assertions
pnilan May 8, 2025
7927a14
remove re-raise
pnilan May 8, 2025
cf1b01c
update tests and error handling for dpath validator
pnilan May 9, 2025
4727b28
fix predicate validator test
pnilan May 9, 2025
a29e424
Merge branch 'pnilan/feat/implement-validators' into pnilan/feat/exte…
pnilan May 12, 2025
5235e6c
update typo
pnilan May 12, 2025
4bdc3a7
initialize empty arrays
pnilan May 12, 2025
45b47f4
initialize empty list via field
pnilan May 12, 2025
fa04b9d
update create_spec to build migration/transform/validation components
pnilan May 12, 2025
036c2e5
chore: type-check, lint, format
pnilan May 12, 2025
711384c
implement config transformations: AddFields and RemoveFields`
pnilan May 13, 2025
70785b8
Merge branch 'pnilan/feat/implement-validators' into pnilan/feat/exte…
pnilan May 13, 2025
fb7d1e9
fix module and classname conflicts
pnilan May 13, 2025
c64e588
chore: lint
pnilan May 13, 2025
01cf5a6
update remap to handle interpolated keys/values
pnilan May 13, 2025
a2dc105
chore: format
pnilan May 13, 2025
18154a1
Merge branch 'pnilan/feat/implement-validators' into pnilan/feat/exte…
pnilan May 13, 2025
4e6ed3b
update component schema for new transformations
pnilan May 14, 2025
833d9e7
update transformations per comments
pnilan May 14, 2025
c251f42
Merge branch 'pnilan/feat/implement-validators' into pnilan/feat/exte…
pnilan May 14, 2025
ff10aa3
add ConfigMigration class and new spec tests
pnilan May 15, 2025
e36067a
Merge branch 'main' into pnilan/feat/extend-spec-class-for-config-mig…
pnilan May 15, 2025
12043e5
chore: format
pnilan May 15, 2025
9cc4d11
remove errant dependencies
pnilan May 15, 2025
1d908a4
fix errors
pnilan May 15, 2025
5f3f0c1
revert erroneous AI deletion
pnilan May 15, 2025
ded5fcb
revert erroneous AI deletion
pnilan May 15, 2025
3638129
Merge branch 'main' into pnilan/feat/extend-spec-class-for-config-mig…
pnilan May 19, 2025
3983495
update per comments
pnilan May 19, 2025
2b684e7
remove dagger-io
pnilan May 19, 2025
b116bc9
Merge branch 'pnilan/feat/extend-spec-class-for-config-migrations' in…
pnilan May 19, 2025
07c96e0
fix test
pnilan May 19, 2025
5a6e5fc
Merge branch 'pnilan/feat/extend-spec-class-for-config-migrations' in…
pnilan May 19, 2025
2f6bb13
add config migration/transformation/validation execution
pnilan May 20, 2025
8193331
Merge branch 'main' into pnilan/feat/update-source-to-invoke-config-m…
pnilan May 20, 2025
9530e66
fix merge error
pnilan May 20, 2025
c2fdffa
update docstring
pnilan May 20, 2025
1fe042e
update config path
pnilan May 20, 2025
4c19c87
update config migration/transformation/validation flow
pnilan May 21, 2025
c7f090e
remove unused imports from spec.py
pnilan May 21, 2025
bdcacbb
fix migration tests
pnilan May 21, 2025
b945a3c
add manifest declarative source validation test
pnilan May 21, 2025
7646d22
add transformation test
pnilan May 21, 2025
0b02e76
fix errant imports
pnilan May 21, 2025
a07aa39
add config_path param
pnilan May 21, 2025
f8199e1
update per comments
pnilan May 22, 2025
4e2e8fc
revert to include spec/spec_component check
pnilan May 22, 2025
5ca4561
encapsulate migrate and transform and return Config
pnilan May 22, 2025
273a886
update typing
pnilan May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
catalog: ConfiguredAirbyteCatalog | None,
config: MutableMapping[str, Any] | None,
state: TState,
config_path: str | None = None,
**kwargs: Any,
) -> None:
"""
Expand All @@ -76,6 +77,7 @@ def __init__(
config=config,
state=state, # type: ignore [arg-type]
path_to_yaml="manifest.yaml",
config_path=config_path,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(
debug: bool = False,
emit_connector_builder_messages: bool = False,
component_factory: Optional[ModelToComponentFactory] = None,
config_path: Optional[str] = None,
**kwargs: Any,
) -> None:
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
Expand All @@ -96,6 +97,7 @@ def __init__(
debug=debug,
emit_connector_builder_messages=emit_connector_builder_messages,
component_factory=component_factory,
config_path=config_path,
)

concurrency_level_from_manifest = self._source_config.get("concurrency_level")
Expand Down
57 changes: 45 additions & 12 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
Expand All @@ -8,13 +8,15 @@
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set

import orjson
import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
LogMessage as ConnectorBuilderLogMessage,
)
Expand All @@ -29,6 +31,7 @@
ConnectorSpecification,
FailureType,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand Down Expand Up @@ -57,9 +60,10 @@
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import ConnectionDefinition
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
AlwaysLogSliceLogger,
DebugSliceLogger,
Expand Down Expand Up @@ -99,6 +103,7 @@ def __init__(
component_factory: Optional[ModelToComponentFactory] = None,
migrate_manifest: Optional[bool] = False,
normalize_manifest: Optional[bool] = False,
config_path: Optional[str] = None,
) -> None:
"""
Args:
Expand All @@ -108,6 +113,7 @@ def __init__(
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
normalize_manifest: Optional flag to indicate if the manifest should be normalized.
config_path: Optional path to the config file.
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._should_normalize = normalize_manifest
Expand All @@ -130,7 +136,6 @@ def __init__(
self._slice_logger: SliceLogger = (
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
)
self._config = config or {}

# resolve all components in the manifest
self._source_config = self._pre_process_manifest(dict(source_config))
Expand All @@ -139,6 +144,12 @@ def __init__(
# apply additional post-processing to the manifest
self._post_process_manifest()

spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
self._spec_component: Optional[Spec] = (
self._constructor.create_component(SpecModel, spec, dict()) if spec else None
)
self._config = self._migrate_and_transform_config(config_path, config) or {}

@property
def resolved_manifest(self) -> Mapping[str, Any]:
"""
Expand Down Expand Up @@ -199,6 +210,30 @@ def _normalize_manifest(self) -> None:
normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
self._source_config = normalizer.normalize()

def _migrate_and_transform_config(
    self,
    config_path: Optional[str],
    config: Optional[Config],
) -> Optional[Config]:
    """
    Run the spec's config migrations and transformations and return the resulting config.

    If the migrations changed the config, the migrated config is written to `config_path`
    (when one was provided), a connector config control message is emitted, and the queued
    messages are printed to stdout. Transformations are applied afterwards and are not persisted.

    Args:
        config_path: Optional path of the config file to overwrite with the migrated config.
        config: The user-provided config.

    Returns:
        None when `config` is None or empty, `config` itself when the manifest declares no
        spec component, otherwise a migrated and transformed copy of `config`.
    """
    if not config:
        return None
    if not self._spec_component:
        return config
    # Deep copy so that migrations touching nested values cannot also mutate the caller's
    # config. With a shallow copy both mappings share the nested objects, the comparison
    # below stays equal, and the migrated config is silently never persisted or announced.
    mutable_config = deepcopy(dict(config))
    self._spec_component.migrate_config(mutable_config)
    if mutable_config != config:
        if config_path:
            with open(config_path, "w") as f:
                json.dump(mutable_config, f)
        self.message_repository.emit_message(
            create_connector_config_control_message(mutable_config)
        )
        # We have no mechanism for consuming the queue, so we print the messages to stdout
        for message in self.message_repository.consume_queue():
            print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
    self._spec_component.transform_config(mutable_config)
    return mutable_config

def _migrate_manifest(self) -> None:
"""
This method is used to migrate the manifest. It should be called after the manifest has been validated.
Expand Down Expand Up @@ -255,6 +290,9 @@ def connection_checker(self) -> ConnectionChecker:
)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
if self._spec_component:
self._spec_component.validate_config(config)

self._emit_manifest_debug_message(
extra_args={
"source_name": self.name,
Expand Down Expand Up @@ -355,14 +393,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
}
)

spec = self._source_config.get("spec")
if spec:
if "type" not in spec:
spec["type"] = "Spec"
spec_component = self._constructor.create_component(SpecModel, spec, dict())
return spec_component.generate_spec()
else:
return super().spec(logger)
return (
self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
self._configure_logger_level(logger)
Expand Down
37 changes: 5 additions & 32 deletions airbyte_cdk/sources/declarative/spec/spec.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, MutableMapping, Optional

import orjson

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import (
AdvancedAuth,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources.declarative.models.declarative_component_schema import AuthFlow
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
ConfigTransformation,
)
from airbyte_cdk.sources.declarative.validators.validator import Validator
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.source import Source


@dataclass
Expand All @@ -48,7 +41,6 @@ class Spec:
config_migrations: List[ConfigMigration] = field(default_factory=list)
config_transformations: List[ConfigTransformation] = field(default_factory=list)
config_validations: List[Validator] = field(default_factory=list)
message_repository: MessageRepository = InMemoryMessageRepository()

def generate_spec(self) -> ConnectorSpecification:
"""
Expand All @@ -69,34 +61,15 @@ def generate_spec(self) -> ConnectorSpecification:
# We remap these keys to camel case because that's the existing format expected by the rest of the platform
return ConnectorSpecificationSerializer.load(obj)

def migrate_config(
self, args: List[str], source: Source, config: MutableMapping[str, Any]
) -> None:
def migrate_config(self, config: MutableMapping[str, Any]) -> None:
"""
Apply all specified config transformations to the provided config and save the modified config to the given path and emit a control message.
Apply the transformations of every configured config migration to the provided config, mutating it in place.

:param args: Command line arguments
:param source: Source instance
:param config: The user-provided config to migrate
"""
config_path = AirbyteEntrypoint(source).extract_config(args)

if not config_path:
return

mutable_config = dict(config)
for migration in self.config_migrations:
for transformation in migration.transformations:
transformation.transform(mutable_config)

if mutable_config != config:
with open(config_path, "w") as f:
json.dump(mutable_config, f)
self.message_repository.emit_message(
create_connector_config_control_message(mutable_config)
)
for message in self.message_repository.consume_queue():
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
transformation.transform(config)

def transform_config(self, config: MutableMapping[str, Any]) -> None:
"""
Expand All @@ -107,7 +80,7 @@ def transform_config(self, config: MutableMapping[str, Any]) -> None:
for transformation in self.config_transformations:
transformation.transform(config)

def validate_config(self, config: MutableMapping[str, Any]) -> None:
def validate_config(self, config: Mapping[str, Any]) -> None:
"""
Apply all config validations to the provided config.

Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/yaml_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
catalog: Optional[ConfiguredAirbyteCatalog] = None,
config: Optional[Mapping[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
config_path: Optional[str] = None,
) -> None:
"""
:param path_to_yaml: Path to the yaml file describing the source
Expand All @@ -36,6 +37,7 @@ def __init__(
config=config or {},
state=state or [],
source_config=source_config,
config_path=config_path,
)

def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
Expand Down
104 changes: 0 additions & 104 deletions unit_tests/sources/declarative/spec/test_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from unittest.mock import Mock, mock_open

import pytest

from airbyte_cdk.models import (
Expand Down Expand Up @@ -161,108 +159,6 @@ def test_spec(spec, expected_connection_specification) -> None:
assert spec.generate_spec() == expected_connection_specification


@pytest.fixture
def migration_mocks(monkeypatch):
    """
    Patch the entrypoint, file, JSON, print and serializer hooks used by config migration
    and return the mocks, keyed by name, so tests can assert on them.
    """
    entrypoint = Mock(**{"extract_config.return_value": "/fake/config/path"})

    def _entrypoint_factory(_):
        # Stands in for the AirbyteEntrypoint constructor; always yields the same mock.
        return entrypoint

    decoded = Mock(**{"decode.return_value": "decoded_message"})
    mocks = {
        "message_repository": Mock(**{"consume_queue.return_value": [Mock()]}),
        "source": Mock(),
        "open": mock_open(),
        "json_dump": Mock(),
        "print": Mock(),
        "serializer_dump": Mock(),
        "orjson_dumps": Mock(return_value=decoded),
        "decoded_bytes": decoded,
    }

    # (dotted target, replacement) pairs, applied in order.
    patches = (
        ("airbyte_cdk.sources.declarative.spec.spec.AirbyteEntrypoint", _entrypoint_factory),
        ("builtins.open", mocks["open"]),
        ("json.dump", mocks["json_dump"]),
        ("builtins.print", mocks["print"]),
        (
            "airbyte_cdk.models.airbyte_protocol_serializers.AirbyteMessageSerializer.dump",
            mocks["serializer_dump"],
        ),
        ("airbyte_cdk.sources.declarative.spec.spec.orjson.dumps", mocks["orjson_dumps"]),
    )
    for target, replacement in patches:
        monkeypatch.setattr(target, replacement)

    return mocks


def test_given_unmigrated_config_when_migrating_then_config_is_migrated(migration_mocks) -> None:
    """A config still holding the legacy value is migrated, persisted and announced."""
    input_config = {"planet": "CRSC"}
    remap = ConfigRemapField(
        map={"CRSC": "Coruscant"}, field_path=["planet"], config=input_config
    )
    migration = ConfigMigration(description="Test migration", transformations=[remap])
    spec = component_spec(
        connection_specification={},
        parameters={},
        config_migrations=[migration],
    )
    spec.message_repository = migration_mocks["message_repository"]

    spec.migrate_config(["spec"], migration_mocks["source"], input_config)

    migration_mocks["message_repository"].emit_message.assert_called_once()
    migration_mocks["open"].assert_called_once_with("/fake/config/path", "w")
    migration_mocks["json_dump"].assert_called_once()
    # The control message must have been serialized and printed.
    for name in ("print", "serializer_dump", "orjson_dumps"):
        migration_mocks[name].assert_called()
    migration_mocks["decoded_bytes"].decode.assert_called()


def test_given_already_migrated_config_no_control_message_is_emitted(migration_mocks) -> None:
    """A config whose value is not a key of the remap table triggers no write and no message."""
    input_config = {"planet": "Coruscant"}
    remap = ConfigRemapField(
        map={"CRSC": "Coruscant"}, field_path=["planet"], config=input_config
    )
    migration = ConfigMigration(description="Test migration", transformations=[remap])
    spec = component_spec(
        connection_specification={},
        parameters={},
        config_migrations=[migration],
    )
    spec.message_repository = migration_mocks["message_repository"]

    spec.migrate_config(["spec"], migration_mocks["source"], input_config)

    migration_mocks["message_repository"].emit_message.assert_not_called()
    # Nothing may be written, serialized or printed when the config is unchanged.
    for name in ("open", "json_dump", "print", "serializer_dump", "orjson_dumps"):
        migration_mocks[name].assert_not_called()
    migration_mocks["decoded_bytes"].decode.assert_not_called()


def test_given_list_of_transformations_when_transform_config_then_config_is_transformed() -> None:
input_config = {"planet_code": "CRSC"}
expected_config = {
Expand Down
Loading
Loading