Skip to content

Commit cd48741

Browse files
maxi297 and octavia-squidington-iii authored
chore: incremental without partition router as defaultstream (#697)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 2ccff10 commit cd48741

File tree

2 files changed

+133
-117
lines changed

2 files changed

+133
-117
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,13 @@
9494
ClientSideIncrementalRecordFilterDecorator,
9595
)
9696
from airbyte_cdk.sources.declarative.incremental import (
97-
ChildPartitionResumableFullRefreshCursor,
9897
ConcurrentCursorFactory,
9998
ConcurrentPerPartitionCursor,
10099
CursorFactory,
101100
DatetimeBasedCursor,
102101
DeclarativeCursor,
103102
GlobalSubstreamCursor,
104-
PerPartitionCursor,
105103
PerPartitionWithGlobalCursor,
106-
ResumableFullRefreshCursor,
107104
)
108105
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
109106
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
@@ -446,10 +443,6 @@
446443
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
447444
ZipfileDecoder as ZipfileDecoderModel,
448445
)
449-
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
450-
COMPONENTS_MODULE_NAME,
451-
SDM_COMPONENTS_MODULE_NAME,
452-
)
453446
from airbyte_cdk.sources.declarative.partition_routers import (
454447
CartesianProductStreamSlicer,
455448
GroupingPartitionRouter,
@@ -508,7 +501,7 @@
508501
RequestOptionsProvider,
509502
)
510503
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
511-
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
504+
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
512505
from airbyte_cdk.sources.declarative.resolvers import (
513506
ComponentMappingDefinition,
514507
ConfigComponentsResolver,
@@ -617,6 +610,9 @@
617610
)
618611
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
619612
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
613+
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import (
614+
StreamSlicer as ConcurrentStreamSlicer,
615+
)
620616
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
621617
CustomFormatConcurrentStreamStateConverter,
622618
DateTimeStreamStateConverter,
@@ -1933,29 +1929,7 @@ def create_datetime_based_cursor(
19331929
def create_declarative_stream(
19341930
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
19351931
) -> Union[DeclarativeStream, AbstractStream]:
1936-
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
1937-
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
1938-
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
1939-
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
1940-
combined_slicers = self._merge_stream_slicers(model=model, config=config)
1941-
19421932
primary_key = model.primary_key.__root__ if model.primary_key else None
1943-
stop_condition_on_cursor = (
1944-
model.incremental_sync
1945-
and hasattr(model.incremental_sync, "is_data_feed")
1946-
and model.incremental_sync.is_data_feed
1947-
)
1948-
client_side_filtering_enabled = (
1949-
model.incremental_sync
1950-
and hasattr(model.incremental_sync, "is_client_side_incremental")
1951-
and model.incremental_sync.is_client_side_incremental
1952-
)
1953-
concurrent_cursor = None
1954-
if stop_condition_on_cursor or client_side_filtering_enabled:
1955-
stream_slicer = self._build_stream_slicer_from_partition_router(
1956-
model.retriever, config, stream_name=model.name
1957-
)
1958-
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)
19591933

19601934
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
19611935
cursor_model = model.incremental_sync
@@ -2023,16 +1997,27 @@ def create_declarative_stream(
20231997
model=model.file_uploader, config=config
20241998
)
20251999

