Skip to content

Commit a11038e

Browse files
added key_transformation to DpathFlattenFields
1 parent 3e82436 commit a11038e

File tree

5 files changed

+71
-2
lines changed

5 files changed

+71
-2
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2335,6 +2335,12 @@ definitions:
23352335
title: Replace Origin Record
23362336
description: Whether to replace the origin record or not. Default is False.
23372337
type: boolean
2338+
key_transformation:
2339+
title: Key transformation
2340+
description: Transformation for object keys. If not provided, original key will be used.
2341+
type: string
2342+
examples:
2343+
- "flattened_{{ key }}"
23382344
$parameters:
23392345
type: object
23402346
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,14 @@ class DpathFlattenFields(BaseModel):
897897
description="Whether to replace the origin record or not. Default is False.",
898898
title="Replace Origin Record",
899899
)
900+
key_transformation: Optional[str] = Field(
901+
None,
902+
description="Transformation for object keys. If not provided, original key will be used.",
903+
examples=[
904+
"flattened_{{ key }}",
905+
],
906+
title="Key transformation",
907+
)
900908
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
901909

902910

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ def create_dpath_flatten_fields(
797797
if model.delete_origin_value is not None
798798
else False,
799799
replace_record=model.replace_record if model.replace_record is not None else False,
800+
key_transformation=model.key_transformation,
800801
parameters=model.parameters or {},
801802
)
802803

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class DpathFlattenFields(RecordTransformation):
1616
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1717
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
1818
replace_record: bool = False whether to replace origin record or not. Default is False.
19+
key_transformation: string = None how to transform extracted object keys
1920
2021
"""
2122

@@ -24,6 +25,7 @@ class DpathFlattenFields(RecordTransformation):
2425
parameters: InitVar[Mapping[str, Any]]
2526
delete_origin_value: bool = False
2627
replace_record: bool = False
28+
key_transformation: Union[InterpolatedString, str] = None
2729

2830
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2931
self._field_path = [
@@ -34,6 +36,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3436
self._field_path[path_index] = InterpolatedString.create(
3537
self.field_path[path_index], parameters=parameters
3638
)
39+
self.parameters = parameters
3740

3841
def transform(
3942
self,
@@ -50,6 +53,14 @@ def transform(
5053
extracted = dpath.get(record, path, default=[])
5154

5255
if isinstance(extracted, dict):
56+
57+
if self.key_transformation:
58+
updated_extracted = {}
59+
for key, value in extracted.items():
60+
updated_key = InterpolatedString.create(self.key_transformation, parameters=self.parameters).eval(key=key, config=self.config)
61+
updated_extracted[updated_key] = value
62+
extracted = updated_extracted
63+
5364
if self.replace_record and extracted:
5465
dpath.delete(record, "**")
5566
record.update(extracted)

unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
_REPLACE_WITH_VALUE = True
88
_DO_NOT_DELETE_ORIGIN_VALUE = False
99
_DO_NOT_REPLACE_WITH_VALUE = False
10-
10+
_NO_KEY_TRANSFORMATIONS = None
1111

1212
@pytest.mark.parametrize(
1313
[
@@ -16,6 +16,7 @@
1616
"field_path",
1717
"delete_origin_value",
1818
"replace_record",
19+
"key_transformation",
1920
"expected_record",
2021
],
2122
[
@@ -25,6 +26,7 @@
2526
["field2"],
2627
_DO_NOT_DELETE_ORIGIN_VALUE,
2728
_DO_NOT_REPLACE_WITH_VALUE,
29+
_NO_KEY_TRANSFORMATIONS,
2830
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
2931
id="flatten by dpath, don't delete origin value",
3032
),
@@ -34,6 +36,7 @@
3436
["field2"],
3537
_DELETE_ORIGIN_VALUE,
3638
_DO_NOT_REPLACE_WITH_VALUE,
39+
_NO_KEY_TRANSFORMATIONS,
3740
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
3841
id="flatten by dpath, delete origin value",
3942
),
@@ -46,6 +49,7 @@
4649
["field2", "*", "field4"],
4750
_DO_NOT_DELETE_ORIGIN_VALUE,
4851
_DO_NOT_REPLACE_WITH_VALUE,
52+
_NO_KEY_TRANSFORMATIONS,
4953
{
5054
"field1": _ANY_VALUE,
5155
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
@@ -62,6 +66,7 @@
6266
["field2", "*", "field4"],
6367
_DELETE_ORIGIN_VALUE,
6468
_DO_NOT_REPLACE_WITH_VALUE,
69+
_NO_KEY_TRANSFORMATIONS,
6570
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
6671
id="flatten by dpath with *, delete origin value",
6772
),
@@ -71,6 +76,7 @@
7176
["{{ config['field_path'] }}"],
7277
_DO_NOT_DELETE_ORIGIN_VALUE,
7378
_DO_NOT_REPLACE_WITH_VALUE,
79+
_NO_KEY_TRANSFORMATIONS,
7480
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
7581
id="flatten by dpath from config, don't delete origin value",
7682
),
@@ -80,6 +86,7 @@
8086
["non-existing-field"],
8187
_DO_NOT_DELETE_ORIGIN_VALUE,
8288
_DO_NOT_REPLACE_WITH_VALUE,
89+
_NO_KEY_TRANSFORMATIONS,
8390
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
8491
id="flatten by non-existing dpath, don't delete origin value",
8592
),
@@ -89,6 +96,7 @@
8996
["*", "non-existing-field"],
9097
_DO_NOT_DELETE_ORIGIN_VALUE,
9198
_DO_NOT_REPLACE_WITH_VALUE,
99+
_NO_KEY_TRANSFORMATIONS,
92100
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
93101
id="flatten by non-existing dpath with *, don't delete origin value",
94102
),
@@ -98,6 +106,7 @@
98106
["field2"],
99107
_DO_NOT_DELETE_ORIGIN_VALUE,
100108
_DO_NOT_REPLACE_WITH_VALUE,
109+
_NO_KEY_TRANSFORMATIONS,
101110
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
102111
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
103112
),
@@ -107,6 +116,7 @@
107116
["field2"],
108117
_DO_NOT_DELETE_ORIGIN_VALUE,
109118
_DO_NOT_REPLACE_WITH_VALUE,
119+
_NO_KEY_TRANSFORMATIONS,
110120
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
111121
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
112122
),
@@ -116,6 +126,7 @@
116126
["field2"],
117127
_DO_NOT_DELETE_ORIGIN_VALUE,
118128
_REPLACE_WITH_VALUE,
129+
_NO_KEY_TRANSFORMATIONS,
119130
{"field3": _ANY_VALUE},
120131
id="flatten by dpath, replace with value",
121132
),
@@ -125,20 +136,52 @@
125136
["field2"],
126137
_DELETE_ORIGIN_VALUE,
127138
_REPLACE_WITH_VALUE,
139+
_NO_KEY_TRANSFORMATIONS,
128140
{"field3": _ANY_VALUE},
129141
id="flatten by dpath, delete_origin_value do not affect to replace_record",
130142
),
143+
pytest.param(
144+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
145+
{},
146+
["field2"],
147+
_DO_NOT_DELETE_ORIGIN_VALUE,
148+
_DO_NOT_REPLACE_WITH_VALUE,
149+
"field2_{{ key }}",
150+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field2_field3": _ANY_VALUE},
151+
id="flatten by dpath, not delete origin value, add keys transformation",
152+
),
153+
pytest.param(
154+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
155+
{},
156+
["field2"],
157+
_DELETE_ORIGIN_VALUE,
158+
_DO_NOT_REPLACE_WITH_VALUE,
159+
"field2_{{ key }}",
160+
{"field1": _ANY_VALUE, "field2_field3": _ANY_VALUE},
161+
id="flatten by dpath, delete origin value, add keys transformation",
162+
),
163+
pytest.param(
164+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
165+
{},
166+
["field2"],
167+
_DO_NOT_DELETE_ORIGIN_VALUE,
168+
_REPLACE_WITH_VALUE,
169+
"field2_{{ key }}",
170+
{"field2_field3": _ANY_VALUE},
171+
id="flatten by dpath, not delete origin value, replace record, add keys transformation",
172+
),
131173
],
132174
)
133175
def test_dpath_flatten_lists(
134-
input_record, config, field_path, delete_origin_value, replace_record, expected_record
176+
input_record, config, field_path, delete_origin_value, replace_record, key_transformation, expected_record
135177
):
136178
flattener = DpathFlattenFields(
137179
field_path=field_path,
138180
parameters={},
139181
config=config,
140182
delete_origin_value=delete_origin_value,
141183
replace_record=replace_record,
184+
key_transformation=key_transformation,
142185
)
143186
flattener.transform(input_record)
144187
assert input_record == expected_record

0 commit comments

Comments
 (0)