Skip to content

Commit a402288

Browse files
author
Baz
authored
fix: (CDK) (AsyncRetriever) - Improve UX on variable naming and interpolation (#368)
1 parent 4dbb6fe commit a402288

File tree

9 files changed

+71
-42
lines changed

9 files changed

+71
-42
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1779,6 +1779,9 @@ definitions:
17791779
- stream_interval
17801780
- stream_partition
17811781
- stream_slice
1782+
- creation_response
1783+
- polling_response
1784+
- download_target
17821785
examples:
17831786
- "/products"
17841787
- "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
@@ -3223,7 +3226,7 @@ definitions:
32233226
- polling_requester
32243227
- download_requester
32253228
- status_extractor
3226-
- urls_extractor
3229+
- download_target_extractor
32273230
properties:
32283231
type:
32293232
type: string
@@ -3240,7 +3243,7 @@ definitions:
32403243
anyOf:
32413244
- "$ref": "#/definitions/CustomRecordExtractor"
32423245
- "$ref": "#/definitions/DpathExtractor"
3243-
urls_extractor:
3246+
download_target_extractor:
32443247
description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.
32453248
anyOf:
32463249
- "$ref": "#/definitions/CustomRecordExtractor"
@@ -3261,7 +3264,7 @@ definitions:
32613264
anyOf:
32623265
- "$ref": "#/definitions/CustomRequester"
32633266
- "$ref": "#/definitions/HttpRequester"
3264-
url_requester:
3267+
download_target_requester:
32653268
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
32663269
anyOf:
32673270
- "$ref": "#/definitions/CustomRequester"
@@ -3667,6 +3670,21 @@ interpolation:
36673670
self: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=
36683671
next: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2
36693672
count: 82
3673+
- title: creation_response
3674+
description: The response received from the creation_requester in the AsyncRetriever component.
3675+
type: object
3676+
examples:
3677+
- id: "1234"
3678+
- title: polling_response
3679+
description: The response received from the polling_requester in the AsyncRetriever component.
3680+
type: object
3681+
examples:
3682+
- id: "1234"
3683+
- title: download_target
3684+
description: The `URL` received from the polling_requester in the AsyncRetriever with jobStatus as `COMPLETED`.
3685+
type: string
3686+
examples:
3687+
- "https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2&filename=xxx_yyy_zzz.csv"
36703688
- title: stream_interval
36713689
description: The current stream interval being processed. The keys are defined by the incremental sync component. Default keys are `start_time` and `end_time`.
36723690
type: object

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2263,7 +2263,7 @@ class AsyncRetriever(BaseModel):
22632263
status_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
22642264
..., description="Responsible for fetching the actual status of the async job."
22652265
)
2266-
urls_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
2266+
download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
22672267
...,
22682268
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
22692269
)
@@ -2278,7 +2278,7 @@ class AsyncRetriever(BaseModel):
22782278
...,
22792279
description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.",
22802280
)
2281-
url_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
2281+
download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
22822282
None,
22832283
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
22842284
)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2744,32 +2744,32 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
27442744
if model.delete_requester
27452745
else None
27462746
)
2747-
url_requester = (
2747+
download_target_requester = (
27482748
self._create_component_from_model(
2749-
model=model.url_requester,
2749+
model=model.download_target_requester,
27502750
decoder=decoder,
27512751
config=config,
27522752
name=f"job extract_url - {name}",
27532753
)
2754-
if model.url_requester
2754+
if model.download_target_requester
27552755
else None
27562756
)
27572757
status_extractor = self._create_component_from_model(
27582758
model=model.status_extractor, decoder=decoder, config=config, name=name
27592759
)
2760-
urls_extractor = self._create_component_from_model(
2761-
model=model.urls_extractor, decoder=decoder, config=config, name=name
2760+
download_target_extractor = self._create_component_from_model(
2761+
model=model.download_target_extractor, decoder=decoder, config=config, name=name
27622762
)
27632763
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
27642764
creation_requester=creation_requester,
27652765
polling_requester=polling_requester,
27662766
download_retriever=download_retriever,
2767-
url_requester=url_requester,
2767+
download_target_requester=download_target_requester,
27682768
abort_requester=abort_requester,
27692769
delete_requester=delete_requester,
27702770
status_extractor=status_extractor,
27712771
status_mapping=self._create_async_job_status_mapping(model.status_mapping, config),
2772-
urls_extractor=urls_extractor,
2772+
download_target_extractor=download_target_extractor,
27732773
)
27742774

27752775
async_job_partition_router = AsyncJobPartitionRouter(

airbyte_cdk/sources/declarative/requesters/README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# AsyncHttpJobRepository sequence diagram
22

33
- Components marked as optional are not required and can be ignored.
4-
- if `url_requester` is not provided, `urls_extractor` will get urls from the `polling_job_response`
5-
- interpolation_context, e.g. `create_job_response` or `polling_job_response` can be obtained from stream_slice
4+
- If `download_target_requester` is not provided, `download_target_extractor` extracts the download targets (URLs) directly from the `polling_response`.
5+
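- Interpolation context values such as `creation_response` and `polling_response` are passed through the `extra_fields` of the `stream_slice` and can be referenced directly in templates, e.g. `{{ creation_response['id'] }}`.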
66

77
```mermaid
88
---
@@ -12,7 +12,7 @@ sequenceDiagram
1212
participant AsyncHttpJobRepository as AsyncOrchestrator
1313
participant CreationRequester as creation_requester
1414
participant PollingRequester as polling_requester
15-
participant UrlRequester as url_requester (Optional)
15+
participant UrlRequester as download_target_requester (Optional)
1616
participant DownloadRetriever as download_retriever
1717
participant AbortRequester as abort_requester (Optional)
1818
participant DeleteRequester as delete_requester (Optional)
@@ -25,14 +25,14 @@ sequenceDiagram
2525
2626
loop Poll for job status
2727
AsyncHttpJobRepository ->> PollingRequester: Check job status
28-
PollingRequester ->> Reporting Server: Status request (interpolation_context: `create_job_response`)
28+
PollingRequester ->> Reporting Server: Status request (interpolation_context: `creation_response`)
2929
Reporting Server -->> PollingRequester: Status response
3030
PollingRequester -->> AsyncHttpJobRepository: Job status
3131
end
3232
3333
alt Status: Ready
3434
AsyncHttpJobRepository ->> UrlRequester: Request download URLs (if applicable)
35-
UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_job_response`)
35+
UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_response`)
3636
Reporting Server -->> UrlRequester: Download URLs
3737
UrlRequester -->> AsyncHttpJobRepository: Download URLs
3838

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ class AsyncHttpJobRepository(AsyncJobRepository):
4343
delete_requester: Optional[Requester]
4444
status_extractor: DpathExtractor
4545
status_mapping: Mapping[str, AsyncJobStatus]
46-
urls_extractor: DpathExtractor
46+
download_target_extractor: DpathExtractor
4747

4848
job_timeout: Optional[timedelta] = None
4949
record_extractor: RecordExtractor = field(
5050
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
5151
)
52-
url_requester: Optional[Requester] = (
52+
download_target_requester: Optional[Requester] = (
5353
None # use it when polling_requester only provides an <id> and an extra request is needed to obtain the list of URLs to download from
5454
)
5555

@@ -211,12 +211,15 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
211211
212212
"""
213213

214-
for url in self._get_download_url(job):
214+
for target_url in self._get_download_targets(job):
215215
job_slice = job.job_parameters()
216216
stream_slice = StreamSlice(
217217
partition=job_slice.partition,
218218
cursor_slice=job_slice.cursor_slice,
219-
extra_fields={**job_slice.extra_fields, "url": url},
219+
extra_fields={
220+
**job_slice.extra_fields,
221+
"download_target": target_url,
222+
},
220223
)
221224
for message in self.download_retriever.read_records({}, stream_slice):
222225
if isinstance(message, Record):
@@ -269,27 +272,29 @@ def _clean_up_job(self, job_id: str) -> None:
269272
del self._polling_job_response_by_id[job_id]
270273

271274
def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
275+
creation_response = self._create_job_response_by_id[job.api_job_id()].json()
272276
stream_slice = StreamSlice(
273-
partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]},
277+
partition={},
274278
cursor_slice={},
279+
extra_fields={"creation_response": creation_response},
275280
)
276281
return stream_slice
277282

278-
def _get_download_url(self, job: AsyncJob) -> Iterable[str]:
279-
if not self.url_requester:
283+
def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
284+
if not self.download_target_requester:
280285
url_response = self._polling_job_response_by_id[job.api_job_id()]
281286
else:
287+
polling_response = self._polling_job_response_by_id[job.api_job_id()].json()
282288
stream_slice: StreamSlice = StreamSlice(
283-
partition={
284-
"polling_job_response": self._polling_job_response_by_id[job.api_job_id()]
285-
},
289+
partition={},
286290
cursor_slice={},
291+
extra_fields={"polling_response": polling_response},
287292
)
288-
url_response = self.url_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect url_requester to always be presented, otherwise raise an exception as we cannot proceed with the report
293+
url_response = self.download_target_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect download_target_requester to always be presented, otherwise raise an exception as we cannot proceed with the report
289294
if not url_response:
290295
raise AirbyteTracedException(
291-
internal_message="Always expect a response or an exception from url_requester",
296+
internal_message="Always expect a response or an exception from download_target_requester",
292297
failure_type=FailureType.system_error,
293298
)
294299

295-
yield from self.urls_extractor.extract_records(url_response) # type: ignore # we expect urls_extractor to always return list of strings
300+
yield from self.download_target_extractor.extract_records(url_response) # type: ignore # we expect download_target_extractor to always return list of strings

airbyte_cdk/sources/declarative/requesters/http_requester.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
8585
self._parameters = parameters
8686

8787
if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
88-
backoff_strategies = self.error_handler.backoff_strategies
88+
backoff_strategies = self.error_handler.backoff_strategies # type: ignore
8989
else:
9090
backoff_strategies = None
9191

@@ -125,6 +125,12 @@ def get_path(
125125
kwargs = {
126126
"stream_slice": stream_slice,
127127
"next_page_token": next_page_token,
128+
# update the interpolation context with extra fields, if passed.
129+
**(
130+
stream_slice.extra_fields
131+
if stream_slice is not None and hasattr(stream_slice, "extra_fields")
132+
else {}
133+
),
128134
}
129135
path = str(self._path.eval(self.config, **kwargs))
130136
return path.lstrip("/")

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3607,15 +3607,15 @@ def test_create_async_retriever():
36073607
"timeout": ["timeout"],
36083608
"completed": ["ready"],
36093609
},
3610-
"urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
3610+
"download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
36113611
"record_selector": {
36123612
"type": "RecordSelector",
36133613
"extractor": {"type": "DpathExtractor", "field_path": ["data"]},
36143614
},
36153615
"status_extractor": {"type": "DpathExtractor", "field_path": ["status"]},
36163616
"polling_requester": {
36173617
"type": "HttpRequester",
3618-
"path": "/v3/marketing/contacts/exports/{{stream_slice['create_job_response'].json()['id'] }}",
3618+
"path": "/v3/marketing/contacts/exports/{{creation_response['id'] }}",
36193619
"url_base": "https://api.sendgrid.com",
36203620
"http_method": "GET",
36213621
"authenticator": {
@@ -3635,19 +3635,19 @@ def test_create_async_retriever():
36353635
},
36363636
"download_requester": {
36373637
"type": "HttpRequester",
3638-
"path": "{{stream_slice['url']}}",
3638+
"path": "{{download_target}}",
36393639
"url_base": "",
36403640
"http_method": "GET",
36413641
},
36423642
"abort_requester": {
36433643
"type": "HttpRequester",
3644-
"path": "{{stream_slice['url']}}/abort",
3644+
"path": "{{download_target}}/abort",
36453645
"url_base": "",
36463646
"http_method": "POST",
36473647
},
36483648
"delete_requester": {
36493649
"type": "HttpRequester",
3650-
"path": "{{stream_slice['url']}}",
3650+
"path": "{{download_target}}",
36513651
"url_base": "",
36523652
"http_method": "POST",
36533653
},
@@ -3681,7 +3681,7 @@ def test_create_async_retriever():
36813681
assert job_repository.abort_requester
36823682
assert job_repository.delete_requester
36833683
assert job_repository.status_extractor
3684-
assert job_repository.urls_extractor
3684+
assert job_repository.download_target_extractor
36853685

36863686
selector = component.record_selector
36873687
extractor = selector.extractor

unit_tests/sources/declarative/requesters/test_http_job_repository.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def setUp(self) -> None:
6969
self._polling_job_requester = HttpRequester(
7070
name="stream <name>: polling",
7171
url_base=_URL_BASE,
72-
path=_EXPORT_PATH + "/{{stream_slice['create_job_response'].json()['id']}}",
72+
path=_EXPORT_PATH + "/{{creation_response['id']}}",
7373
error_handler=error_handler,
7474
http_method=HttpMethod.GET,
7575
config=_ANY_CONFIG,
@@ -84,7 +84,7 @@ def setUp(self) -> None:
8484
requester=HttpRequester(
8585
name="stream <name>: fetch_result",
8686
url_base="",
87-
path="{{stream_slice.extra_fields['url']}}",
87+
path="{{download_target}}",
8888
error_handler=error_handler,
8989
http_method=HttpMethod.GET,
9090
config=_ANY_CONFIG,
@@ -143,7 +143,7 @@ def setUp(self) -> None:
143143
"failure": AsyncJobStatus.FAILED,
144144
"pending": AsyncJobStatus.RUNNING,
145145
},
146-
urls_extractor=DpathExtractor(
146+
download_target_extractor=DpathExtractor(
147147
decoder=JsonDecoder(parameters={}),
148148
field_path=["urls"],
149149
config={},

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,15 +299,15 @@
299299
"timeout": ["timeout"],
300300
"completed": ["ready"],
301301
},
302-
"urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
302+
"download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
303303
"record_selector": {
304304
"type": "RecordSelector",
305305
"extractor": {"type": "DpathExtractor", "field_path": []},
306306
},
307307
"status_extractor": {"type": "DpathExtractor", "field_path": ["status"]},
308308
"polling_requester": {
309309
"type": "HttpRequester",
310-
"path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}",
310+
"path": "/async_job/{{creation_response['id'] }}",
311311
"http_method": "GET",
312312
"authenticator": {
313313
"type": "BearerAuthenticator",

0 commit comments

Comments
 (0)