Skip to content

Commit 1ad437e

Browse files
author
Baz
authored
fix: (CDK) (ConnectorBuilder) - Add auxiliary requests to slice; support TestRead for AsyncRetriever (part 1/2) (#355)
1 parent 4991e07 commit 1ad437e

File tree

10 files changed

+216
-63
lines changed

10 files changed

+216
-63
lines changed

airbyte_cdk/connector_builder/models.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,6 @@ class HttpRequest:
2121
body: Optional[str] = None
2222

2323

24-
@dataclass
25-
class StreamReadPages:
26-
records: List[object]
27-
request: Optional[HttpRequest] = None
28-
response: Optional[HttpResponse] = None
29-
30-
31-
@dataclass
32-
class StreamReadSlices:
33-
pages: List[StreamReadPages]
34-
slice_descriptor: Optional[Dict[str, Any]]
35-
state: Optional[List[Dict[str, Any]]] = None
36-
37-
3824
@dataclass
3925
class LogMessage:
4026
message: str
@@ -46,11 +32,27 @@ class LogMessage:
4632
@dataclass
4733
class AuxiliaryRequest:
4834
title: str
35+
type: str
4936
description: str
5037
request: HttpRequest
5138
response: HttpResponse
5239

5340

41+
@dataclass
42+
class StreamReadPages:
43+
records: List[object]
44+
request: Optional[HttpRequest] = None
45+
response: Optional[HttpResponse] = None
46+
47+
48+
@dataclass
49+
class StreamReadSlices:
50+
pages: List[StreamReadPages]
51+
slice_descriptor: Optional[Dict[str, Any]]
52+
state: Optional[List[Dict[str, Any]]] = None
53+
auxiliary_requests: Optional[List[AuxiliaryRequest]] = None
54+
55+
5456
@dataclass
5557
class StreamRead(object):
5658
logs: List[LogMessage]

airbyte_cdk/connector_builder/test_reader/helpers.py

Lines changed: 120 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
SchemaInferrer,
2929
)
3030

31-
from .types import LOG_MESSAGES_OUTPUT_TYPE
31+
from .types import ASYNC_AUXILIARY_REQUEST_TYPES, LOG_MESSAGES_OUTPUT_TYPE
3232

3333
# -------
3434
# Parsers
@@ -226,7 +226,8 @@ def should_close_page(
226226
at_least_one_page_in_group
227227
and is_log_message(message)
228228
and (
229-
is_page_http_request(json_message) or message.log.message.startswith("slice:") # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
229+
is_page_http_request(json_message)
230+
or message.log.message.startswith(SliceLogger.SLICE_LOG_PREFIX) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
230231
)
231232
)
232233

@@ -330,6 +331,10 @@ def is_auxiliary_http_request(message: Optional[Dict[str, Any]]) -> bool:
330331
return is_http_log(message) and message.get("http", {}).get("is_auxiliary", False)
331332

332333

