Skip to content

Commit 11d3d0c

Browse files
author
Oleksandr Bazarnov
committed
yield slice cursor info for the TestRead
1 parent b3d46aa commit 11d3d0c

File tree

1 file changed

+29
-3
lines changed

1 file changed

+29
-3
lines changed

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

33

4-
from dataclasses import InitVar, dataclass
4+
from dataclasses import InitVar, dataclass, field
55
from typing import Any, Iterable, Mapping, Optional
66

77
from typing_extensions import deprecated
88

99
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
10-
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition
1110
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
1211
from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
1312
AsyncJobPartitionRouter,
@@ -16,6 +15,7 @@
1615
from airbyte_cdk.sources.source import ExperimentalClassWarning
1716
from airbyte_cdk.sources.streams.core import StreamData
1817
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
18+
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger
1919

2020

2121
@deprecated(
@@ -28,6 +28,10 @@ class AsyncRetriever(Retriever):
2828
parameters: InitVar[Mapping[str, Any]]
2929
record_selector: RecordSelector
3030
stream_slicer: AsyncJobPartitionRouter
31+
slice_logger: AlwaysLogSliceLogger = field(
32+
init=False,
33+
default_factory=lambda: AlwaysLogSliceLogger(),
34+
)
3135

3236
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3337
self._parameters = parameters
@@ -74,14 +78,36 @@ def _validate_and_get_stream_slice_jobs(
7478
"""
7579
return stream_slice.extra_fields.get("jobs", []) if stream_slice else []
7680

81+
def _get_cursor_slice_info(
82+
self,
83+
stream_slice: Optional[StreamSlice] = None,
84+
) -> Mapping[str, Any]:
85+
"""
86+
Retrieve the cursor slice information from a provided stream slice.
87+
88+
This method checks if a stream slice is provided and contains a 'cursor_slice' attribute.
89+
If present, it returns the value of 'cursor_slice' as a dictionary. Otherwise, it returns an empty dictionary.
90+
91+
Args:
92+
stream_slice (Optional[StreamSlice]): An optional stream slice object that may include a 'cursor_slice' attribute.
93+
94+
Returns:
95+
Mapping[str, Any]: A dictionary containing the cursor slice information if available, otherwise an empty dictionary.
96+
"""
97+
return stream_slice.cursor_slice if stream_slice and stream_slice.cursor_slice else {}
98+
7799
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
78-
return self.stream_slicer.stream_slices()
100+
yield from self.stream_slicer.stream_slices()
79101

80102
def read_records(
81103
self,
82104
records_schema: Mapping[str, Any],
83105
stream_slice: Optional[StreamSlice] = None,
84106
) -> Iterable[StreamData]:
107+
# emit the slice_descriptor log message, for connector builder TestRead
108+
yield self.slice_logger.create_slice_log_message(
109+
self._get_cursor_slice_info(stream_slice),
110+
)
85111
stream_state: StreamState = self._get_stream_state()
86112
jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
87113
records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

0 commit comments

Comments
 (0)