Skip to content

Commit 40a9f1e

Browse files
pnilannatikgadzhi
andauthored
feat: Add JsonParser component to declarative framework (#166)
Co-authored-by: Natik Gadzhi <[email protected]>
1 parent c109297 commit 40a9f1e

File tree

6 files changed

+190
-34
lines changed

6 files changed

+190
-34
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ definitions:
678678
properties:
679679
type:
680680
type: string
681-
enum: [ CustomSchemaNormalization ]
681+
enum: [CustomSchemaNormalization]
682682
class_name:
683683
title: Class Name
684684
description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
@@ -2886,6 +2886,7 @@ definitions:
28862886
parser:
28872887
anyOf:
28882888
- "$ref": "#/definitions/GzipParser"
2889+
- "$ref": "#/definitions/JsonParser"
28892890
- "$ref": "#/definitions/JsonLineParser"
28902891
- "$ref": "#/definitions/CsvParser"
28912892
# PARSERS
@@ -2902,6 +2903,20 @@ definitions:
29022903
anyOf:
29032904
- "$ref": "#/definitions/JsonLineParser"
29042905
- "$ref": "#/definitions/CsvParser"
2906+
- "$ref": "#/definitions/JsonParser"
2907+
JsonParser:
2908+
title: JsonParser
2909+
description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
2910+
type: object
2911+
required:
2912+
- type
2913+
properties:
2914+
type:
2915+
type: string
2916+
enum: [JsonParser]
2917+
encoding:
2918+
type: string
2919+
default: utf-8
29052920
JsonLineParser:
29062921
type: object
29072922
required:

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
from io import BufferedIOBase, TextIOWrapper
88
from typing import Any, Generator, MutableMapping, Optional
99

10+
import orjson
1011
import requests
1112

13+
from airbyte_cdk.models import FailureType
1214
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
15+
from airbyte_cdk.utils import AirbyteTracedException
1316

1417
logger = logging.getLogger("airbyte")
1518

@@ -42,6 +45,46 @@ def parse(
4245
yield from self.inner_parser.parse(gzipobj)
4346

4447

48+
@dataclass
49+
class JsonParser(Parser):
50+
encoding: str = "utf-8"
51+
52+
def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
53+
"""
54+
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
55+
"""
56+
raw_data = data.read()
57+
body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data)
58+
59+
if body_json is None:
60+
raise AirbyteTracedException(
61+
message="Response JSON data failed to be parsed. See logs for more information.",
62+
internal_message=f"Response JSON data failed to be parsed.",
63+
failure_type=FailureType.system_error,
64+
)
65+
66+
if isinstance(body_json, list):
67+
yield from body_json
68+
else:
69+
yield from [body_json]
70+
71+
def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
72+
try:
73+
return orjson.loads(raw_data.decode(self.encoding))
74+
except Exception as exc:
75+
logger.debug(
76+
f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
77+
)
78+
return None
79+
80+
def _parse_json(self, raw_data: bytes) -> Optional[Any]:
81+
try:
82+
return json.loads(raw_data.decode(self.encoding))
83+
except Exception as exc:
84+
logger.error(f"Failed to parse JSON data using json library. {exc}")
85+
return None
86+
87+
4588
@dataclass
4689
class JsonLineParser(Parser):
4790
encoding: Optional[str] = "utf-8"

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,14 @@ class LegacySessionTokenAuthenticator(BaseModel):
12011201
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
12021202

12031203

1204+
class JsonParser(BaseModel):
1205+
class Config:
1206+
extra = Extra.allow
1207+
1208+
type: Literal["JsonParser"]
1209+
encoding: Optional[str] = "utf-8"
1210+
1211+
12041212
class JsonLineParser(BaseModel):
12051213
type: Literal["JsonLineParser"]
12061214
encoding: Optional[str] = "utf-8"
@@ -1599,7 +1607,7 @@ class RecordSelector(BaseModel):
15991607

16001608
class GzipParser(BaseModel):
16011609
type: Literal["GzipParser"]
1602-
inner_parser: Union[JsonLineParser, CsvParser]
1610+
inner_parser: Union[JsonLineParser, CsvParser, JsonParser]
16031611

16041612

16051613
class Spec(BaseModel):
@@ -1634,7 +1642,7 @@ class CompositeErrorHandler(BaseModel):
16341642

16351643
class CompositeRawDecoder(BaseModel):
16361644
type: Literal["CompositeRawDecoder"]
1637-
parser: Union[GzipParser, JsonLineParser, CsvParser]
1645+
parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser]
16381646

16391647

