Skip to content

Commit d060ff0

Browse files
committed
Merge branch 'main' into issue-10552/introduce-concurrent-stream-slicer
2 parents d9bce25 + 39786d2 commit d060ff0

File tree

11 files changed

+497
-281
lines changed

11 files changed

+497
-281
lines changed

.github/workflows/pypi_publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ jobs:
4141
file_glob: true
4242

4343
- name: Publish to PyPI (${{vars.PYPI_PUBLISH_URL}})
44-
uses: pypa/gh-action-pypi-publish@v1.10.3
44+
uses: pypa/gh-action-pypi-publish@v1.12.2
4545
with:
4646
# Can be toggled at the repository level between `https://upload.pypi.org/legacy/` and `https://test.pypi.org/legacy/`
4747
repository-url: ${{vars.PYPI_PUBLISH_URL}}

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
1717
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
1818
from airbyte_cdk.sources.declarative.extractors import RecordSelector
19+
from airbyte_cdk.sources.declarative.extractors.record_filter import (
20+
ClientSideIncrementalRecordFilterDecorator,
21+
)
1922
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
2023
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
2124
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
@@ -284,6 +287,9 @@ def _stream_supports_concurrent_partition_processing(
284287
if isinstance(record_selector, RecordSelector):
285288
if (
286289
record_selector.record_filter
290+
and not isinstance(
291+
record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
292+
)
287293
and "stream_state" in record_selector.record_filter.condition
288294
):
289295
self.logger.warning(

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,6 +1750,45 @@ definitions:
17501750
type:
17511751
type: string
17521752
enum: [XmlDecoder]
1753+
CustomDecoder:
1754+
title: Custom Decoder
1755+
description: Use this to implement custom decoder logic.
1756+
type: object
1757+
additionalProperties: true
1758+
required:
1759+
- type
1760+
- class_name
1761+
properties:
1762+
type:
1763+
type: string
1764+
enum: [CustomDecoder]
1765+
class_name:
1766+
title: Class Name
1767+
description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`.
1768+
type: string
1769+
additionalProperties: true
1770+
examples:
1771+
- "source_amazon_ads.components.GzipJsonlDecoder"
1772+
$parameters:
1773+
type: object
1774+
additionalProperties: true
1775+
GzipJsonDecoder:
1776+
title: GzipJson Decoder
1777+
description: Use this if the response is Gzip compressed Json.
1778+
type: object
1779+
additionalProperties: true
1780+
required:
1781+
- type
1782+
properties:
1783+
type:
1784+
type: string
1785+
enum: [GzipJsonDecoder]
1786+
encoding:
1787+
type: string
1788+
default: utf-8
1789+
$parameters:
1790+
type: object
1791+
additionalProperties: true
17531792
ListPartitionRouter:
17541793
title: List Partition Router
17551794
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
@@ -2404,10 +2443,12 @@ definitions:
24042443
title: Decoder
24052444
description: Component decoding the response so records can be extracted.
24062445
anyOf:
2446+
- "$ref": "#/definitions/CustomDecoder"
24072447
- "$ref": "#/definitions/JsonDecoder"
24082448
- "$ref": "#/definitions/JsonlDecoder"
24092449
- "$ref": "#/definitions/IterableDecoder"
24102450
- "$ref": "#/definitions/XmlDecoder"
2451+
- "$ref": "#/definitions/GzipJsonDecoder"
24112452
$parameters:
24122453
type: object
24132454
additionalProperties: true
@@ -2520,10 +2561,12 @@ definitions:
25202561
title: Decoder
25212562
description: Component decoding the response so records can be extracted.
25222563
anyOf:
2564+
- "$ref": "#/definitions/CustomDecoder"
25232565
- "$ref": "#/definitions/JsonDecoder"
25242566
- "$ref": "#/definitions/JsonlDecoder"
25252567
- "$ref": "#/definitions/IterableDecoder"
25262568
- "$ref": "#/definitions/XmlDecoder"
2569+
- "$ref": "#/definitions/GzipJsonDecoder"
25272570
$parameters:
25282571
type: object
25292572
additionalProperties: true

airbyte_cdk/sources/declarative/decoders/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
#
44

55
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
6-
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder
6+
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder, GzipJsonDecoder
77
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
88
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
99
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
1010

11-
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
11+
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]

airbyte_cdk/sources/declarative/decoders/json_decoder.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
import codecs
55
import logging
66
from dataclasses import InitVar, dataclass
7-
from typing import Any, Generator, Mapping
7+
from gzip import decompress
8+
from typing import Any, Generator, Mapping, MutableMapping, List, Optional
89

910
import requests
1011
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
11-
from orjson import orjson
12+
import orjson
1213

1314
logger = logging.getLogger("airbyte")
1415

@@ -24,24 +25,32 @@ class JsonDecoder(Decoder):
2425
def is_stream_response(self) -> bool:
2526
return False
2627

27-
def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
28+
def decode(
29+
self, response: requests.Response
30+
) -> Generator[MutableMapping[str, Any], None, None]:
2831
"""
2932
Given the response is an empty string or an empty list, the function will return a generator with an empty mapping.
3033
"""
3134
try:
3235
body_json = response.json()
33-
if not isinstance(body_json, list):
34-
body_json = [body_json]
35-
if len(body_json) == 0:
36-
yield {}
37-
else:
38-
yield from body_json
36+
yield from self.parse_body_json(body_json)
3937
except requests.exceptions.JSONDecodeError:
4038
logger.warning(
4139
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
4240
)
4341
yield {}
4442

43+
@staticmethod
44+
def parse_body_json(
45+
body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
46+
) -> Generator[MutableMapping[str, Any], None, None]:
47+
if not isinstance(body_json, list):
48+
body_json = [body_json]
49+
if len(body_json) == 0:
50+
yield {}
51+
else:
52+
yield from body_json
53+
4554

4655
@dataclass
4756
class IterableDecoder(Decoder):
@@ -54,7 +63,9 @@ class IterableDecoder(Decoder):
5463
def is_stream_response(self) -> bool:
5564
return True
5665

57-
def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
66+
def decode(
67+
self, response: requests.Response
68+
) -> Generator[MutableMapping[str, Any], None, None]:
5869
for line in response.iter_lines():
5970
yield {"record": line.decode()}
6071

@@ -70,8 +81,30 @@ class JsonlDecoder(Decoder):
7081
def is_stream_response(self) -> bool:
7182
return True
7283

73-
def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
84+
def decode(
85+
self, response: requests.Response
86+
) -> Generator[MutableMapping[str, Any], None, None]:
7487
# TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional?
7588
# https://github.com/airbytehq/airbyte-internal-issues/issues/8436
7689
for record in response.iter_lines():
7790
yield orjson.loads(record)
91+
92+
93+
@dataclass
class GzipJsonDecoder(JsonDecoder):
    """
    Decoder for responses whose body is gzip-compressed JSON.

    The body is decompressed, decoded to text with `encoding` (utf-8 when unset)
    and parsed as JSON; records are then produced by `parse_body_json`.
    """

    # Name of the text codec applied to the decompressed bytes; a falsy value means utf-8.
    encoding: Optional[str]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Validate the configured encoding when the decoder is constructed.

        :param parameters: component parameters; not read by this method
        :raises ValueError: if `encoding` is set but is not a codec name known to `codecs.lookup`
        """
        if self.encoding:
            try:
                codecs.lookup(self.encoding)
            except LookupError as error:
                # Chain the original error so the traceback shows which lookup failed.
                raise ValueError(
                    f"Invalid encoding '{self.encoding}'. Please check provided encoding"
                ) from error

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Decompress the response body, parse it as JSON and yield its records.

        :param response: HTTP response whose `content` holds gzip-compressed JSON
        :return: a generator over the records produced by `parse_body_json`
        """
        raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8")
        yield from self.parse_body_json(orjson.loads(raw_string))

airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
66
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
7-
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor, SubstreamPartitionRouter
7+
from airbyte_cdk.sources.declarative.models import (
8+
DatetimeBasedCursor,
9+
SubstreamPartitionRouter,
10+
CustomIncrementalSync,
11+
)
812
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig
913

1014

@@ -32,7 +36,7 @@ class LegacyToPerPartitionStateMigration(StateMigration):
3236
def __init__(
3337
self,
3438
partition_router: SubstreamPartitionRouter,
35-
cursor: DatetimeBasedCursor,
39+
cursor: CustomIncrementalSync | DatetimeBasedCursor,
3640
config: Mapping[str, Any],
3741
parameters: Mapping[str, Any],
3842
):
@@ -64,7 +68,7 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
6468
return False
6569

6670
# There is exactly one parent stream
67-
number_of_parent_streams = len(self._partition_router.parent_stream_configs)
71+
number_of_parent_streams = len(self._partition_router.parent_stream_configs) # type: ignore # custom partition will introduce this attribute if needed
6872
if number_of_parent_streams != 1:
6973
# There should be exactly one parent stream
7074
return False

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,29 @@ class XmlDecoder(BaseModel):
687687
type: Literal["XmlDecoder"]
688688

689689

690+
class CustomDecoder(BaseModel):
    # Manifest model describing a decoder implemented by a user-supplied class.
    class Config:
        # Keys beyond the declared fields are accepted.
        extra = Extra.allow

    type: Literal["CustomDecoder"]
    class_name: str = Field(
        ...,
        title="Class Name",
        description="Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`.",
        examples=["source_amazon_ads.components.GzipJsonlDecoder"],
    )
    # Populated from the `$parameters` key of the manifest.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias="$parameters")
702+
703+
704+
class GzipJsonDecoder(BaseModel):
    # Manifest model for the gzip-compressed JSON decoder.
    class Config:
        # Keys beyond the declared fields are accepted.
        extra = Extra.allow

    type: Literal["GzipJsonDecoder"]
    # Text encoding name; defaults to utf-8.
    encoding: Optional[str] = "utf-8"
    # Populated from the `$parameters` key of the manifest.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias="$parameters")
711+
712+
690713
class MinMaxDatetime(BaseModel):
691714
type: Literal["MinMaxDatetime"]
692715
datetime: str = Field(
@@ -1620,7 +1643,16 @@ class SimpleRetriever(BaseModel):
16201643
description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
16211644
title="Partition Router",
16221645
)
1623-
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = Field(
1646+
decoder: Optional[
1647+
Union[
1648+
CustomDecoder,
1649+
JsonDecoder,
1650+
JsonlDecoder,
1651+
IterableDecoder,
1652+
XmlDecoder,
1653+
GzipJsonDecoder,
1654+
]
1655+
] = Field(
16241656
None,
16251657
description="Component decoding the response so records can be extracted.",
16261658
title="Decoder",
@@ -1680,7 +1712,16 @@ class AsyncRetriever(BaseModel):
16801712
description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
16811713
title="Partition Router",
16821714
)
1683-
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = Field(
1715+
decoder: Optional[
1716+
Union[
1717+
CustomDecoder,
1718+
JsonDecoder,
1719+
JsonlDecoder,
1720+
IterableDecoder,
1721+
XmlDecoder,
1722+
GzipJsonDecoder,
1723+
]
1724+
] = Field(
16841725
None,
16851726
description="Component decoding the response so records can be extracted.",
16861727
title="Decoder",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
5959
from airbyte_cdk.sources.declarative.decoders import (
6060
Decoder,
61+
GzipJsonDecoder,
6162
IterableDecoder,
6263
JsonDecoder,
6364
JsonlDecoder,
@@ -134,6 +135,9 @@
134135
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
135136
CustomBackoffStrategy as CustomBackoffStrategyModel,
136137
)
138+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
139+
CustomDecoder as CustomDecoderModel,
140+
)
137141
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
138142
CustomErrorHandler as CustomErrorHandlerModel,
139143
)
@@ -182,6 +186,9 @@
182186
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
183187
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
184188
)
189+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
190+
GzipJsonDecoder as GzipJsonDecoderModel,
191+
)
185192
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
186193
HttpRequester as HttpRequesterModel,
187194
)
@@ -402,6 +409,7 @@ def _init_mappings(self) -> None:
402409
CursorPaginationModel: self.create_cursor_pagination,
403410
CustomAuthenticatorModel: self.create_custom_component,
404411
CustomBackoffStrategyModel: self.create_custom_component,
412+
CustomDecoderModel: self.create_custom_component,
405413
CustomErrorHandlerModel: self.create_custom_component,
406414
CustomIncrementalSyncModel: self.create_custom_component,
407415
CustomRecordExtractorModel: self.create_custom_component,
@@ -425,6 +433,7 @@ def _init_mappings(self) -> None:
425433
InlineSchemaLoaderModel: self.create_inline_schema_loader,
426434
JsonDecoderModel: self.create_json_decoder,
427435
JsonlDecoderModel: self.create_jsonl_decoder,
436+
GzipJsonDecoderModel: self.create_gzipjson_decoder,
428437
KeysToLowerModel: self.create_keys_to_lower_transformation,
429438
IterableDecoderModel: self.create_iterable_decoder,
430439
XmlDecoderModel: self.create_xml_decoder,
@@ -619,11 +628,16 @@ def create_legacy_to_per_partition_state_migration(
619628
"LegacyToPerPartitionStateMigrations can only be applied with a parent stream configuration."
620629
)
621630

