Skip to content

Commit 2510d97

Browse files
committed
Merge remote-tracking branch 'origin/main' into artem1205/asyncretriever-split-decoders
# Conflicts: # airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
2 parents b36933a + d242f79 commit 2510d97

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

41 files changed

+626
-311
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
1818
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
1919
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
20-
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
2120
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
21+
from airbyte_cdk.sources.types import Record
2222
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
2323
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
2424
from airbyte_cdk.utils import AirbyteTracedException
@@ -147,11 +147,11 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
147147
# AbstractStreams are expected to return data in its final form.
148148
# Any transformation on the data should be done before reaching this point
149149
message = stream_data_to_airbyte_message(
150-
stream_name=record.partition.stream_name(),
150+
stream_name=record.stream_name,
151151
data_or_message=record.data,
152152
is_file_transfer_message=record.is_file_transfer_message,
153153
)
154-
stream = self._stream_name_to_instance[record.partition.stream_name()]
154+
stream = self._stream_name_to_instance[record.stream_name]
155155

156156
if message.type == MessageType.RECORD:
157157
if self._record_counter[stream.name] == 0:

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
1919
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
2020
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
21-
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
2221
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
2322
PartitionCompleteSentinel,
2423
QueueItem,
2524
)
25+
from airbyte_cdk.sources.types import Record
2626
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
2727

2828

airbyte_cdk/sources/declarative/extractors/record_filter.py

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
import datetime
54
from dataclasses import InitVar, dataclass
65
from typing import Any, Iterable, Mapping, Optional, Union
76

@@ -11,7 +10,7 @@
1110
PerPartitionWithGlobalCursor,
1211
)
1312
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
14-
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
13+
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
1514

1615

