Skip to content

Commit fd44be1

Browse files
committed
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airbytehq/airbyte-python-cdk into lazebnyi/add-dynamic-schema-loader
2 parents 227325f + 13441ca commit fd44be1

File tree

8 files changed

+117
-8
lines changed

8 files changed

+117
-8
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,19 @@ definitions:
13311331
$parameters:
13321332
type: object
13331333
additionalProperties: true
1334+
ResponseToFileExtractor:
1335+
title: CSV To File Extractor
1336+
description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed.
1337+
type: object
1338+
required:
1339+
- type
1340+
properties:
1341+
type:
1342+
type: string
1343+
enum: [ResponseToFileExtractor]
1344+
$parameters:
1345+
type: object
1346+
additionalProperties: true
13341347
ExponentialBackoffStrategy:
13351348
title: Exponential Backoff
13361349
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
@@ -2766,6 +2779,12 @@ definitions:
27662779
anyOf:
27672780
- "$ref": "#/definitions/CustomRecordExtractor"
27682781
- "$ref": "#/definitions/DpathExtractor"
2782+
download_extractor:
2783+
description: Responsible for fetching the records from provided urls.
2784+
anyOf:
2785+
- "$ref": "#/definitions/CustomRecordExtractor"
2786+
- "$ref": "#/definitions/DpathExtractor"
2787+
- "$ref": "#/definitions/ResponseToFileExtractor"
27692788
creation_requester:
27702789
description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.
27712790
anyOf:
@@ -2820,6 +2839,16 @@ definitions:
28202839
- "$ref": "#/definitions/IterableDecoder"
28212840
- "$ref": "#/definitions/XmlDecoder"
28222841
- "$ref": "#/definitions/GzipJsonDecoder"
2842+
download_decoder:
2843+
title: Download Decoder
2844+
description: Component decoding the download response so records can be extracted.
2845+
anyOf:
2846+
- "$ref": "#/definitions/CustomDecoder"
2847+
- "$ref": "#/definitions/JsonDecoder"
2848+
- "$ref": "#/definitions/JsonlDecoder"
2849+
- "$ref": "#/definitions/IterableDecoder"
2850+
- "$ref": "#/definitions/XmlDecoder"
2851+
- "$ref": "#/definitions/GzipJsonDecoder"
28232852
$parameters:
28242853
type: object
28252854
additionalProperties: true

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import uuid
77
import zlib
88
from contextlib import closing
9+
from dataclasses import InitVar, dataclass
910
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple
1011

1112
import pandas as pd
@@ -19,6 +20,7 @@
1920
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10
2021

2122

23+
@dataclass
2224
class ResponseToFileExtractor(RecordExtractor):
2325
"""
2426
This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as
@@ -28,7 +30,9 @@ class ResponseToFileExtractor(RecordExtractor):
2830
a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing.
2931
"""
3032

31-
def __init__(self) -> None:
33+
parameters: InitVar[Mapping[str, Any]]
34+
35+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3236
self.logger = logging.getLogger("airbyte")
3337

3438
def _get_response_encoding(self, headers: Dict[str, Any]) -> str:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,11 @@ class DpathExtractor(BaseModel):
567567
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
568568

569569

570+
class ResponseToFileExtractor(BaseModel):
571+
type: Literal["ResponseToFileExtractor"]
572+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
573+
574+
570575
class ExponentialBackoffStrategy(BaseModel):
571576
type: Literal["ExponentialBackoffStrategy"]
572577
factor: Optional[Union[float, str]] = Field(
@@ -1835,6 +1840,9 @@ class AsyncRetriever(BaseModel):
18351840
...,
18361841
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
18371842
)
1843+
download_extractor: Optional[
1844+
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
1845+
] = Field(None, description="Responsible for fetching the records from provided urls.")
18381846
creation_requester: Union[CustomRequester, HttpRequester] = Field(
18391847
...,
18401848
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -1885,6 +1893,20 @@ class AsyncRetriever(BaseModel):
18851893
description="Component decoding the response so records can be extracted.",
18861894
title="Decoder",
18871895
)
1896+
download_decoder: Optional[
1897+
Union[
1898+
CustomDecoder,
1899+
JsonDecoder,
1900+
JsonlDecoder,
1901+
IterableDecoder,
1902+
XmlDecoder,
1903+
GzipJsonDecoder,
1904+
]
1905+
] = Field(
1906+
None,
1907+
description="Component decoding the download response so records can be extracted.",
1908+
title="Download Decoder",
1909+
)
18881910
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
18891911

18901912

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@
273273
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
274274
RequestPath as RequestPathModel,
275275
)
276+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
277+
ResponseToFileExtractor as ResponseToFileExtractorModel,
278+
)
276279
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
277280
SchemaTypeIdentifier as SchemaTypeIdentifierModel,
278281
)
@@ -440,6 +443,7 @@ def _init_mappings(self) -> None:
440443
DefaultErrorHandlerModel: self.create_default_error_handler,
441444
DefaultPaginatorModel: self.create_default_paginator,
442445
DpathExtractorModel: self.create_dpath_extractor,
446+
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
443447
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
444448
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
445449
HttpRequesterModel: self.create_http_requester,
@@ -1471,6 +1475,13 @@ def create_dpath_extractor(
14711475
parameters=model.parameters or {},
14721476
)
14731477