631+
if not hasattr(declarative_stream, "incremental_sync"):
632+
raise ValueError(
633+
"LegacyToPerPartitionStateMigrations can only be applied with an incremental_sync configuration."
634+
)
635+
622636
return LegacyToPerPartitionStateMigration(
623-
declarative_stream.retriever.partition_router,
624-
declarative_stream.incremental_sync,
637+
partition_router, # type: ignore # was already checked above
638+
declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams.
625639
config,
626-
declarative_stream.parameters,
640+
declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any]
627641
) # type: ignore # The retriever type was already checked
628642

629643
def create_session_token_authenticator(
@@ -1548,6 +1562,12 @@ def create_iterable_decoder(
15481562
def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder:
15491563
return XmlDecoder(parameters={})
15501564

1565+
@staticmethod
def create_gzipjson_decoder(
    model: GzipJsonDecoderModel, config: Config, **kwargs: Any
) -> GzipJsonDecoder:
    """
    Build a `GzipJsonDecoder` from its manifest model.

    Only `model.encoding` is read; `config` and any extra keyword arguments are not used here.
    """
    text_encoding = model.encoding
    return GzipJsonDecoder(encoding=text_encoding, parameters={})
1570+
15511571
@staticmethod
15521572
def create_json_file_schema_loader(
15531573
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any

0 commit comments

Comments
 (0)