Skip to content

Commit 0a9631b

Browse files
lazebnyioctavia-squidington-iii
andauthored
feat(low-code cdk): add overwrite flag to dpath flatten (#410)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 9ad9e09 commit 0a9631b

File tree

6 files changed

+61
-8
lines changed

6 files changed

+61
-8
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2265,6 +2265,10 @@ definitions:
22652265
title: Delete Origin Value
22662266
description: Whether to delete the origin value or keep it. Default is False.
22672267
type: boolean
2268+
replace_record:
2269+
title: Replace Origin Record
2270+
description: Whether to replace the origin record or not. Default is False.
2271+
type: boolean
22682272
$parameters:
22692273
type: object
22702274
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel):
877877
description="Whether to delete the origin value or keep it. Default is False.",
878878
title="Delete Origin Value",
879879
)
880+
replace_record: Optional[bool] = Field(
881+
None,
882+
description="Whether to replace the origin record or not. Default is False.",
883+
title="Replace Origin Record",
884+
)
880885
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
881886

882887

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,10 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
712712
)
713713
for added_field_definition_model in model.fields
714714
]
715-
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})
715+
return AddFields(
716+
fields=added_field_definitions,
717+
parameters=model.parameters or {},
718+
)
716719

717720
def create_keys_to_lower_transformation(
718721
self, model: KeysToLowerModel, config: Config, **kwargs: Any
@@ -748,6 +751,7 @@ def create_dpath_flatten_fields(
748751
delete_origin_value=model.delete_origin_value
749752
if model.delete_origin_value is not None
750753
else False,
754+
replace_record=model.replace_record if model.replace_record is not None else False,
751755
parameters=model.parameters or {},
752756
)
753757

airbyte_cdk/sources/declarative/transformations/add_fields.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

55
from dataclasses import InitVar, dataclass, field

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ class DpathFlattenFields(RecordTransformation):
1515
1616
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1717
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
18+
replace_record: bool = False whether to replace origin record or not. Default is False.
1819
1920
"""
2021

2122
config: Config
2223
field_path: List[Union[InterpolatedString, str]]
2324
parameters: InitVar[Mapping[str, Any]]
2425
delete_origin_value: bool = False
26+
replace_record: bool = False
2527

2628
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2729
self._field_path = [
@@ -48,8 +50,12 @@ def transform(
4850
extracted = dpath.get(record, path, default=[])
4951

5052
if isinstance(extracted, dict):
51-
conflicts = set(extracted.keys()) & set(record.keys())
52-
if not conflicts:
53-
if self.delete_origin_value:
54-
dpath.delete(record, path)
53+
if self.replace_record and extracted:
54+
dpath.delete(record, "**")
5555
record.update(extracted)
56+
else:
57+
conflicts = set(extracted.keys()) & set(record.keys())
58+
if not conflicts:
59+
if self.delete_origin_value:
60+
dpath.delete(record, path)
61+
record.update(extracted)

unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
_ANY_VALUE = -1
66
_DELETE_ORIGIN_VALUE = True
7+
_REPLACE_WITH_VALUE = True
78
_DO_NOT_DELETE_ORIGIN_VALUE = False
9+
_DO_NOT_REPLACE_WITH_VALUE = False
810

911

1012
@pytest.mark.parametrize(
@@ -13,6 +15,7 @@
1315
"config",
1416
"field_path",
1517
"delete_origin_value",
18+
"replace_record",
1619
"expected_record",
1720
],
1821
[
@@ -21,6 +24,7 @@
2124
{},
2225
["field2"],
2326
_DO_NOT_DELETE_ORIGIN_VALUE,
27+
_DO_NOT_REPLACE_WITH_VALUE,
2428
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
2529
id="flatten by dpath, don't delete origin value",
2630
),
@@ -29,6 +33,7 @@
2933
{},
3034
["field2"],
3135
_DELETE_ORIGIN_VALUE,
36+
_DO_NOT_REPLACE_WITH_VALUE,
3237
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
3338
id="flatten by dpath, delete origin value",
3439
),
@@ -40,6 +45,7 @@
4045
{},
4146
["field2", "*", "field4"],
4247
_DO_NOT_DELETE_ORIGIN_VALUE,
48+
_DO_NOT_REPLACE_WITH_VALUE,
4349
{
4450
"field1": _ANY_VALUE,
4551
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
@@ -55,6 +61,7 @@
5561
{},
5662
["field2", "*", "field4"],
5763
_DELETE_ORIGIN_VALUE,
64+
_DO_NOT_REPLACE_WITH_VALUE,
5865
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
5966
id="flatten by dpath with *, delete origin value",
6067
),
@@ -63,6 +70,7 @@
6370
{"field_path": "field2"},
6471
["{{ config['field_path'] }}"],
6572
_DO_NOT_DELETE_ORIGIN_VALUE,
73+
_DO_NOT_REPLACE_WITH_VALUE,
6674
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
6775
id="flatten by dpath from config, don't delete origin value",
6876
),
@@ -71,6 +79,7 @@
7179
{},
7280
["non-existing-field"],
7381
_DO_NOT_DELETE_ORIGIN_VALUE,
82+
_DO_NOT_REPLACE_WITH_VALUE,
7483
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
7584
id="flatten by non-existing dpath, don't delete origin value",
7685
),
@@ -79,6 +88,7 @@
7988
{},
8089
["*", "non-existing-field"],
8190
_DO_NOT_DELETE_ORIGIN_VALUE,
91+
_DO_NOT_REPLACE_WITH_VALUE,
8292
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
8393
id="flatten by non-existing dpath with *, don't delete origin value",
8494
),
@@ -87,6 +97,7 @@
8797
{},
8898
["field2"],
8999
_DO_NOT_DELETE_ORIGIN_VALUE,
100+
_DO_NOT_REPLACE_WITH_VALUE,
90101
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
91102
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
92103
),
@@ -95,16 +106,39 @@
95106
{},
96107
["field2"],
97108
_DO_NOT_DELETE_ORIGIN_VALUE,
109+
_DO_NOT_REPLACE_WITH_VALUE,
98110
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
99111
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
100112
),
113+
pytest.param(
114+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
115+
{},
116+
["field2"],
117+
_DO_NOT_DELETE_ORIGIN_VALUE,
118+
_REPLACE_WITH_VALUE,
119+
{"field3": _ANY_VALUE},
120+
id="flatten by dpath, replace with value",
121+
),
122+
pytest.param(
123+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
124+
{},
125+
["field2"],
126+
_DELETE_ORIGIN_VALUE,
127+
_REPLACE_WITH_VALUE,
128+
{"field3": _ANY_VALUE},
129+
id="flatten by dpath, delete_origin_value do not affect to replace_record",
130+
),
101131
],
102132
)
103133
def test_dpath_flatten_lists(
104-
input_record, config, field_path, delete_origin_value, expected_record
134+
input_record, config, field_path, delete_origin_value, replace_record, expected_record
105135
):
106136
flattener = DpathFlattenFields(
107-
field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value
137+
field_path=field_path,
138+
parameters={},
139+
config=config,
140+
delete_origin_value=delete_origin_value,
141+
replace_record=replace_record,
108142
)
109143
flattener.transform(input_record)
110144
assert input_record == expected_record

0 commit comments

Comments
 (0)