Skip to content

Commit 3344441

Browse files
darynaishchenko and octavia-squidington-iii authored
feat(low-code): added keys replace transformation (airbytehq#183)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 3ee710d commit 3344441

File tree

5 files changed

+251
-0
lines changed

5 files changed

+251
-0
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,7 @@ definitions:
12411241
- "$ref": "#/definitions/KeysToLower"
12421242
- "$ref": "#/definitions/KeysToSnakeCase"
12431243
- "$ref": "#/definitions/FlattenFields"
1244+
- "$ref": "#/definitions/KeysReplace"
12441245
state_migrations:
12451246
title: State Migrations
12461247
description: Array of state migrations to be applied on the input state
@@ -1785,6 +1786,7 @@ definitions:
17851786
- "$ref": "#/definitions/KeysToLower"
17861787
- "$ref": "#/definitions/KeysToSnakeCase"
17871788
- "$ref": "#/definitions/FlattenFields"
1789+
- "$ref": "#/definitions/KeysReplace"
17881790
schema_type_identifier:
17891791
"$ref": "#/definitions/SchemaTypeIdentifier"
17901792
$parameters:
@@ -1883,6 +1885,49 @@ definitions:
18831885
$parameters:
18841886
type: object
18851887
additionalProperties: true
1888+
KeysReplace:
1889+
title: Keys Replace
1890+
description: A transformation that replaces every occurrence of a substring in record keys, including keys of nested objects.
1891+
type: object
1892+
required:
1893+
- type
1894+
- old
1895+
- new
1896+
properties:
1897+
type:
1898+
type: string
1899+
enum: [KeysReplace]
1900+
old:
1901+
type: string
1902+
title: Old value
1903+
description: Old value to replace.
1904+
examples:
1905+
- " "
1906+
- "{{ record.id }}"
1907+
- "{{ config['id'] }}"
1908+
- "{{ stream_slice['id'] }}"
1909+
interpolation_context:
1910+
- config
1911+
- record
1912+
- stream_state
1913+
- stream_slice
1914+
new:
1915+
type: string
1916+
title: New value
1917+
description: New value to set.
1918+
examples:
1919+
- "_"
1920+
- "{{ record.id }}"
1921+
- "{{ config['id'] }}"
1922+
- "{{ stream_slice['id'] }}"
1923+
interpolation_context:
1924+
- config
1925+
- record
1926+
- stream_state
1927+
- stream_slice
1928+
$parameters:
1929+
type: object
1930+
additionalProperties: true
18861931
IterableDecoder:
18871932
title: Iterable Decoder
18881933
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,23 @@ class KeysToSnakeCase(BaseModel):
721721
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
722722

723723

724+
class KeysReplace(BaseModel):
    # Declarative model for the `KeysReplace` record transformation.
    # `old` and `new` are both required strings; their examples show that
    # they may be literals or `{{ ... }}` interpolation expressions.
    type: Literal["KeysReplace"]
    old: str = Field(
        ...,
        title="Old value",
        description="Old value to replace.",
        examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
    )
    new: str = Field(
        ...,
        title="New value",
        description="New value to set.",
        examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
    )
    # Optional free-form mapping supplied under the `$parameters` key.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
739+
740+
724741
class FlattenFields(BaseModel):
725742
type: Literal["FlattenFields"]
726743
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
@@ -1701,6 +1718,7 @@ class Config:
17011718
KeysToLower,
17021719
KeysToSnakeCase,
17031720
FlattenFields,
1721+
KeysReplace,
17041722
]
17051723
]
17061724
] = Field(
@@ -1875,6 +1893,7 @@ class DynamicSchemaLoader(BaseModel):
18751893
KeysToLower,
18761894
KeysToSnakeCase,
18771895
FlattenFields,
1896+
KeysReplace,
18781897
]
18791898
]
18801899
] = Field(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@
254254
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
255255
JwtPayload as JwtPayloadModel,
256256
)
257+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
258+
KeysReplace as KeysReplaceModel,
259+
)
257260
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
258261
KeysToLower as KeysToLowerModel,
259262
)
@@ -417,6 +420,9 @@
417420
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
418421
FlattenFields,
419422
)
423+
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
424+
KeysReplaceTransformation,
425+
)
420426
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
421427
KeysToLowerTransformation,
422428
)
@@ -509,6 +515,7 @@ def _init_mappings(self) -> None:
509515
GzipParserModel: self.create_gzip_parser,
510516
KeysToLowerModel: self.create_keys_to_lower_transformation,
511517
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
518+
KeysReplaceModel: self.create_keys_replace_transformation,
512519
FlattenFieldsModel: self.create_flatten_fields,
513520
IterableDecoderModel: self.create_iterable_decoder,
514521
XmlDecoderModel: self.create_xml_decoder,
@@ -630,6 +637,13 @@ def create_keys_to_snake_transformation(
630637
) -> KeysToSnakeCaseTransformation:
631638
return KeysToSnakeCaseTransformation()
632639

640+
def create_keys_replace_transformation(
    self, model: KeysReplaceModel, config: Config, **kwargs: Any
) -> KeysReplaceTransformation:
    """
    Build a KeysReplaceTransformation from its declarative model.

    The model's ``old`` and ``new`` values are forwarded unchanged. When the
    model carries no ``$parameters``, an empty dict is passed instead.
    ``config`` and ``kwargs`` are accepted but not used here.
    """
    component_parameters = model.parameters or {}
    return KeysReplaceTransformation(
        old=model.old,
        new=model.new,
        parameters=component_parameters,
    )