334+
def is_async_auxiliary_request(message: AuxiliaryRequest) -> bool:
335+
return message.type in ASYNC_AUXILIARY_REQUEST_TYPES
336+
337+
333338
def is_log_message(message: AirbyteMessage) -> bool:
334339
"""
335340
Determines whether the provided message is of type LOG.
@@ -413,6 +418,7 @@ def handle_current_slice(
413418
current_slice_pages: List[StreamReadPages],
414419
current_slice_descriptor: Optional[Dict[str, Any]] = None,
415420
latest_state_message: Optional[Dict[str, Any]] = None,
421+
auxiliary_requests: Optional[List[AuxiliaryRequest]] = None,
416422
) -> StreamReadSlices:
417423
"""
418424
Handles the current slice by packaging its pages, descriptor, state, and auxiliary requests into a StreamReadSlices instance.
@@ -421,6 +427,7 @@ def handle_current_slice(
421427
current_slice_pages (List[StreamReadPages]): The pages to be included in the slice.
422428
current_slice_descriptor (Optional[Dict[str, Any]]): Descriptor for the current slice, optional.
423429
latest_state_message (Optional[Dict[str, Any]]): The latest state message, optional.
430+
auxiliary_requests (Optional[List[AuxiliaryRequest]]): The auxiliary requests to include, optional.
424431
425432
Returns:
426433
StreamReadSlices: An object containing the current slice's pages, descriptor, state, and auxiliary requests.
@@ -429,6 +436,7 @@ def handle_current_slice(
429436
pages=current_slice_pages,
430437
slice_descriptor=current_slice_descriptor,
431438
state=[latest_state_message] if latest_state_message else [],
439+
auxiliary_requests=auxiliary_requests if auxiliary_requests else [],
432440
)
433441

434442

@@ -486,29 +494,24 @@ def handle_auxiliary_request(json_message: Dict[str, JsonType]) -> AuxiliaryRequ
486494
Raises:
487495
ValueError: If any of the "airbyte_cdk", "stream", or "http" fields is not a dictionary.
488496
"""
489-
airbyte_cdk = json_message.get("airbyte_cdk", {})
490-
491-
if not isinstance(airbyte_cdk, dict):
492-
raise ValueError(
493-
f"Expected airbyte_cdk to be a dict, got {airbyte_cdk} of type {type(airbyte_cdk)}"
494-
)
495-
496-
stream = airbyte_cdk.get("stream", {})
497497

498-
if not isinstance(stream, dict):
499-
raise ValueError(f"Expected stream to be a dict, got {stream} of type {type(stream)}")
498+
airbyte_cdk = get_airbyte_cdk_from_message(json_message)
499+
stream = get_stream_from_airbyte_cdk(airbyte_cdk)
500+
title_prefix = get_auxiliary_request_title_prefix(stream)
501+
http = get_http_property_from_message(json_message)
502+
request_type = get_auxiliary_request_type(stream, http)
500503

501-
title_prefix = "Parent stream: " if stream.get("is_substream", False) else ""
502-
http = json_message.get("http", {})
503-
504-
if not isinstance(http, dict):
505-
raise ValueError(f"Expected http to be a dict, got {http} of type {type(http)}")
504+
title = title_prefix + str(http.get("title", None))
505+
description = str(http.get("description", None))
506+
request = create_request_from_log_message(json_message)
507+
response = create_response_from_log_message(json_message)
506508

507509
return AuxiliaryRequest(
508-
title=title_prefix + str(http.get("title", None)),
509-
description=str(http.get("description", None)),
510-
request=create_request_from_log_message(json_message),
511-
response=create_response_from_log_message(json_message),
510+
title=title,
511+
type=request_type,
512+
description=description,
513+
request=request,
514+
response=response,
512515
)
513516

514517

@@ -558,7 +561,8 @@ def handle_log_message(
558561
at_least_one_page_in_group,
559562
current_page_request,
560563
current_page_response,
561-
auxiliary_request or log_message,
564+
auxiliary_request,
565+
log_message,
562566
)
563567

564568

@@ -589,3 +593,97 @@ def handle_record_message(
589593
datetime_format_inferrer.accumulate(message.record) # type: ignore
590594

591595
return records_count
596+
597+
598+
# -------
599+
# Reusable Getters
600+
# -------
601+
602+
603+
def get_airbyte_cdk_from_message(json_message: Dict[str, JsonType]) -> dict: # type: ignore
604+
"""
605+
Retrieves the "airbyte_cdk" dictionary from the provided JSON message.
606+
607+
This function validates that the extracted "airbyte_cdk" is of type dict,
608+
raising a ValueError if the validation fails.
609+
610+
Parameters:
611+
json_message (Dict[str, JsonType]): A dictionary representing the JSON message.
612+
613+
Returns:
614+
dict: The "airbyte_cdk" dictionary extracted from the JSON message.
615+
616+
Raises:
617+
ValueError: If the "airbyte_cdk" field is not a dictionary.
618+
"""
619+
airbyte_cdk = json_message.get("airbyte_cdk", {})
620+
621+
if not isinstance(airbyte_cdk, dict):
622+
raise ValueError(
623+
f"Expected airbyte_cdk to be a dict, got {airbyte_cdk} of type {type(airbyte_cdk)}"
624+
)
625+
626+
return airbyte_cdk
627+
628+
629+
def get_stream_from_airbyte_cdk(airbyte_cdk: dict) -> dict: # type: ignore
630+
"""
631+
Retrieves the "stream" dictionary from the provided "airbyte_cdk" dictionary.
632+
633+
This function ensures that the extracted "stream" is of type dict,
634+
raising a ValueError if the validation fails.
635+
636+
Parameters:
637+
airbyte_cdk (dict): The dictionary representing the Airbyte CDK data.
638+
639+
Returns:
640+
dict: The "stream" dictionary extracted from the Airbyte CDK data.
641+
642+
Raises:
643+
ValueError: If the "stream" field is not a dictionary.
644+
"""
645+
646+
stream = airbyte_cdk.get("stream", {})
647+
648+
if not isinstance(stream, dict):
649+
raise ValueError(f"Expected stream to be a dict, got {stream} of type {type(stream)}")
650+
651+
return stream
652+
653+
654+
def get_auxiliary_request_title_prefix(stream: dict) -> str: # type: ignore
655+
"""
656+
Generates a title prefix based on the stream type.
657+
"""
658+
return "Parent stream: " if stream.get("is_substream", False) else ""
659+
660+
661+
def get_http_property_from_message(json_message: Dict[str, JsonType]) -> dict: # type: ignore
662+
"""
663+
Retrieves the "http" dictionary from the provided JSON message.
664+
665+
This function validates that the extracted "http" is of type dict,
666+
raising a ValueError if the validation fails.
667+
668+
Parameters:
669+
json_message (Dict[str, JsonType]): A dictionary representing the JSON message.
670+
671+
Returns:
672+
dict: The "http" dictionary extracted from the JSON message.
673+
674+
Raises:
675+
ValueError: If the "http" field is not a dictionary.
676+
"""
677+
http = json_message.get("http", {})
678+
679+
if not isinstance(http, dict):
680+
raise ValueError(f"Expected http to be a dict, got {http} of type {type(http)}")
681+
682+
return http
683+
684+
685+
def get_auxiliary_request_type(stream: dict, http: dict) -> str: # type: ignore
686+
"""
687+
Determines the type of the auxiliary request based on the stream and HTTP properties.
688+
"""
689+
return "PARENT_STREAM" if stream.get("is_substream", False) else str(http.get("type", None))

airbyte_cdk/connector_builder/test_reader/message_grouper.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Any, Dict, Iterator, List, Mapping, Optional
77

88
from airbyte_cdk.connector_builder.models import (
9+
AuxiliaryRequest,
910
HttpRequest,
1011
HttpResponse,
1112
StreamReadPages,
@@ -24,6 +25,7 @@
2425
handle_current_slice,
2526
handle_log_message,
2627
handle_record_message,
28+
is_async_auxiliary_request,
2729
is_config_update_message,
2830
is_log_message,
2931
is_record_message,
@@ -89,6 +91,7 @@ def get_message_groups(
8991
current_page_request: Optional[HttpRequest] = None
9092
current_page_response: Optional[HttpResponse] = None
9193
latest_state_message: Optional[Dict[str, Any]] = None
94+
slice_auxiliary_requests: List[AuxiliaryRequest] = []
9295

9396
while records_count < limit and (message := next(messages, None)):
9497
json_message = airbyte_message_to_json(message)
@@ -106,6 +109,7 @@ def get_message_groups(
106109
current_slice_pages,
107110
current_slice_descriptor,
108111
latest_state_message,
112+
slice_auxiliary_requests,
109113
)
110114
current_slice_descriptor = parse_slice_description(message.log.message) # type: ignore
111115
current_slice_pages = []
@@ -118,16 +122,24 @@ def get_message_groups(
118122
at_least_one_page_in_group,
119123
current_page_request,
120124
current_page_response,
121-
log_or_auxiliary_request,
125+
auxiliary_request,
126+
log_message,
122127
) = handle_log_message(
123128
message,
124129
json_message,
125130
at_least_one_page_in_group,
126131
current_page_request,
127132
current_page_response,
128133
)
129-
if log_or_auxiliary_request:
130-
yield log_or_auxiliary_request
134+
135+
if auxiliary_request:
136+
if is_async_auxiliary_request(auxiliary_request):
137+
slice_auxiliary_requests.append(auxiliary_request)
138+
else:
139+
yield auxiliary_request
140+
141+
if log_message:
142+
yield log_message
131143
elif is_trace_with_error(message):
132144
if message.trace is not None:
133145
yield message.trace
@@ -157,4 +169,5 @@ def get_message_groups(
157169
current_slice_pages,
158170
current_slice_descriptor,
159171
latest_state_message,
172+
slice_auxiliary_requests,
160173
)

airbyte_cdk/connector_builder/test_reader/types.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,13 @@
7171
bool,
7272
HttpRequest | None,
7373
HttpResponse | None,
74-
AuxiliaryRequest | AirbyteLogMessage | None,
74+
AuxiliaryRequest | None,
75+
AirbyteLogMessage | None,
76+
]
77+
78+
ASYNC_AUXILIARY_REQUEST_TYPES = [
79+
"ASYNC_CREATE",
80+
"ASYNC_POLL",
81+
"ASYNC_ABORT",
82+
"ASYNC_DELETE",
7583
]

airbyte_cdk/sources/declarative/auth/token_provider.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def _refresh(self) -> None:
5858
"Obtains session token",
5959
None,
6060
is_auxiliary=True,
61+
type="AUTH",
6162
),
6263
)
6364
if response is None:

0 commit comments

Comments
 (0)