2000+
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
2001+
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
2002+
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
2003+
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
2004+
combined_slicers = self._merge_stream_slicers(model=model, config=config)
2005+
partition_router = self._build_stream_slicer_from_partition_router(
2006+
model.retriever, config, stream_name=model.name
2007+
)
2008+
concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
20262009
retriever = self._create_component_from_model(
20272010
model=model.retriever,
20282011
config=config,
20292012
name=model.name,
20302013
primary_key=primary_key,
20312014
stream_slicer=combined_slicers,
20322015
request_options_provider=request_options_provider,
2033-
stop_condition_cursor=concurrent_cursor,
2016+
stop_condition_cursor=concurrent_cursor
2017+
if self._is_stop_condition_on_cursor(model)
2018+
else None,
20342019
client_side_incremental_sync={"cursor": concurrent_cursor}
2035-
if client_side_filtering_enabled
2020+
if self._is_client_side_filtering_enabled(model)
20362021
else None,
20372022
transformations=transformations,
20382023
file_uploader=file_uploader,
@@ -2066,37 +2051,61 @@ def create_declarative_stream(
20662051
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20672052

20682053
if (
2069-
isinstance(combined_slicers, PartitionRouter)
2054+
(
2055+
isinstance(combined_slicers, PartitionRouter)
2056+
or isinstance(concurrent_cursor, ConcurrentCursor)
2057+
)
20702058
and not self._emit_connector_builder_messages
20712059
and not is_parent
20722060
):
20732061
# We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the
20742062
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
20752063
# * Streams without partition router nor cursors and streams with only partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition as the first kind with have a SinglePartitionRouter
2076-
# * Streams without partition router but with cursor
2064+
# * Streams without partition router but with cursor. This is the `isinstance(concurrent_cursor, ConcurrentCursor)` condition
20772065
# * Streams with both partition router and cursor
20782066
# We specifically exclude parent streams here because SubstreamPartitionRouter has not been updated yet
20792067
# We specifically exclude Connector Builder stuff for now as Brian is working on this anyway
2068+
20802069
stream_name = model.name or ""
2070+
stream_slicer: ConcurrentStreamSlicer = (
2071+
concurrent_cursor if concurrent_cursor else SinglePartitionRouter(parameters={})
2072+
)
2073+
cursor: Cursor = FinalStateCursor(stream_name, None, self._message_repository)
2074+
if isinstance(retriever, AsyncRetriever):
2075+
# The AsyncRetriever only ever worked with a cursor from the concurrent package. Hence, the method
2076+
# `_build_incremental_cursor` which we would usually think would return only declarative stuff has a
2077+
# special clause and return a concurrent cursor. This stream slicer is passed to AsyncRetriever when
2078+
# built because the async retriever has a specific partition router which relies on this stream slicer.
2079+
# We can't re-use `concurrent_cursor` because it is a different instance than the one passed in
2080+
# AsyncJobPartitionRouter.
2081+
stream_slicer = retriever.stream_slicer
2082+
if isinstance(combined_slicers, Cursor):
2083+
cursor = combined_slicers
2084+
elif isinstance(combined_slicers, PartitionRouter):
2085+
stream_slicer = combined_slicers
2086+
elif concurrent_cursor:
2087+
cursor = concurrent_cursor
2088+
20812089
partition_generator = StreamSlicerPartitionGenerator(
20822090
DeclarativePartitionFactory(
20832091
stream_name,
20842092
schema_loader,
20852093
retriever,
20862094
self._message_repository,
20872095
),
2088-
stream_slicer=combined_slicers,
2096+
stream_slicer=stream_slicer,
20892097
)
20902098
return DefaultStream(
20912099
partition_generator=partition_generator,
20922100
name=stream_name,
20932101
json_schema=schema_loader.get_json_schema,
20942102
primary_key=get_primary_key_from_stream(primary_key),
2095-
cursor_field=None,
2096-
# FIXME we should have the cursor field has part of the interface of cursor
2103+
cursor_field=cursor.cursor_field.cursor_field_key
2104+
if hasattr(cursor, "cursor_field")
2105+
else "", # FIXME we should have the cursor field has part of the interface of cursor,
20972106
logger=logging.getLogger(f"airbyte.{stream_name}"),
2098-
# FIXME this is a breaking change compared to the old implementation,
2099-
cursor=FinalStateCursor(stream_name, None, self._message_repository),
2107+
# FIXME this is a breaking change compared to the old implementation which used the source name instead
2108+
cursor=cursor,
21002109
supports_file_transfer=hasattr(model, "file_uploader")
21012110
and bool(model.file_uploader),
21022111
)
@@ -2120,6 +2129,20 @@ def create_declarative_stream(
21202129
parameters=model.parameters or {},
21212130
)
21222131

2132+
def _is_stop_condition_on_cursor(self, model: DeclarativeStreamModel) -> bool:
2133+
return bool(
2134+
model.incremental_sync
2135+
and hasattr(model.incremental_sync, "is_data_feed")
2136+
and model.incremental_sync.is_data_feed
2137+
)
2138+
2139+
def _is_client_side_filtering_enabled(self, model: DeclarativeStreamModel) -> bool:
2140+
return bool(
2141+
model.incremental_sync
2142+
and hasattr(model.incremental_sync, "is_client_side_incremental")
2143+
and model.incremental_sync.is_client_side_incremental
2144+
)
2145+
21232146
def _build_stream_slicer_from_partition_router(
21242147
self,
21252148
model: Union[

0 commit comments

Comments (0)