Skip to content

Commit a3304b9

Browse files
committed
Use request options provider for ConcurrentPerPartitionCursor
1 parent 37efbae commit a3304b9

File tree

5 files changed

+40
-207
lines changed

5 files changed

+40
-207
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
#
44

55
import logging
6-
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import PerPartitionCursor
7-
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import PerPartitionWithGlobalCursor
86
from typing import Any, Callable, Generic, Iterator, List, Mapping, Optional, Tuple, Union
97

108
from airbyte_cdk.models import (
@@ -22,6 +20,9 @@
2220
ClientSideIncrementalRecordFilterDecorator,
2321
)
2422
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
23+
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
24+
PerPartitionWithGlobalCursor,
25+
)
2526
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
2627
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
2728
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
@@ -306,8 +307,8 @@ def _group_streams(
306307
)
307308
)
308309
elif (
309-
datetime_based_cursor_component_definition
310-
and datetime_based_cursor_component_definition.get("type", "")
310+
incremental_sync_component_definition
311+
and incremental_sync_component_definition.get("type", "")
311312
== DatetimeBasedCursorModel.__name__
312313
and self._stream_supports_concurrent_partition_processing(
313314
declarative_stream=declarative_stream
@@ -320,18 +321,16 @@ def _group_streams(
320321
)
321322
partition_router = declarative_stream.retriever.stream_slicer._partition_router
322323

323-
cursor, connector_state_converter = (
324-
self._constructor.create_concurrent_cursor_from_perpartition_cursor(
324+
cursor = self._constructor.create_concurrent_cursor_from_perpartition_cursor(
325325
state_manager=state_manager,
326326
model_type=DatetimeBasedCursorModel,
327-
component_definition=datetime_based_cursor_component_definition,
327+
component_definition=incremental_sync_component_definition,
328328
stream_name=declarative_stream.name,
329329
stream_namespace=declarative_stream.namespace,
330330
config=config or {},
331331
stream_state=stream_state,
332332
partition_router=partition_router,
333333
)
334-
)
335334

336335

337336
partition_generator = StreamSlicerPartitionGenerator(

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,31 @@
11
import copy
22

3-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
4-
53
#
64
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
75
#
8-
96
import logging
107
from collections import OrderedDict
11-
from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional, Union
8+
from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional
129

10+
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1311
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
1412
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
13+
from airbyte_cdk.sources.message import MessageRepository
1514
from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import (
1615
PerPartitionKeySerializer,
1716
)
18-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField, CursorValueType, GapType
17+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
1918
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
2019
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
21-
import functools
22-
from abc import ABC, abstractmethod
23-
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple
24-
25-
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
26-
from airbyte_cdk.sources.message import MessageRepository
27-
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
28-
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
29-
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
30-
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
31-
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
32-
AbstractStreamStateConverter,
33-
)
34-
from airbyte_cdk.sources.types import StreamSlice
3520

3621
logger = logging.getLogger("airbyte")
3722

3823

3924
class ConcurrentCursorFactory:
40-
def __init__(self, create_function: Callable[[Mapping[str, Any]], DeclarativeCursor]):
25+
def __init__(self, create_function: Callable[..., tuple[Cursor, ...]]):
4126
self._create_function = create_function
4227

43-
def create(self, stream_state: Mapping[str, Any]) -> DeclarativeCursor:
28+
def create(self, stream_state: Mapping[str, Any]) -> Cursor:
4429
return self._create_function(stream_state=stream_state)[0]
4530

4631

@@ -78,52 +63,36 @@ def __init__(
7863
stream_state: Any,
7964
message_repository: MessageRepository,
8065
connector_state_manager: ConnectorStateManager,
81-
connector_state_converter: AbstractStreamStateConverter,
8266
cursor_field: CursorField,
83-
slice_boundary_fields: Optional[Tuple[str, str]],
84-
start: Optional[CursorValueType],
85-
end_provider: Callable[[], CursorValueType],
86-
lookback_window: Optional[GapType] = None,
87-
slice_range: Optional[GapType] = None,
88-
cursor_granularity: Optional[GapType] = None,
8967
) -> None:
9068
self._stream_name = stream_name
9169
self._stream_namespace = stream_namespace
9270
self._message_repository = message_repository
93-
self._connector_state_converter = connector_state_converter
9471
self._connector_state_manager = connector_state_manager
9572
self._cursor_field = cursor_field
96-
# To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
97-
self._slice_boundary_fields = slice_boundary_fields
98-
self._start = start
99-
self._end_provider = end_provider
100-
self._conccurent_state = stream_state
101-
# self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
102-
self._lookback_window = lookback_window
103-
self._slice_range = slice_range
104-
self._most_recent_cursor_value_per_partition: MutableMapping[Partition, Any] = {}
105-
self._has_closed_at_least_one_slice = False
106-
self._cursor_granularity = cursor_granularity
10773

10874
self._cursor_factory = cursor_factory
10975
self._partition_router = partition_router
76+
11077
# The dict is ordered to ensure that once the maximum number of partitions is reached,
11178
# the oldest partitions can be efficiently removed, maintaining the most recent partitions.
11279
self._cursor_per_partition: OrderedDict[str, Cursor] = OrderedDict()
11380
self._over_limit = 0
114-
self._state = {}
11581
self._partition_serializer = PerPartitionKeySerializer()
11682

11783
self._set_initial_state(stream_state)
11884

85+
@property
86+
def cursor_field(self) -> CursorField:
87+
return self._cursor_field
88+
11989
@property
12090
def state(self) -> MutableMapping[str, Any]:
12191
states = []
12292
for partition_tuple, cursor in self._cursor_per_partition.items():
12393
cursor_state = cursor._connector_state_converter.convert_to_state_message(
12494
cursor._cursor_field, cursor.state
12595
)
126-
# print(cursor_state, cursor.state)
12796
if cursor_state:
12897
states.append(
12998
{
@@ -132,13 +101,8 @@ def state(self) -> MutableMapping[str, Any]:
132101
}
133102
)
134103
state: dict[str, Any] = {"states": states}
135-
# print(state)
136104
return state
137105

138-
@property
139-
def cursor_field(self) -> CursorField:
140-
return self._cursor_field
141-
142106
def close_partition(self, partition: Partition) -> None:
143107
self._cursor_per_partition[self._to_partition_key(partition._stream_slice.partition)].close_partition_without_emit(partition=partition)
144108

@@ -253,8 +217,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
253217
self._partition_router.set_initial_state(stream_state)
254218

255219
def observe(self, record: Record) -> None:
256-
print(f"ESTATE: {self._to_partition_key(record.partition._stream_slice.partition)}: {record.data[self.cursor_field.cursor_field_key]}")
257-
self._cursor_per_partition[self._to_partition_key(record.partition._stream_slice.partition)].observe(record)
220+
self._cursor_per_partition[self._to_partition_key(record.associated_slice.partition)].observe(record)
258221

259222
def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
260223
return self._partition_serializer.to_partition_key(partition)
@@ -267,9 +230,7 @@ def _create_cursor(self, cursor_state: Any) -> DeclarativeCursor:
267230
return cursor
268231

269232
def should_be_synced(self, record: Record) -> bool:
270-
return self._get_cursor(record).should_be_synced(
271-
self._convert_record_to_cursor_record(record)
272-
)
233+
return self._get_cursor(record).should_be_synced(record)
273234

274235
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
275236
if not first.associated_slice or not second.associated_slice:
@@ -295,7 +256,7 @@ def _convert_record_to_cursor_record(record: Record) -> Record:
295256
else None,
296257
)
297258

298-
def _get_cursor(self, record: Record) -> DeclarativeCursor:
259+
def _get_cursor(self, record: Record) -> Cursor:
299260
if not record.associated_slice:
300261
raise ValueError(
301262
"Invalid state as stream slices that are emitted should refer to an existing cursor"

airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -244,15 +244,6 @@ def get_request_headers(
244244
next_page_token: Optional[Mapping[str, Any]] = None,
245245
) -> Mapping[str, Any]:
246246
if stream_slice:
247-
if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
248-
partition_state = (
249-
self._state_to_migrate_from
250-
if self._state_to_migrate_from
251-
else self._NO_CURSOR_STATE
252-
)
253-
cursor = self._create_cursor(partition_state)
254-
255-
self._cursor_per_partition[self._to_partition_key(stream_slice.partition)] = cursor
256247
return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping
257248
stream_state=stream_state,
258249
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),

0 commit comments

Comments
 (0)