Skip to content

Commit b3d46aa

Browse files
author
Oleksandr Bazarnov
committed
add the request / response to each async operation supported
1 parent 978be1b commit b3d46aa

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ def _read_with_chunks(
136136
"""
137137

138138
try:
139+
# TODO: Add support for other file types, like `json`, with `pd.read_json()`
139140
with open(path, "r", encoding=file_encoding) as data:
140141
chunks = pd.read_csv(
141142
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
)
2424
from airbyte_cdk.sources.declarative.requesters.requester import Requester
2525
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
26+
from airbyte_cdk.sources.http_logger import format_http_message
2627
from airbyte_cdk.sources.types import Record, StreamSlice
2728
from airbyte_cdk.utils import AirbyteTracedException
2829

@@ -71,7 +72,15 @@ def _get_validated_polling_response(self, stream_slice: StreamSlice) -> requests
7172
"""
7273

7374
polling_response: Optional[requests.Response] = self.polling_requester.send_request(
74-
stream_slice=stream_slice
75+
stream_slice=stream_slice,
76+
log_formatter=lambda polling_response: format_http_message(
77+
response=polling_response,
78+
title="Async Job -- Polling",
79+
description="Poll the status of the server-side async job.",
80+
stream_name=None,
81+
is_auxiliary=True,
82+
type="ASYNC_POLL",
83+
),
7584
)
7685
if polling_response is None:
7786
raise AirbyteTracedException(
@@ -118,8 +127,17 @@ def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> request
118127
"""
119128

120129
response: Optional[requests.Response] = self.creation_requester.send_request(
121-
stream_slice=stream_slice
130+
stream_slice=stream_slice,
131+
log_formatter=lambda response: format_http_message(
132+
response=response,
133+
title="Async Job -- Create",
134+
description="Create the server-side async job.",
135+
stream_name=None,
136+
is_auxiliary=True,
137+
type="ASYNC_CREATE",
138+
),
122139
)
140+
123141
if not response:
124142
raise AirbyteTracedException(
125143
internal_message="Always expect a response or an exception from creation_requester",
@@ -217,13 +235,33 @@ def abort(self, job: AsyncJob) -> None:
217235
if not self.abort_requester:
218236
return
219237

220-
self.abort_requester.send_request(stream_slice=self._get_create_job_stream_slice(job))
238+
abort_response = self.abort_requester.send_request(
239+
stream_slice=self._get_create_job_stream_slice(job),
240+
log_formatter=lambda abort_response: format_http_message(
241+
response=abort_response,
242+
title="Async Job -- Abort",
243+
description="Abort the running server-side async job.",
244+
stream_name=None,
245+
is_auxiliary=True,
246+
type="ASYNC_ABORT",
247+
),
248+
)
221249

222250
def delete(self, job: AsyncJob) -> None:
223251
if not self.delete_requester:
224252
return
225253

226-
self.delete_requester.send_request(stream_slice=self._get_create_job_stream_slice(job))
254+
delete_job_reponse = self.delete_requester.send_request(
255+
stream_slice=self._get_create_job_stream_slice(job),
256+
log_formatter=lambda delete_job_reponse: format_http_message(
257+
response=delete_job_reponse,
258+
title="Async Job -- Delete",
259+
description="Delete the specified job from the list of Jobs.",
260+
stream_name=None,
261+
is_auxiliary=True,
262+
type="ASYNC_DELETE",
263+
),
264+
)
227265
self._clean_up_job(job.api_job_id())
228266

229267
def _clean_up_job(self, job_id: str) -> None:

0 commit comments

Comments
 (0)