1716
@dataclass
@@ -68,37 +67,21 @@ def __init__(
6867
self._date_time_based_cursor = date_time_based_cursor
6968
self._substream_cursor = substream_cursor
7069

71-
@property
72-
def _cursor_field(self) -> str:
73-
return self._date_time_based_cursor.cursor_field.eval(self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context
74-
75-
@property
76-
def _start_date_from_config(self) -> datetime.datetime:
77-
return self._date_time_based_cursor._start_datetime.get_datetime(
78-
self._date_time_based_cursor.config
79-
)
80-
81-
@property
82-
def _end_datetime(self) -> datetime.datetime:
83-
return self._date_time_based_cursor.select_best_end_datetime()
84-
8570
def filter_records(
8671
self,
8772
records: Iterable[Mapping[str, Any]],
8873
stream_state: StreamState,
8974
stream_slice: Optional[StreamSlice] = None,
9075
next_page_token: Optional[Mapping[str, Any]] = None,
9176
) -> Iterable[Mapping[str, Any]]:
92-
state_value = self._get_state_value(
93-
stream_state, stream_slice or StreamSlice(partition={}, cursor_slice={})
94-
)
95-
filter_date: datetime.datetime = self._get_filter_date(state_value)
9677
records = (
9778
record
9879
for record in records
99-
if self._end_datetime
100-
>= self._date_time_based_cursor.parse_date(record[self._cursor_field])
101-
>= filter_date
80+
if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(
81+
# Record is created on the fly to align with the cursors' interface; stream name is ignored as we don't need it here
82+
# Record stream name is empty because it is not used during the filtering
83+
Record(data=record, associated_slice=stream_slice, stream_name="")
84+
)
10285
)
10386
if self.condition:
10487
records = super().filter_records(
@@ -108,28 +91,3 @@ def filter_records(
10891
next_page_token=next_page_token,
10992
)
11093
yield from records
111-
112-
def _get_state_value(
113-
self, stream_state: StreamState, stream_slice: StreamSlice
114-
) -> Optional[str]:
115-
"""
116-
Return cursor_value or None in case it was not found.
117-
Cursor_value may be empty if:
118-
1. It is an initial sync => no stream_state exist at all.
119-
2. In Parent-child stream, and we already make initial sync, so stream_state is present.
120-
During the second read, we receive one extra record from parent and therefore no stream_state for this record will be found.
121-
122-
:param StreamState stream_state: State
123-
:param StreamSlice stream_slice: Current Stream slice
124-
:return Optional[str]: cursor_value in case it was found, otherwise None.
125-
"""
126-
state = (self._substream_cursor or self._date_time_based_cursor).select_state(stream_slice)
127-
128-
return state.get(self._cursor_field) if state else None
129-
130-
def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
131-
start_date_parsed = self._start_date_from_config
132-
if state_value:
133-
return max(start_date_parsed, self._date_time_based_cursor.parse_date(state_value))
134-
else:
135-
return start_date_parsed

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6-
from typing import Any, Iterable, List, Mapping, Optional
6+
from typing import Any, Iterable, List, Mapping, Optional, Union
77

88
import requests
99

1010
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
1111
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1212
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
13+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1314
from airbyte_cdk.sources.declarative.models import SchemaNormalization
1415
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1516
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
@@ -38,11 +39,34 @@ class RecordSelector(HttpSelector):
3839
config: Config
3940
parameters: InitVar[Mapping[str, Any]]
4041
schema_normalization: TypeTransformer
42+
name: str
43+
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
4144
record_filter: Optional[RecordFilter] = None
4245
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
4346

4447
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4548
self._parameters = parameters
49+
self._name = (
50+
InterpolatedString(self._name, parameters=parameters)
51+
if isinstance(self._name, str)
52+
else self._name
53+
)
54+
55+
@property # type: ignore
56+
def name(self) -> str:
57+
"""
58+
:return: Stream name
59+
"""
60+
return (
61+
str(self._name.eval(self.config))
62+
if isinstance(self._name, InterpolatedString)
63+
else self._name
64+
)
65+
66+
@name.setter
67+
def name(self, value: str) -> None:
68+
if not isinstance(value, property):
69+
self._name = value
4670

4771
def select_records(
4872
self,
@@ -86,7 +110,7 @@ def filter_and_transform(
86110
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
87111
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
88112
for data in normalized_data:
89-
yield Record(data, stream_slice)
113+
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
90114

91115
def _normalize_by_schema(
92116
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
@@ -126,6 +150,9 @@ def _transform(
126150
for record in records:
127151
for transformation in self.transformations:
128152
transformation.transform(
129-
record, config=self.config, stream_state=stream_state, stream_slice=stream_slice
130-
) # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
153+
record, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
154+
config=self.config,
155+
stream_state=stream_state,
156+
stream_slice=stream_slice,
157+
)
131158
yield record

airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,11 @@ def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
340340
@staticmethod
341341
def _convert_record_to_cursor_record(record: Record) -> Record:
342342
return Record(
343-
record.data,
344-
StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice)
343+
data=record.data,
344+
stream_name=record.stream_name,
345+
associated_slice=StreamSlice(
346+
partition={}, cursor_slice=record.associated_slice.cursor_slice
347+
)
345348
if record.associated_slice
346349
else None,
347350
)

airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,8 +325,11 @@ def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
325325
@staticmethod
326326
def _convert_record_to_cursor_record(record: Record) -> Record:
327327
return Record(
328-
record.data,
329-
StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice)
328+
data=record.data,
329+
stream_name=record.stream_name,
330+
associated_slice=StreamSlice(
331+
partition={}, cursor_slice=record.associated_slice.cursor_slice
332+
)
330333
if record.associated_slice
331334
else None,
332335
)

airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,7 @@ def get_request_body_json(
194194
)
195195

196196
def should_be_synced(self, record: Record) -> bool:
197-
return self._global_cursor.should_be_synced(
198-
record
199-
) or self._per_partition_cursor.should_be_synced(record)
197+
return self._get_active_cursor().should_be_synced(record)
200198

201199
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
202200
return self._global_cursor.is_greater_than_or_equal(first, second)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,6 +1792,7 @@ def create_record_selector(
17921792
self,
17931793
model: RecordSelectorModel,
17941794
config: Config,
1795+
name: str,
17951796
*,
17961797
transformations: List[RecordTransformation],
17971798
decoder: Optional[Decoder] = None,
@@ -1822,6 +1823,7 @@ def create_record_selector(
18221823

18231824
return RecordSelector(
18241825
extractor=extractor,
1826+
name=name,
18251827
config=config,
18261828
record_filter=record_filter,
18271829
transformations=transformations,
@@ -1892,6 +1894,7 @@ def create_simple_retriever(
18921894
)
18931895
record_selector = self._create_component_from_model(
18941896
model=model.record_selector,
1897+
name=name,
18951898
config=config,
18961899
decoder=decoder,
18971900
transformations=transformations,
@@ -2063,6 +2066,7 @@ def create_async_retriever(
20632066
requester=download_requester,
20642067
record_selector=RecordSelector(
20652068
extractor=download_extractor,
2069+
name=name,
20662070
record_filter=None,
20672071
transformations=[],
20682072
schema_normalization=TypeTransformer(TransformConfig.NoTransform),

airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
1212
PaginationStrategy,
1313
)
14+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
1415
from airbyte_cdk.sources.types import Record
1516

1617

@@ -26,7 +27,11 @@ def is_met(self, record: Record) -> bool:
2627

2728

2829
class CursorStopCondition(PaginationStopCondition):
29-
def __init__(self, cursor: DeclarativeCursor):
30+
def __init__(
31+
self,
32+
cursor: DeclarativeCursor
33+
| ConcurrentCursor, # migrate to use both old and concurrent versions
34+
):
3035
self._cursor = cursor
3136

3237
def is_met(self, record: Record) -> bool:
@@ -47,8 +52,8 @@ def next_page_token(
4752
return None
4853
return self._delegate.next_page_token(response, last_page_size, last_record)
4954

50-
def reset(self) -> None:
51-
self._delegate.reset()
55+
def reset(self, reset_value: Optional[Any] = None) -> None:
56+
self._delegate.reset(reset_value)
5257

5358
def get_page_size(self) -> Optional[int]:
5459
return self._delegate.get_page_size()

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,9 @@ def _get_most_recent_record(
468468
else:
469469
return None
470470

471-
@staticmethod
472-
def _extract_record(stream_data: StreamData, stream_slice: StreamSlice) -> Optional[Record]:
471+
def _extract_record(
472+
self, stream_data: StreamData, stream_slice: StreamSlice
473+
) -> Optional[Record]:
473474
"""
474475
As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
475476
to data to streamline the rest of the process.
@@ -478,9 +479,15 @@ def _extract_record(stream_data: StreamData, stream_slice: StreamSlice) -> Optio
478479
# Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
479480
return stream_data
480481
elif isinstance(stream_data, (dict, Mapping)):
481-
return Record(dict(stream_data), stream_slice)
482+
return Record(
483+
data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
484+
)
482485
elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
483-
return Record(stream_data.record.data, stream_slice)
486+
return Record(
487+
data=stream_data.record.data, # type:ignore # AirbyteMessage always has record.data
488+
associated_slice=stream_slice,
489+
stream_name=self.name,
490+
)
484491
return None
485492

486493
# stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever

0 commit comments

Comments
 (0)