Skip to content

Commit d326a26

Browse files
committed
Add concurrent PerPartitionCursor
1 parent bbf704e commit d326a26

File tree

7 files changed

+590
-0
lines changed

7 files changed

+590
-0
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#
44

55
import logging
6+
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import PerPartitionCursor
7+
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import PerPartitionWithGlobalCursor
68
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, Union, Callable
79

810
from airbyte_cdk.models import (
@@ -37,6 +39,7 @@
3739
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, Retriever
3840
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
3941
DeclarativePartitionFactory,
42+
DeclarativePartitionFactory1,
4043
StreamSlicerPartitionGenerator,
4144
)
4245
from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields
@@ -219,6 +222,61 @@ def _group_streams(
219222
)
220223
)
221224

225+
partition_generator = StreamSlicerPartitionGenerator(
226+
DeclarativePartitionFactory(
227+
declarative_stream.name,
228+
declarative_stream.get_json_schema(),
229+
self._retriever_factory(
230+
name_to_stream_mapping[declarative_stream.name],
231+
config,
232+
stream_state,
233+
),
234+
self.message_repository,
235+
),
236+
cursor,
237+
)
238+
239+
concurrent_streams.append(
240+
DefaultStream(
241+
partition_generator=partition_generator,
242+
name=declarative_stream.name,
243+
json_schema=declarative_stream.get_json_schema(),
244+
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
245+
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
246+
cursor_field=cursor.cursor_field.cursor_field_key,
247+
logger=self.logger,
248+
cursor=cursor,
249+
)
250+
)
251+
elif (
252+
datetime_based_cursor_component_definition
253+
and datetime_based_cursor_component_definition.get("type", "")
254+
== DatetimeBasedCursorModel.__name__
255+
and self._stream_supports_concurrent_partition_processing(
256+
declarative_stream=declarative_stream
257+
)
258+
and hasattr(declarative_stream.retriever, "stream_slicer")
259+
and isinstance(declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor)
260+
):
261+
stream_state = state_manager.get_stream_state(
262+
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
263+
)
264+
partition_router = declarative_stream.retriever.stream_slicer._partition_router
265+
266+
cursor, connector_state_converter = (
267+
self._constructor.create_concurrent_cursor_from_perpartition_cursor(
268+
state_manager=state_manager,
269+
model_type=DatetimeBasedCursorModel,
270+
component_definition=datetime_based_cursor_component_definition,
271+
stream_name=declarative_stream.name,
272+
stream_namespace=declarative_stream.namespace,
273+
config=config or {},
274+
stream_state=stream_state,
275+
partition_router=partition_router,
276+
)
277+
)
278+
279+
222280
partition_generator = StreamSlicerPartitionGenerator(
223281
DeclarativePartitionFactory(
224282
declarative_stream.name,

airbyte_cdk/sources/declarative/incremental/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44

5+
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ConcurrentCursorFactory, ConcurrentPerPartitionCursor
56
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
67
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
78
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor
@@ -14,6 +15,8 @@
1415

1516
__all__ = [
1617
"CursorFactory",
18+
"ConcurrentCursorFactory"
19+
"ConcurrentPerPartitionCursor",
1720
"DatetimeBasedCursor",
1821
"DeclarativeCursor",
1922
"GlobalSubstreamCursor",

0 commit comments

Comments
 (0)