28
28
SchemaInferrer ,
29
29
)
30
30
31
- from .types import LOG_MESSAGES_OUTPUT_TYPE
31
+ from .types import ASYNC_AUXILIARY_REQUEST_TYPES , LOG_MESSAGES_OUTPUT_TYPE
32
32
33
33
# -------
34
34
# Parsers
@@ -226,7 +226,8 @@ def should_close_page(
226
226
at_least_one_page_in_group
227
227
and is_log_message (message )
228
228
and (
229
- is_page_http_request (json_message ) or message .log .message .startswith ("slice:" ) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
229
+ is_page_http_request (json_message )
230
+ or message .log .message .startswith (SliceLogger .SLICE_LOG_PREFIX ) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
230
231
)
231
232
)
232
233
@@ -330,6 +331,10 @@ def is_auxiliary_http_request(message: Optional[Dict[str, Any]]) -> bool:
330
331
return is_http_log (message ) and message .get ("http" , {}).get ("is_auxiliary" , False )
331
332
332
333
334
+ def is_async_auxiliary_request (message : AuxiliaryRequest ) -> bool :
335
+ return message .type in ASYNC_AUXILIARY_REQUEST_TYPES
336
+
337
+
333
338
def is_log_message (message : AirbyteMessage ) -> bool :
334
339
"""
335
340
Determines whether the provided message is of type LOG.
@@ -413,6 +418,7 @@ def handle_current_slice(
413
418
current_slice_pages : List [StreamReadPages ],
414
419
current_slice_descriptor : Optional [Dict [str , Any ]] = None ,
415
420
latest_state_message : Optional [Dict [str , Any ]] = None ,
421
+ auxiliary_requests : Optional [List [AuxiliaryRequest ]] = None ,
416
422
) -> StreamReadSlices :
417
423
"""
418
424
Handles the current slice by packaging its pages, descriptor, and state into a StreamReadSlices instance.
@@ -421,6 +427,7 @@ def handle_current_slice(
421
427
current_slice_pages (List[StreamReadPages]): The pages to be included in the slice.
422
428
current_slice_descriptor (Optional[Dict[str, Any]]): Descriptor for the current slice, optional.
423
429
latest_state_message (Optional[Dict[str, Any]]): The latest state message, optional.
430
+ auxiliary_requests (Optional[List[AuxiliaryRequest]]): The auxiliary requests to include, optional.
424
431
425
432
Returns:
426
433
StreamReadSlices: An object containing the current slice's pages, descriptor, and state.
@@ -429,6 +436,7 @@ def handle_current_slice(
429
436
pages = current_slice_pages ,
430
437
slice_descriptor = current_slice_descriptor ,
431
438
state = [latest_state_message ] if latest_state_message else [],
439
+ auxiliary_requests = auxiliary_requests if auxiliary_requests else [],
432
440
)
433
441
434
442
@@ -486,29 +494,24 @@ def handle_auxiliary_request(json_message: Dict[str, JsonType]) -> AuxiliaryRequ
486
494
Raises:
487
495
ValueError: If any of the "airbyte_cdk", "stream", or "http" fields is not a dictionary.
488
496
"""
489
- airbyte_cdk = json_message .get ("airbyte_cdk" , {})
490
-
491
- if not isinstance (airbyte_cdk , dict ):
492
- raise ValueError (
493
- f"Expected airbyte_cdk to be a dict, got { airbyte_cdk } of type { type (airbyte_cdk )} "
494
- )
495
-
496
- stream = airbyte_cdk .get ("stream" , {})
497
497
498
- if not isinstance (stream , dict ):
499
- raise ValueError (f"Expected stream to be a dict, got { stream } of type { type (stream )} " )
498
+ airbyte_cdk = get_airbyte_cdk_from_message (json_message )
499
+ stream = get_stream_from_airbyte_cdk (airbyte_cdk )
500
+ title_prefix = get_auxiliary_request_title_prefix (stream )
501
+ http = get_http_property_from_message (json_message )
502
+ request_type = get_auxiliary_request_type (stream , http )
500
503
501
- title_prefix = "Parent stream: " if stream .get ("is_substream" , False ) else ""
502
- http = json_message .get ("http" , {})
503
-
504
- if not isinstance (http , dict ):
505
- raise ValueError (f"Expected http to be a dict, got { http } of type { type (http )} " )
504
+ title = title_prefix + str (http .get ("title" , None ))
505
+ description = str (http .get ("description" , None ))
506
+ request = create_request_from_log_message (json_message )
507
+ response = create_response_from_log_message (json_message )
506
508
507
509
return AuxiliaryRequest (
508
- title = title_prefix + str (http .get ("title" , None )),
509
- description = str (http .get ("description" , None )),
510
- request = create_request_from_log_message (json_message ),
511
- response = create_response_from_log_message (json_message ),
510
+ title = title ,
511
+ type = request_type ,
512
+ description = description ,
513
+ request = request ,
514
+ response = response ,
512
515
)
513
516
514
517
@@ -558,7 +561,8 @@ def handle_log_message(
558
561
at_least_one_page_in_group ,
559
562
current_page_request ,
560
563
current_page_response ,
561
- auxiliary_request or log_message ,
564
+ auxiliary_request ,
565
+ log_message ,
562
566
)
563
567
564
568
@@ -589,3 +593,97 @@ def handle_record_message(
589
593
datetime_format_inferrer .accumulate (message .record ) # type: ignore
590
594
591
595
return records_count
596
+
597
+
598
+ # -------
599
+ # Reusable Getters
600
+ # -------
601
+
602
+
603
+ def get_airbyte_cdk_from_message (json_message : Dict [str , JsonType ]) -> dict : # type: ignore
604
+ """
605
+ Retrieves the "airbyte_cdk" dictionary from the provided JSON message.
606
+
607
+ This function validates that the extracted "airbyte_cdk" is of type dict,
608
+ raising a ValueError if the validation fails.
609
+
610
+ Parameters:
611
+ json_message (Dict[str, JsonType]): A dictionary representing the JSON message.
612
+
613
+ Returns:
614
+ dict: The "airbyte_cdk" dictionary extracted from the JSON message.
615
+
616
+ Raises:
617
+ ValueError: If the "airbyte_cdk" field is not a dictionary.
618
+ """
619
+ airbyte_cdk = json_message .get ("airbyte_cdk" , {})
620
+
621
+ if not isinstance (airbyte_cdk , dict ):
622
+ raise ValueError (
623
+ f"Expected airbyte_cdk to be a dict, got { airbyte_cdk } of type { type (airbyte_cdk )} "
624
+ )
625
+
626
+ return airbyte_cdk
627
+
628
+
629
+ def get_stream_from_airbyte_cdk (airbyte_cdk : dict ) -> dict : # type: ignore
630
+ """
631
+ Retrieves the "stream" dictionary from the provided "airbyte_cdk" dictionary.
632
+
633
+ This function ensures that the extracted "stream" is of type dict,
634
+ raising a ValueError if the validation fails.
635
+
636
+ Parameters:
637
+ airbyte_cdk (dict): The dictionary representing the Airbyte CDK data.
638
+
639
+ Returns:
640
+ dict: The "stream" dictionary extracted from the Airbyte CDK data.
641
+
642
+ Raises:
643
+ ValueError: If the "stream" field is not a dictionary.
644
+ """
645
+
646
+ stream = airbyte_cdk .get ("stream" , {})
647
+
648
+ if not isinstance (stream , dict ):
649
+ raise ValueError (f"Expected stream to be a dict, got { stream } of type { type (stream )} " )
650
+
651
+ return stream
652
+
653
+
654
+ def get_auxiliary_request_title_prefix (stream : dict ) -> str : # type: ignore
655
+ """
656
+ Generates a title prefix based on the stream type.
657
+ """
658
+ return "Parent stream: " if stream .get ("is_substream" , False ) else ""
659
+
660
+
661
+ def get_http_property_from_message (json_message : Dict [str , JsonType ]) -> dict : # type: ignore
662
+ """
663
+ Retrieves the "http" dictionary from the provided JSON message.
664
+
665
+ This function validates that the extracted "http" is of type dict,
666
+ raising a ValueError if the validation fails.
667
+
668
+ Parameters:
669
+ json_message (Dict[str, JsonType]): A dictionary representing the JSON message.
670
+
671
+ Returns:
672
+ dict: The "http" dictionary extracted from the JSON message.
673
+
674
+ Raises:
675
+ ValueError: If the "http" field is not a dictionary.
676
+ """
677
+ http = json_message .get ("http" , {})
678
+
679
+ if not isinstance (http , dict ):
680
+ raise ValueError (f"Expected http to be a dict, got { http } of type { type (http )} " )
681
+
682
+ return http
683
+
684
+
685
+ def get_auxiliary_request_type (stream : dict , http : dict ) -> str : # type: ignore
686
+ """
687
+ Determines the type of the auxiliary request based on the stream and HTTP properties.
688
+ """
689
+ return "PARENT_STREAM" if stream .get ("is_substream" , False ) else str (http .get ("type" , None ))
0 commit comments