16401648
class DeclarativeSource1(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@
7272
CsvParser,
7373
GzipParser,
7474
JsonLineParser,
75+
JsonParser,
76+
Parser,
7577
)
7678
from airbyte_cdk.sources.declarative.extractors import (
7779
DpathExtractor,
@@ -247,6 +249,9 @@
247249
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
248250
JsonLineParser as JsonLineParserModel,
249251
)
252+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
253+
JsonParser as JsonParserModel,
254+
)
250255
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
251256
JwtAuthenticator as JwtAuthenticatorModel,
252257
)
@@ -522,6 +527,7 @@ def _init_mappings(self) -> None:
522527
JsonDecoderModel: self.create_json_decoder,
523528
JsonlDecoderModel: self.create_jsonl_decoder,
524529
JsonLineParserModel: self.create_json_line_parser,
530+
JsonParserModel: self.create_json_parser,
525531
GzipJsonDecoderModel: self.create_gzipjson_decoder,
526532
GzipParserModel: self.create_gzip_parser,
527533
KeysToLowerModel: self.create_keys_to_lower_transformation,
@@ -1032,17 +1038,17 @@ def create_cursor_pagination(
10321038
self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any
10331039
) -> CursorPaginationStrategy:
10341040
if isinstance(decoder, PaginationDecoderDecorator):
1035-
if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)):
1036-
raise ValueError(
1037-
f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
1038-
)
1041+
inner_decoder = decoder.decoder
1042+
else:
1043+
inner_decoder = decoder
1044+
decoder = PaginationDecoderDecorator(decoder=decoder)
1045+
1046+
if self._is_supported_decoder_for_pagination(inner_decoder):
10391047
decoder_to_use = decoder
10401048
else:
1041-
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
1042-
raise ValueError(
1043-
f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
1044-
)
1045-
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
1049+
raise ValueError(
1050+
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
1051+
)
10461052

10471053
return CursorPaginationStrategy(
10481054
cursor_value=model.cursor_value,
@@ -1515,11 +1521,10 @@ def create_default_paginator(
15151521
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
15161522
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
15171523
if decoder:
1518-
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
1519-
raise ValueError(
1520-
f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
1521-
)
1522-
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
1524+
if self._is_supported_decoder_for_pagination(decoder):
1525+
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
1526+
else:
1527+
raise ValueError(self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(decoder)))
15231528
else:
15241529
decoder_to_use = PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
15251530
page_size_option = (
@@ -1748,6 +1753,11 @@ def create_dynamic_schema_loader(
17481753
def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder:
17491754
return JsonDecoder(parameters={})
17501755

1756+
@staticmethod
1757+
def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser:
1758+
encoding = model.encoding if model.encoding else "utf-8"
1759+
return JsonParser(encoding=encoding)
1760+
17511761
@staticmethod
17521762
def create_jsonl_decoder(
17531763
model: JsonlDecoderModel, config: Config, **kwargs: Any
@@ -1940,22 +1950,22 @@ def create_oauth_authenticator(
19401950
message_repository=self._message_repository,
19411951
)
19421952

1943-
@staticmethod
19441953
def create_offset_increment(
1945-
model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
1954+
self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
19461955
) -> OffsetIncrement:
19471956
if isinstance(decoder, PaginationDecoderDecorator):
1948-
if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)):
1949-
raise ValueError(
1950-
f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
1951-
)
1957+
inner_decoder = decoder.decoder
1958+
else:
1959+
inner_decoder = decoder
1960+
decoder = PaginationDecoderDecorator(decoder=decoder)
1961+
1962+
if self._is_supported_decoder_for_pagination(inner_decoder):
19521963
decoder_to_use = decoder
19531964
else:
1954-
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
1955-
raise ValueError(
1956-
f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
1957-
)
1958-
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
1965+
raise ValueError(
1966+
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
1967+
)
1968+
19591969
return OffsetIncrement(
19601970
page_size=model.page_size,
19611971
config=config,
@@ -2555,3 +2565,25 @@ def create_config_components_resolver(
25552565
components_mapping=components_mapping,
25562566
parameters=model.parameters or {},
25572567
)
2568+
2569+
_UNSUPPORTED_DECODER_ERROR = (
2570+
"Specified decoder of {decoder_type} is not supported for pagination."
2571+
"Please set as `JsonDecoder`, `XmlDecoder`, or a `CompositeRawDecoder` with an inner_parser of `JsonParser` or `GzipParser` instead."
2572+
"If using `GzipParser`, please ensure that the lowest level inner_parser is a `JsonParser`."
2573+
)
2574+
2575+
def _is_supported_decoder_for_pagination(self, decoder: Decoder) -> bool:
2576+
if isinstance(decoder, (JsonDecoder, XmlDecoder)):
2577+
return True
2578+
elif isinstance(decoder, CompositeRawDecoder):
2579+
return self._is_supported_parser_for_pagination(decoder.parser)
2580+
else:
2581+
return False
2582+
2583+
def _is_supported_parser_for_pagination(self, parser: Parser) -> bool:
2584+
if isinstance(parser, JsonParser):
2585+
return True
2586+
elif isinstance(parser, GzipParser):
2587+
return isinstance(parser.inner_parser, JsonParser)
2588+
else:
2589+
return False

docs/RELEASES.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ A few seconds after any PR is merged to `main` , a release draft will be created
99
3. Optionally tweak the text in the release notes - for instance to call out contributors, to make a specific change more intuitive for readers to understand, or to move updates into a different category than they were assigned by default. (Note: You can also do this retroactively after publishing the release.)
1010
4. Publish the release by pressing the “Publish release” button.
1111

12-
*Note:*
12+
_Note:_
1313

14-
- *Only maintainers can see release drafts. Non-maintainers will only see published releases.*
14+
- _Only maintainers can see release drafts. Non-maintainers will only see published releases._
1515
- If you create a tag on accident that you need to remove, contact a maintainer to delete the tag and the release.
1616
- You can monitor the PyPI release process here in the GitHub Actions view: https://github.com/airbytehq/airbyte-python-cdk/actions/workflows/pypi_publish.yml
1717

@@ -49,23 +49,23 @@ The first option is to look in the `declarative_manifest_image_version` database
4949

5050
If that is not available as an option, you can run an Builder-created connector in Cloud and note the version number printed in the logs. Warning: this may not be indicative if that connector instance has been manually pinned to a specific version.
5151

52-
TODO: Would be great to find a way to inspect directly without requiring direct prod DB access.
52+
TODO: Would be great to find a way to inspect directly without requiring direct prod DB access.
5353

5454
### How to pretest changes to SDM images manually
5555

5656
To manually test changes against a dev image of SDM before committing to a release, first use the Publishing & Packaging workflow to publish a pre-release version of the CDK/SDM. Be sure to uncheck the option to create a connector builder PR.
5757

5858
#### Pretesting Manifest-Only connectors
5959

60-
Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well).
61-
Next, build the pre-release image locally using `airbyte-ci connectors —name=<source> build`.
60+
Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well).
61+
Next, build the pre-release image locally using `airbyte-ci connectors —name=<source> build`.
6262
You can now run connector interfaces against the built image using the pattern
`docker run airbyte/<source-name>:dev <spec/check/discover/read>`.
6363
The connector's README should include a list of these commands, which can be copy/pasted and run from the connector's directory for quick testing against a local config.
6464
You can also run `airbyte-ci connectors —name=<source> test` to run the CI test suite against the dev image.
6565

6666
#### Pretesting Low-Code Python connectors
6767

68-
Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI.
68+
Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI.
6969
Update the lockfile and run connector interfaces via poetry:
`poetry run source-<name> spec/check/discover/read`.
7070
You can also run `airbyte-ci connectors —name=<source> test` to run the CI test suite against the dev image.


7171

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import gzip
66
import json
77
from io import BytesIO, StringIO
8+
from unittest.mock import patch
89

910
import pytest
1011
import requests
@@ -14,7 +15,9 @@
1415
CsvParser,
1516
GzipParser,
1617
JsonLineParser,
18+
JsonParser,
1719
)
20+
from airbyte_cdk.utils import AirbyteTracedException
1821