1478+
def create_response_to_file_extractor(
1479+
self,
1480+
model: ResponseToFileExtractorModel,
1481+
**kwargs: Any,
1482+
) -> ResponseToFileExtractor:
1483+
return ResponseToFileExtractor(parameters=model.parameters or {})
1484+
14741485
@staticmethod
14751486
def create_exponential_backoff_strategy(
14761487
model: ExponentialBackoffStrategyModel, config: Config
@@ -2092,6 +2103,7 @@ def create_async_retriever(
20922103
model=model.record_selector,
20932104
config=config,
20942105
decoder=decoder,
2106+
name=name,
20952107
transformations=transformations,
20962108
client_side_incremental_sync=client_side_incremental_sync,
20972109
)
@@ -2109,16 +2121,36 @@ def create_async_retriever(
21092121
name=f"job polling - {name}",
21102122
)
21112123
job_download_components_name = f"job download - {name}"
2124+
download_decoder = (
2125+
self._create_component_from_model(model=model.download_decoder, config=config)
2126+
if model.download_decoder
2127+
else JsonDecoder(parameters={})
2128+
)
2129+
download_extractor = (
2130+
self._create_component_from_model(
2131+
model=model.download_extractor,
2132+
config=config,
2133+
decoder=download_decoder,
2134+
parameters=model.parameters,
2135+
)
2136+
if model.download_extractor
2137+
else DpathExtractor(
2138+
[],
2139+
config=config,
2140+
decoder=download_decoder,
2141+
parameters=model.parameters or {},
2142+
)
2143+
)
21122144
download_requester = self._create_component_from_model(
21132145
model=model.download_requester,
2114-
decoder=decoder,
2146+
decoder=download_decoder,
21152147
config=config,
21162148
name=job_download_components_name,
21172149
)
21182150
download_retriever = SimpleRetriever(
21192151
requester=download_requester,
21202152
record_selector=RecordSelector(
2121-
extractor=ResponseToFileExtractor(),
2153+
extractor=download_extractor,
21222154
name=name,
21232155
record_filter=None,
21242156
transformations=[],

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
4242

4343
job_timeout: Optional[timedelta] = None
4444
record_extractor: RecordExtractor = field(
45-
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor()
45+
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
4646
)
4747

4848
def __post_init__(self) -> None:

unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
class ResponseToFileExtractorTest(TestCase):
1616
def setUp(self) -> None:
17-
self._extractor = ResponseToFileExtractor()
17+
self._extractor = ResponseToFileExtractor({})
1818
self._http_mocker = requests_mock.Mocker()
1919
self._http_mocker.__enter__()
2020

@@ -76,7 +76,7 @@ def large_event_response_fixture():
7676
@pytest.mark.limit_memory("20 MB")
7777
def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
7878
lines_in_response, file_path = large_events_response
79-
extractor = ResponseToFileExtractor()
79+
extractor = ResponseToFileExtractor({})
8080

8181
url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
8282
requests_mock.get(url, body=open(file_path, "rb"))

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44

55
# mypy: ignore-errors
66
import datetime
7-
from typing import Any, Mapping
7+
from typing import Any, Iterable, Mapping
88

99
import freezegun
1010
import pendulum
1111
import pytest
12+
import requests
1213

1314
from airbyte_cdk import AirbyteTracedException
1415
from airbyte_cdk.models import FailureType, Level
@@ -27,6 +28,7 @@
2728
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
2829
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
2930
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
31+
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
3032
from airbyte_cdk.sources.declarative.extractors.record_filter import (
3133
ClientSideIncrementalRecordFilterDecorator,
3234
)
@@ -47,6 +49,9 @@
4749
from airbyte_cdk.sources.declarative.models import (
4850
CustomPartitionRouter as CustomPartitionRouterModel,
4951
)
52+
from airbyte_cdk.sources.declarative.models import (
53+
CustomRecordExtractor as CustomRecordExtractorModel,
54+
)
5055
from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel
5156
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
5257
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
@@ -3271,3 +3276,20 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
32713276
"state_type": "date-range",
32723277
"legacy": {},
32733278
}
3279+
3280+
3281+
class CustomRecordExtractor(RecordExtractor):
3282+
def extract_records(
3283+
self,
3284+
response: requests.Response,
3285+
) -> Iterable[Mapping[str, Any]]:
3286+
yield from response.json()
3287+
3288+
3289+
def test_create_custom_record_extractor():
3290+
definition = {
3291+
"type": "CustomRecordExtractor",
3292+
"class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.CustomRecordExtractor",
3293+
}
3294+
component = factory.create_component(CustomRecordExtractorModel, definition, {})
3295+
assert isinstance(component, CustomRecordExtractor)

unit_tests/sources/declarative/requesters/test_http_job_repository.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def setUp(self) -> None:
9595
stream_response=True,
9696
),
9797
record_selector=RecordSelector(
98-
extractor=ResponseToFileExtractor(),
98+
extractor=ResponseToFileExtractor({}),
9999
record_filter=None,
100100
transformations=[],
101101
schema_normalization=TypeTransformer(TransformConfig.NoTransform),

0 commit comments

Comments
 (0)