646+
633647
def create_flatten_fields(
634648
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
635649
) -> FlattenFields:
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import InitVar, dataclass
6+
from typing import Any, Dict, Mapping, Optional
7+
8+
from airbyte_cdk import InterpolatedString
9+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
10+
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
11+
12+
13+
@dataclass
class KeysReplaceTransformation(RecordTransformation):
    """
    Transformation that replaces a substring in the keys of a record.

    ``old`` and ``new`` are interpolated strings. They are evaluated on every
    call to ``transform`` with ``config``, ``record``, ``stream_state`` and
    ``stream_slice`` available, and every occurrence of the evaluated ``old``
    value in a key is replaced by the evaluated ``new`` value.

    Nested dictionaries are processed recursively. Dictionaries stored inside
    lists are not traversed. If two keys become identical after replacement,
    the one encountered later overwrites the earlier one.

    Example usage:
    - type: KeysReplace
      old: " "
      new: "_"
    Result:
    from: {"created time": ..., "customer id": ..., "user id": ...}
    to: {"created_time": ..., "customer_id": ..., "user_id": ...}
    """

    old: str
    new: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap both values once so that they can be evaluated per record.
        self._old = InterpolatedString.create(self.old, parameters=parameters)
        self._new = InterpolatedString.create(self.new, parameters=parameters)

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Rename the keys of ``record`` in place.

        :param record: the record to update; it is mutated, nothing is returned
        :param config: connector configuration used for interpolation (defaults to ``{}``)
        :param stream_state: stream state exposed to interpolation
        :param stream_slice: stream slice exposed to interpolation
        """
        if config is None:
            config = {}

        kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
        old_key = str(self._old.eval(config, **kwargs))
        new_key = str(self._new.eval(config, **kwargs))

        if not old_key:
            # str.replace("", new) would insert `new` before every character
            # and at the end of each key, corrupting all of them. An empty
            # search string is therefore treated as "nothing to replace".
            return

        def _transform(data: Dict[str, Any]) -> Dict[str, Any]:
            # Rebuild the mapping with renamed keys, descending into nested dicts only.
            return {
                key.replace(old_key, new_key): (
                    _transform(value) if isinstance(value, dict) else value
                )
                for key, value in data.items()
            }

        transformed_record = _transform(record)
        # Callers hold a reference to `record`, so replace its content instead
        # of rebinding the name.
        record.clear()
        record.update(transformed_record)
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
import pytest
5+
6+
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
7+
KeysReplaceTransformation,
8+
)
9+
10+
_ANY_VALUE = -1
11+
12+
13+
@pytest.mark.parametrize(
    (
        "input_record",
        "config",
        "stream_state",
        "stream_slice",
        "keys_replace_config",
        "expected_record",
    ),
    [
        # Literal replacement on a flat record.
        pytest.param(
            {"date time": _ANY_VALUE, "customer id": _ANY_VALUE},
            {},
            {},
            {},
            {"old": " ", "new": "_"},
            {"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE},
            id="simple keys replace config",
        ),
        # Both `old` and `new` are interpolated from the record itself.
        pytest.param(
            {
                "customer_id": 111111,
                "customer_name": "MainCustomer",
                "field_1_111111": _ANY_VALUE,
                "field_2_111111": _ANY_VALUE,
            },
            {},
            {},
            {},
            {"old": '{{ record["customer_id"] }}', "new": '{{ record["customer_name"] }}'},
            {
                "customer_id": 111111,
                "customer_name": "MainCustomer",
                "field_1_MainCustomer": _ANY_VALUE,
                "field_2_MainCustomer": _ANY_VALUE,
            },
            id="keys replace config uses values from record",
        ),
        # `new` is interpolated from the stream slice.
        pytest.param(
            {"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
            {},
            {},
            {"customer_name": "MainCustomer"},
            {"old": '{{ record["customer_id"] }}', "new": '{{ stream_slice["customer_name"] }}'},
            {
                "customer_id": 111111,
                "field_1_MainCustomer": _ANY_VALUE,
                "field_2_MainCustomer": _ANY_VALUE,
            },
            id="keys replace config uses values from slice",
        ),
        # `new` is interpolated from the config.
        pytest.param(
            {"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
            {"customer_name": "MainCustomer"},
            {},
            {},
            {"old": '{{ record["customer_id"] }}', "new": '{{ config["customer_name"] }}'},
            {
                "customer_id": 111111,
                "field_1_MainCustomer": _ANY_VALUE,
                "field_2_MainCustomer": _ANY_VALUE,
            },
            id="keys replace config uses values from config",
        ),
        # Keys of nested dictionaries are renamed as well.
        pytest.param(
            {
                "date time": _ANY_VALUE,
                "user id": _ANY_VALUE,
                "customer": {
                    "customer name": _ANY_VALUE,
                    "customer id": _ANY_VALUE,
                    "contact info": {"email": _ANY_VALUE, "phone number": _ANY_VALUE},
                },
            },
            {},
            {},
            {},
            {"old": " ", "new": "_"},
            {
                "customer": {
                    "contact_info": {"email": _ANY_VALUE, "phone_number": _ANY_VALUE},
                    "customer_id": _ANY_VALUE,
                    "customer_name": _ANY_VALUE,
                },
                "date_time": _ANY_VALUE,
                "user_id": _ANY_VALUE,
            },
            id="simple keys replace config with nested fields in record",
        ),
    ],
)
def test_transform(
    input_record, config, stream_state, stream_slice, keys_replace_config, expected_record
):
    """Check that `transform` renames the keys of `input_record` in place."""
    transformation = KeysReplaceTransformation(
        old=keys_replace_config["old"],
        new=keys_replace_config["new"],
        parameters={},
    )

    transformation.transform(
        record=input_record,
        config=config,
        stream_state=stream_state,
        stream_slice=stream_slice,
    )

    # The transformation returns nothing; the input dict itself must be updated.
    assert input_record == expected_record

0 commit comments

Comments
 (0)