Skip to content

Commit 7fdd067

Browse files
author
Oleksandr Bazarnov
committed
Merge remote-tracking branch 'origin/main' into baz/cdk/add-async-retriever-operation-auxiliary-requests
2 parents 11d3d0c + c3efa4c commit 7fdd067

File tree

7 files changed

+48
-13
lines changed

7 files changed

+48
-13
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from airbyte_cdk.sources.source import TState
4545
from airbyte_cdk.sources.streams import Stream
4646
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
47+
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
4748
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
4849
AlwaysAvailableAvailabilityStrategy,
4950
)
@@ -118,6 +119,12 @@ def __init__(
118119
message_repository=self.message_repository,
119120
)
120121

122+
# TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
123+
@property
124+
def is_partially_declarative(self) -> bool:
125+
"""This flag is used to avoid unexpectedly processing AbstractStreamFacade instances as concurrent streams."""
126+
return False
127+
121128
def read(
122129
self,
123130
logger: logging.Logger,
@@ -369,6 +376,14 @@ def _group_streams(
369376
)
370377
else:
371378
synchronous_streams.append(declarative_stream)
379+
# TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
380+
# The condition below ensures that concurrent support is not lost for sources that already supported
381+
# it before migration but are now only partially migrated to a declarative implementation (e.g., Stripe).
382+
elif (
383+
isinstance(declarative_stream, AbstractStreamFacade)
384+
and self.is_partially_declarative
385+
):
386+
concurrent_streams.append(declarative_stream.get_underlying_stream())
372387
else:
373388
synchronous_streams.append(declarative_stream)
374389

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1490,7 +1490,11 @@ definitions:
14901490
limit:
14911491
title: Limit
14921492
description: The maximum number of calls allowed within the interval.
1493-
type: integer
1493+
anyOf:
1494+
- type: integer
1495+
- type: string
1496+
interpolation_context:
1497+
- config
14941498
interval:
14951499
title: Interval
14961500
description: The time interval for the rate limit.

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ class CsvParser(Parser):
107107
encoding: Optional[str] = "utf-8"
108108
delimiter: Optional[str] = ","
109109

110+
def _get_delimiter(self) -> Optional[str]:
111+
"""
112+
Get delimiter from the configuration. Check for the escape character and decode it.
113+
"""
114+
if self.delimiter is not None:
115+
if self.delimiter.startswith("\\"):
116+
self.delimiter = self.delimiter.encode("utf-8").decode("unicode_escape")
117+
118+
return self.delimiter
119+
110120
def parse(
111121
self,
112122
data: BufferedIOBase,
@@ -115,8 +125,9 @@ def parse(
115125
Parse CSV data from decompressed bytes.
116126
"""
117127
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
118-
reader = csv.DictReader(text_data, delimiter=self.delimiter or ",")
119-
yield from reader
128+
reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
129+
for row in reader:
130+
yield row
120131

121132

122133
@dataclass

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ class Rate(BaseModel):
646646
class Config:
647647
extra = Extra.allow
648648

649-
limit: int = Field(
649+
limit: Union[int, str] = Field(
650650
...,
651651
description="The maximum number of calls allowed within the interval.",
652652
title="Limit",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2091,10 +2091,10 @@ def create_dynamic_schema_loader(
20912091
def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> Decoder:
20922092
return JsonDecoder(parameters={})
20932093

2094-
@staticmethod
2095-
def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder:
2094+
def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder:
20962095
return CompositeRawDecoder(
2097-
parser=ModelToComponentFactory._get_parser(model, config), stream_response=True
2096+
parser=ModelToComponentFactory._get_parser(model, config),
2097+
stream_response=False if self._emit_connector_builder_messages else True,
20982098
)
20992099

21002100
@staticmethod
@@ -2103,10 +2103,12 @@ def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any
21032103
parser=ModelToComponentFactory._get_parser(model, config), stream_response=True
21042104
)
21052105

2106-
@staticmethod
2107-
def create_gzip_decoder(model: GzipDecoderModel, config: Config, **kwargs: Any) -> Decoder:
2106+
def create_gzip_decoder(
2107+
self, model: GzipDecoderModel, config: Config, **kwargs: Any
2108+
) -> Decoder:
21082109
return CompositeRawDecoder(
2109-
parser=ModelToComponentFactory._get_parser(model, config), stream_response=True
2110+
parser=ModelToComponentFactory._get_parser(model, config),
2111+
stream_response=False if self._emit_connector_builder_messages else True,
21102112
)
21112113

21122114
@staticmethod
@@ -3024,8 +3026,9 @@ def create_unlimited_call_rate_policy(
30243026
)
30253027

30263028
def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate:
3029+
interpolated_limit = InterpolatedString.create(str(model.limit), parameters={})
30273030
return Rate(
3028-
limit=model.limit,
3031+
limit=int(interpolated_limit.eval(config=config)),
30293032
interval=parse_duration(model.interval),
30303033
)
30313034

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str):
6262
)
6363
response = requests.get("https://airbyte.io/", stream=True)
6464

65-
parser = GzipParser(inner_parser=CsvParser(encoding=encoding, delimiter="\t"))
65+
# the delimiter is set to `\\t` intentionally to test the parsing logic here
66+
parser = GzipParser(inner_parser=CsvParser(encoding=encoding, delimiter="\\t"))
67+
6668
composite_raw_decoder = CompositeRawDecoder(parser=parser)
6769
counter = 0
6870
for _ in composite_raw_decoder.decode(response):

unit_tests/sources/declarative/requesters/test_http_requester.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any
946946
)
947947

948948

949-
def test_http_requester_with_mock_apibudget(http_requester_factory, monkeypatch):
949+
def test_http_requester_with_mock_api_budget(http_requester_factory, monkeypatch):
950950
mock_budget = MagicMock(spec=HttpAPIBudget)
951951

952952
requester = http_requester_factory(

0 commit comments

Comments
 (0)