Skip to content

Commit cd1bd1c

Browse files
authored
feat(Low-Code Concurrent CDK): Allow non-incremental substreams and list based partition router streams with parents to be processed by the concurrent cdk (#89)
1 parent 72202ee commit cd1bd1c

File tree

8 files changed

+226
-55
lines changed

8 files changed

+226
-55
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
5050
AlwaysAvailableAvailabilityStrategy,
5151
)
52+
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
5253
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
5354
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
5455
from airbyte_cdk.sources.types import Config, StreamState
@@ -69,6 +70,15 @@ def __init__(
6970
component_factory: Optional[ModelToComponentFactory] = None,
7071
**kwargs: Any,
7172
) -> None:
73+
# To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
74+
# cursors. We do this by no longer automatically instantiating RFR cursors when converting
75+
# the declarative models into runtime components. Concurrent sources will continue to checkpoint
76+
# incremental streams running in full refresh.
77+
component_factory = component_factory or ModelToComponentFactory(
78+
emit_connector_builder_messages=emit_connector_builder_messages,
79+
disable_resumable_full_refresh=True,
80+
)
81+
7282
super().__init__(
7383
source_config=source_config,
7484
debug=debug,
@@ -191,13 +201,24 @@ def _group_streams(
191201
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
192202
# so we need to treat them as synchronous
193203
if isinstance(declarative_stream, DeclarativeStream):
194-
datetime_based_cursor_component_definition = name_to_stream_mapping[
204+
incremental_sync_component_definition = name_to_stream_mapping[
195205
declarative_stream.name
196206
].get("incremental_sync")
197207

208+
partition_router_component_definition = (
209+
name_to_stream_mapping[declarative_stream.name]
210+
.get("retriever")
211+
.get("partition_router")
212+
)
213+
214+
is_substream_without_incremental = (
215+
partition_router_component_definition
216+
and not incremental_sync_component_definition
217+
)
218+
198219
if (
199-
datetime_based_cursor_component_definition
200-
and datetime_based_cursor_component_definition.get("type", "")
220+
incremental_sync_component_definition
221+
and incremental_sync_component_definition.get("type", "")
201222
== DatetimeBasedCursorModel.__name__
202223
and self._stream_supports_concurrent_partition_processing(
203224
declarative_stream=declarative_stream
@@ -213,7 +234,7 @@ def _group_streams(
213234
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
214235
state_manager=state_manager,
215236
model_type=DatetimeBasedCursorModel,
216-
component_definition=datetime_based_cursor_component_definition,
237+
component_definition=incremental_sync_component_definition,
217238
stream_name=declarative_stream.name,
218239
stream_namespace=declarative_stream.namespace,
219240
config=config or {},
@@ -247,6 +268,41 @@ def _group_streams(
247268
cursor=cursor,
248269
)
249270
)
271+
elif is_substream_without_incremental and hasattr(
272+
declarative_stream.retriever, "stream_slicer"
273+
):
274+
partition_generator = StreamSlicerPartitionGenerator(
275+
DeclarativePartitionFactory(
276+
declarative_stream.name,
277+
declarative_stream.get_json_schema(),
278+
self._retriever_factory(
279+
name_to_stream_mapping[declarative_stream.name],
280+
config,
281+
{},
282+
),
283+
self.message_repository,
284+
),
285+
declarative_stream.retriever.stream_slicer,
286+
)
287+
288+
final_state_cursor = FinalStateCursor(
289+
stream_name=declarative_stream.name,
290+
stream_namespace=declarative_stream.namespace,
291+
message_repository=self.message_repository,
292+
)
293+
294+
concurrent_streams.append(
295+
DefaultStream(
296+
partition_generator=partition_generator,
297+
name=declarative_stream.name,
298+
json_schema=declarative_stream.get_json_schema(),
299+
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
300+
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
301+
cursor_field=None,
302+
logger=self.logger,
303+
cursor=final_state_cursor,
304+
)
305+
)
250306
else:
251307
synchronous_streams.append(declarative_stream)
252308
else:

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ def __init__(
387387
emit_connector_builder_messages: bool = False,
388388
disable_retries: bool = False,
389389
disable_cache: bool = False,
390+
disable_resumable_full_refresh: bool = False,
390391
message_repository: Optional[MessageRepository] = None,
391392
):
392393
self._init_mappings()
@@ -395,6 +396,7 @@ def __init__(
395396
self._emit_connector_builder_messages = emit_connector_builder_messages
396397
self._disable_retries = disable_retries
397398
self._disable_cache = disable_cache
399+
self._disable_resumable_full_refresh = disable_resumable_full_refresh
398400
self._message_repository = message_repository or InMemoryMessageRepository( # type: ignore
399401
self._evaluate_log_level(emit_connector_builder_messages)
400402
)
@@ -1339,6 +1341,8 @@ def _merge_stream_slicers(
13391341
if model.incremental_sync
13401342
else None
13411343
)
1344+
elif self._disable_resumable_full_refresh:
1345+
return stream_slicer
13421346
elif stream_slicer:
13431347
# For the Full-Refresh sub-streams, we use the nested `ChildPartitionResumableFullRefreshCursor`
13441348
return PerPartitionCursor(

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def as_airbyte_stream(self) -> AirbyteStream:
6767
name=self.name,
6868
json_schema=dict(self._json_schema),
6969
supported_sync_modes=[SyncMode.full_refresh],
70+
is_resumable=False,
7071
)
7172

7273
if self._namespace:

unit_tests/sources/declarative/decoders/test_json_decoder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def test_jsonl_decoder(requests_mock, response_body, expected_json):
5454
def large_event_response_fixture():
5555
data = {"email": "[email protected]"}
5656
jsonl_string = f"{json.dumps(data)}\n"
57-
lines_in_response = 2_000_000 # ≈ 58 MB of response
57+
lines_in_response = 2 # ≈ 58 MB of response
5858
dir_path = os.path.dirname(os.path.realpath(__file__))
5959
file_path = f"{dir_path}/test_response.txt"
6060
with open(file_path, "w") as file:

0 commit comments

Comments
 (0)