Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3852,6 +3852,7 @@ definitions:
- "$ref": "#/definitions/ConfigRemapField"
- "$ref": "#/definitions/ConfigAddFields"
- "$ref": "#/definitions/ConfigRemoveFields"
- "$ref": "#/definitions/CustomConfigTransformation"
default: []
validations:
title: Validations
Expand Down Expand Up @@ -3885,6 +3886,7 @@ definitions:
- "$ref": "#/definitions/ConfigRemapField"
- "$ref": "#/definitions/ConfigAddFields"
- "$ref": "#/definitions/ConfigRemoveFields"
- "$ref": "#/definitions/CustomConfigTransformation"
default: []
SubstreamPartitionRouter:
title: Substream Partition Router
Expand Down Expand Up @@ -4556,6 +4558,26 @@ definitions:
- "{{ property is integer }}"
- "{{ property|length > 5 }}"
- "{{ property == 'some_string_to_match' }}"
CustomConfigTransformation:
title: Custom Config Transformation
description: A custom config transformation that can be used to transform the connector configuration.
type: object
required:
- type
- class_name
properties:
type:
type: string
enum: [CustomConfigTransformation]
class_name:
type: string
description: Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_<name>.<package>.<class_name>`.
examples:
- "source_declarative_manifest.components.MyCustomConfigTransformation"
$parameters:
type: object
description: Additional parameters to be passed to the custom config transformation.
additionalProperties: true
interpolation:
variables:
- title: config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomConfigTransformation(BaseModel):
class Config:
extra = Extra.allow

type: Literal["CustomConfigTransformation"]
class_name: str = Field(
...,
description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_<name>.<package>.<class_name>`.",
examples=["source_declarative_manifest.components.MyCustomConfigTransformation"],
title="Class Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomErrorHandler(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -2149,7 +2163,9 @@ class ConfigMigration(BaseModel):
description: Optional[str] = Field(
None, description="The description/purpose of the config migration."
)
transformations: List[Union[ConfigRemapField, ConfigAddFields, ConfigRemoveFields]] = Field(
transformations: List[
Union[ConfigRemapField, ConfigAddFields, ConfigRemoveFields, CustomConfigTransformation]
] = Field(
...,
description="The list of transformations that will attempt to be applied on an incoming unmigrated config. The transformations will be applied in the order they are defined.",
title="Transformations",
Expand All @@ -2166,7 +2182,9 @@ class Config:
title="Config Migrations",
)
transformations: Optional[
List[Union[ConfigRemapField, ConfigAddFields, ConfigRemoveFields]]
List[
Union[ConfigRemapField, ConfigAddFields, ConfigRemoveFields, CustomConfigTransformation]
]
] = Field(
[],
description="The list of transformations that will be applied on the incoming config at the start of each sync. The transformations will be applied in the order they are defined.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomBackoffStrategy as CustomBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomConfigTransformation as CustomConfigTransformationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomDecoder as CustomDecoderModel,
)
Expand Down Expand Up @@ -687,6 +690,7 @@ def _init_mappings(self) -> None:
CustomPartitionRouterModel: self.create_custom_component,
CustomTransformationModel: self.create_custom_component,
CustomValidationStrategyModel: self.create_custom_component,
CustomConfigTransformationModel: self.create_custom_component,
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
DeclarativeStreamModel: self.create_declarative_stream,
DefaultErrorHandlerModel: self.create_default_error_handler,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from typing import Any, Dict, MutableMapping, Optional

from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
ConfigTransformation,
)


class MockCustomConfigTransformation(ConfigTransformation):
"""
A mock custom config transformation for testing purposes.
This simulates what a real custom transformation would look like.
"""

def __init__(self, parameters: Optional[Dict[str, Any]] = None) -> None:
self.parameters = parameters or {}

def transform(self, config: MutableMapping[str, Any]) -> None:
"""
Transform the config by adding a test field.
This simulates the behavior of a real custom transformation.
"""
# Only modify user config keys, avoid framework-injected keys
# Check if there are any user keys (not starting with __)
has_user_keys = any(not key.startswith("__") for key in config.keys())
if has_user_keys:
config["transformed_field"] = "transformed_value"
if self.parameters.get("additional_field"):
config["additional_field"] = self.parameters["additional_field"]


def test_given_valid_config_when_transform_then_config_is_transformed():
"""Test that a custom config transformation properly transforms the config."""
transformation = MockCustomConfigTransformation()
config = {"original_field": "original_value"}

transformation.transform(config)

assert config["original_field"] == "original_value"
assert config["transformed_field"] == "transformed_value"


def test_given_config_with_parameters_when_transform_then_parameters_are_applied():
"""Test that custom config transformation respects parameters."""
parameters = {"additional_field": "parameter_value"}
transformation = MockCustomConfigTransformation(parameters=parameters)
config = {"original_field": "original_value"}

transformation.transform(config)

assert config["original_field"] == "original_value"
assert config["transformed_field"] == "transformed_value"
assert config["additional_field"] == "parameter_value"
Loading