Skip to content

Commit f222fcc

Browse files
authored
feat(low-code cdk): add flatten fields (#181)
1 parent 4759654 commit f222fcc

File tree

5 files changed

+134
-0
lines changed

5 files changed

+134
-0
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1852,6 +1852,19 @@ definitions:
18521852
$parameters:
18531853
type: object
18541854
additionalProperties: true
1855+
FlattenFields:
1856+
title: Flatten Fields
1857+
description: A transformation that flattens a record into a single-level format.
1858+
type: object
1859+
required:
1860+
- type
1861+
properties:
1862+
type:
1863+
type: string
1864+
enum: [FlattenFields]
1865+
$parameters:
1866+
type: object
1867+
additionalProperties: true
18551868
IterableDecoder:
18561869
title: Iterable Decoder
18571870
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,11 @@ class KeysToSnakeCase(BaseModel):
715715
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
716716

717717

718+
# Schema model for the declarative "FlattenFields" component.
# `type` only accepts the literal "FlattenFields"; `parameters` is an optional free-form
# mapping that is populated from the "$parameters" key (see the Field alias).
class FlattenFields(BaseModel):
719+
type: Literal["FlattenFields"]
720+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
721+
722+
718723
class IterableDecoder(BaseModel):
719724
type: Literal["IterableDecoder"]
720725

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@
197197
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
198198
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
199199
)
200+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
201+
FlattenFields as FlattenFieldsModel,
202+
)
200203
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
201204
GzipJsonDecoder as GzipJsonDecoderModel,
202205
)
@@ -387,6 +390,9 @@
387390
RemoveFields,
388391
)
389392
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
393+
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
394+
FlattenFields,
395+
)
390396
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
391397
KeysToLowerTransformation,
392398
)
@@ -472,6 +478,7 @@ def _init_mappings(self) -> None:
472478
JsonlDecoderModel: self.create_jsonl_decoder,
473479
GzipJsonDecoderModel: self.create_gzipjson_decoder,
474480
KeysToLowerModel: self.create_keys_to_lower_transformation,
481+
FlattenFieldsModel: self.create_flatten_fields,
475482
IterableDecoderModel: self.create_iterable_decoder,
476483
XmlDecoderModel: self.create_xml_decoder,
477484
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
@@ -587,6 +594,11 @@ def create_keys_to_lower_transformation(
587594
) -> KeysToLowerTransformation:
588595
return KeysToLowerTransformation()
589596