1922

2023
def compress_with_gzip(data: str, encoding: str = "utf-8"):
@@ -117,3 +120,58 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str):
117120
for _ in composite_raw_decoder.decode(response):
118121
counter += 1
119122
assert counter == 3
123+
124+
125+
@pytest.mark.parametrize(
126+
"test_data",
127+
[
128+
({"data-type": "string"}),
129+
([{"id": "1"}, {"id": "2"}]),
130+
({"id": "170141183460469231731687303715884105727"}),
131+
({}),
132+
({"nested": {"foo": {"bar": "baz"}}}),
133+
],
134+
ids=[
135+
"valid_dict",
136+
"list_of_dicts",
137+
"int128",
138+
"empty_object",
139+
"nested_structure",
140+
],
141+
)
142+
def test_composite_raw_decoder_json_parser(requests_mock, test_data):
143+
encodings = ["utf-8", "utf", "iso-8859-1"]
144+
for encoding in encodings:
145+
raw_data = json.dumps(test_data).encode(encoding=encoding)
146+
requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data)
147+
response = requests.get("https://airbyte.io/", stream=True)
148+
composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding=encoding))
149+
actual = list(composite_raw_decoder.decode(response))
150+
if isinstance(test_data, list):
151+
assert actual == test_data
152+
else:
153+
assert actual == [test_data]
154+
155+
156+
def test_composite_raw_decoder_orjson_parser_error(requests_mock):
157+
raw_data = json.dumps({"test": "test"}).encode("utf-8")
158+
requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data)
159+
response = requests.get("https://airbyte.io/", stream=True)
160+
161+
composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8"))
162+
163+
with patch("orjson.loads", side_effect=Exception("test")):
164+
assert [{"test": "test"}] == list(composite_raw_decoder.decode(response))
165+
166+
167+
def test_composite_raw_decoder_raises_traced_exception_when_both_parsers_fail(requests_mock):
168+
raw_data = json.dumps({"test": "test"}).encode("utf-8")
169+
requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data)
170+
response = requests.get("https://airbyte.io/", stream=True)
171+
172+
composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8"))
173+
174+
with patch("orjson.loads", side_effect=Exception("test")):
175+
with patch("json.loads", side_effect=Exception("test")):
176+
with pytest.raises(AirbyteTracedException):
177+
list(composite_raw_decoder.decode(response))

0 commit comments

Comments
 (0)