597+
# Factory hook registered for FlattenFieldsModel in _init_mappings.
# Returns a new FlattenFields transformation; `model`, `config` and `kwargs` are accepted
# to match the other create_* hooks but are not read here.
def create_flatten_fields(
598+
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
599+
) -> FlattenFields:
600+
return FlattenFields()
601+
590602
@staticmethod
591603
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
592604
if not value_type:
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, Dict, Optional
7+
8+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
9+
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
10+
11+
12+
@dataclass
class FlattenFields(RecordTransformation):
    """Record transformation that collapses nested dicts and lists into one level.

    Scalars nested only inside dicts keep their own key; everything inside a list is
    stored under a dotted path that starts at the list's key and includes the item
    index (e.g. ``{"a": [{"b": 1}]}`` becomes ``{"a.0.b": 1}``).
    """

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Flatten ``record`` in place.

        The flattened mapping is computed first and only then swapped into the same
        dict object, so callers holding a reference to ``record`` see the result.
        ``config``, ``stream_state`` and ``stream_slice`` are accepted for interface
        compatibility and are not used.
        """
        transformed_record = self.flatten_record(record)
        record.clear()
        record.update(transformed_record)

    def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new single-level dict built from ``record`` (which is not modified).

        Naming rules:

        * A scalar reached only through dicts keeps its own key:
          ``{"a": {"b": 1}}`` -> ``{"b": 1}``.
        * If that key is already present in the result, the value is stored as
          ``"<parent key>.<key>"`` instead of overwriting the earlier value:
          ``{"b": 2, "a": {"b": 1}}`` -> ``{"b": 2, "a.b": 1}``.
        * Anything inside a list is always stored under its dotted path from the
          list's key: ``{"a": [{"b": 1}, 2]}`` -> ``{"a.0.b": 1, "a.1": 2}``.
        * Empty dicts and empty lists contribute no keys.

        Whether a key is path-qualified because of a list depends only on that key's
        own ancestors, never on unrelated fields that happened to be visited earlier.
        When two nested scalars compete for the same short key, the first one in
        document order keeps it. If the qualified fallback name is itself already
        taken, the later value replaces the earlier one.
        """
        if not isinstance(record, (dict, list)):
            # Out-of-contract input: keep the historical result of storing the value
            # under the root placeholder key.
            return {"_": record}

        transformed_record: Dict[str, Any] = {}
        # Each entry is (container, resolved key of that container, qualify), where
        # `qualify` is True once a list has been crossed on the way to the container.
        # Carrying it per entry keeps the "under a list" state local to a branch.
        stack = [(record, "_", False)]

        while stack:
            container, parent_key, qualify = stack.pop()

            if isinstance(container, dict):
                entries = [
                    (key, f"{parent_key}.{key}", value) for key, value in container.items()
                ]
            else:
                # List items have no name of their own, so they (and everything
                # below them) are always addressed by the indexed path.
                qualify = True
                entries = [
                    (f"{parent_key}.{index}", f"{parent_key}.{index}", item)
                    for index, item in enumerate(container)
                ]

            nested = []
            for short_key, qualified_key, value in entries:
                # Names are resolved against what has actually been written so far,
                # so a clash is detected instead of silently overwriting a value.
                key = (
                    qualified_key
                    if (qualify or short_key in transformed_record)
                    else short_key
                )
                if isinstance(value, (dict, list)):
                    nested.append((value, key, qualify))
                else:
                    transformed_record[key] = value

            # Descend only after this level's scalars are written; reversed so the
            # LIFO stack visits nested containers in document order.
            stack.extend(reversed(nested))

        return transformed_record
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import pytest
6+
7+
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
8+
FlattenFields,
9+
)
10+
11+
12+
# Each case is a pair: (record passed to FlattenFields.transform, record expected afterwards).
# The cases cover already-flat records, a nested dict, lists of dicts, a list nested
# inside a dict, and a list of scalars.
@pytest.mark.parametrize(
13+
"input_record, expected_output",
14+
[
15+
({"FirstName": "John", "LastName": "Doe"}, {"FirstName": "John", "LastName": "Doe"}),
16+
({"123Number": 123, "456Another123": 456}, {"123Number": 123, "456Another123": 456}),
17+
(
18+
{
19+
"NestedRecord": {"FirstName": "John", "LastName": "Doe"},
20+
"456Another123": 456,
21+
},
22+
{
23+
"FirstName": "John",
24+
"LastName": "Doe",
25+
"456Another123": 456,
26+
},
27+
),
28+
(
29+
{"ListExample": [{"A": "a"}, {"A": "b"}]},
30+
{"ListExample.0.A": "a", "ListExample.1.A": "b"},
31+
),
32+
(
33+
{
34+
"MixedCase123": {
35+
"Nested": [{"Key": {"Value": "test1"}}, {"Key": {"Value": "test2"}}]
36+
},
37+
"SimpleKey": "SimpleValue",
38+
},
39+
{
40+
"Nested.0.Key.Value": "test1",
41+
"Nested.1.Key.Value": "test2",
42+
"SimpleKey": "SimpleValue",
43+
},
44+
),
45+
(
46+
{"List": ["Item1", "Item2", "Item3"]},
47+
{"List.0": "Item1", "List.1": "Item2", "List.2": "Item3"},
48+
),
49+
],
50+
)
51+
# transform() is called for its side effect only: the assertion compares the same
# input_record object after the call, not a return value.
def test_flatten_fields(input_record, expected_output):
52+
flattener = FlattenFields()
53+
flattener.transform(input_record)
54+
assert input_record == expected_output

0 commit comments

Comments
 (0)