29 changes: 3 additions & 26 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -468,34 +468,11 @@ def _is_concurrent_cursor_incremental_without_partition_routing(
def _get_retriever(
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
) -> Retriever:
retriever = declarative_stream.retriever

# This is an optimization so that we don't invoke any cursor or state management flows within the
# low-code framework because state management is handled through the ConcurrentCursor.
if declarative_stream and isinstance(retriever, SimpleRetriever):
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
# properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)

# Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
# from the one initialized on the SimpleRetriever, so it must also have state initialized
# for semi-incremental streams using is_client_side_incremental to filter properly
if isinstance(retriever.record_selector, RecordSelector) and isinstance(
retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
):
retriever.record_selector.record_filter._cursor.set_initial_state(
stream_state=stream_state
) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds

if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever):
# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None

return retriever
declarative_stream.retriever.cursor = None
return declarative_stream.retriever

@staticmethod
def _select_streams(
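A minimal sketch of the simplified behavior above, using a toy class rather than the CDK's real SimpleRetriever (names are illustrative): state priming now lives entirely on the concurrent cursor, so _get_retriever just detaches the declarative cursor.

from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class ToyRetriever:
    cursor: Optional[Any]  # legacy declarative cursor, no longer consulted


def get_retriever(retriever: ToyRetriever, stream_state: Mapping[str, Any]) -> ToyRetriever:
    # The ConcurrentCursor applies stream_state elsewhere, so instead of
    # priming the retriever-level cursor with state, it is simply dropped.
    retriever.cursor = None
    return retriever


assert get_retriever(ToyRetriever(cursor=object()), {"updated_at": "2024-01-01"}).cursor is None
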
12 changes: 4 additions & 8 deletions airbyte_cdk/sources/declarative/extractors/record_filter.py
@@ -4,12 +4,8 @@
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.incremental import (
DatetimeBasedCursor,
GlobalSubstreamCursor,
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState


@@ -53,13 +49,13 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
"""
Applies a filter to a list of records to exclude those that are older than the stream_state/start_date.

:param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
:param Cursor cursor: Cursor used to filter out values
:param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state
"""

def __init__(
self,
cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor],
cursor: Union[Cursor],
**kwargs: Any,
):
super().__init__(**kwargs)
@@ -77,7 +73,7 @@ def filter_records(
for record in records
if self._cursor.should_be_synced(
# Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here
# Record stream name is empty cause it is not used durig the filtering
# Record stream name is empty because it is not used during the filtering
Record(data=record, associated_slice=stream_slice, stream_name="")
)
)
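A hedged sketch of the decorator's new contract: any cursor exposing should_be_synced(record) can drive client-side filtering, with no datetime-specific slicer required. ToyCursor is illustrative and not the CDK's Cursor implementation.

from typing import Any, Iterable, Mapping


class ToyCursor:
    # Stand-in for a concurrent Cursor; only should_be_synced matters here.
    def __init__(self, state_value: int) -> None:
        self._state_value = state_value

    def should_be_synced(self, record: Mapping[str, Any]) -> bool:
        # Keep only records strictly newer than the saved cursor value.
        return record["updated_at"] > self._state_value


def filter_records(cursor: ToyCursor, records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
    return (record for record in records if cursor.should_be_synced(record))


assert list(filter_records(ToyCursor(5), [{"updated_at": 3}, {"updated_at": 9}])) == [{"updated_at": 9}]
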
@@ -81,6 +81,7 @@ def __init__(
connector_state_converter: AbstractStreamStateConverter,
cursor_field: CursorField,
use_global_cursor: bool = False,
attempt_to_create_cursor_if_not_provided: bool = False,
) -> None:
self._global_cursor: Optional[StreamState] = {}
self._stream_name = stream_name
@@ -125,6 +126,9 @@ def __init__(

self._set_initial_state(stream_state)

# FIXME this is a temporary field for the duration of the migration from declarative cursors to concurrent ones
self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided

@property
def cursor_field(self) -> CursorField:
return self._cursor_field
@@ -512,13 +516,28 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)

if self._use_global_cursor:
return self._create_cursor(
self._global_cursor,
self._lookback_window if self._global_cursor else 0,
)

partition_key = self._to_partition_key(record.associated_slice.partition)
if partition_key not in self._cursor_per_partition:
if (
partition_key not in self._cursor_per_partition
and not self._attempt_to_create_cursor_if_not_provided
):
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)
cursor = self._cursor_per_partition[partition_key]
return cursor
elif partition_key not in self._cursor_per_partition:
return self._create_cursor(
self._global_cursor,
self._lookback_window if self._global_cursor else 0,
)
else:
return self._cursor_per_partition[partition_key]

def limit_reached(self) -> bool:
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
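The _get_cursor change above amounts to a three-way decision. A simplified sketch of that order, with stand-in helper names rather than the class's private API: the global cursor wins first, then a known per-partition cursor, and an unknown partition only falls back to a cursor seeded from global state when attempt_to_create_cursor_if_not_provided is set; otherwise it still raises.

from typing import Any, Callable, Dict, Mapping, Optional


def get_cursor_for_partition(
    partition_key: str,
    use_global_cursor: bool,
    cursor_per_partition: Dict[str, Any],
    global_state: Optional[Mapping[str, Any]],
    attempt_to_create_cursor_if_not_provided: bool,
    create_cursor: Callable[[Optional[Mapping[str, Any]]], Any],
) -> Any:
    if use_global_cursor:
        return create_cursor(global_state)
    if partition_key in cursor_per_partition:
        return cursor_per_partition[partition_key]
    if attempt_to_create_cursor_if_not_provided:
        # Temporary migration behavior: seed a fresh cursor from global state.
        return create_cursor(global_state)
    raise ValueError(
        "Invalid state as stream slices that are emitted should refer to an existing cursor"
    )


def _make_cursor(state: Optional[Mapping[str, Any]]) -> Any:
    return ("cursor", dict(state or {}))


assert get_cursor_for_partition("p1", False, {}, {"ts": 1}, True, _make_cursor) == ("cursor", {"ts": 1})
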
108 changes: 80 additions & 28 deletions airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -34,7 +34,6 @@
)
from airbyte_cdk.models import FailureType, Level
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative import transformations
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
@@ -604,7 +603,7 @@
WeekClampingStrategy,
Weekday,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
CustomFormatConcurrentStreamStateConverter,
DateTimeStreamStateConverter,
@@ -1475,6 +1474,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
stream_namespace: Optional[str],
config: Config,
message_repository: Optional[MessageRepository] = None,
stream_state_migrations: Optional[List[Any]] = None,
**kwargs: Any,
) -> ConcurrentCursor:
# Per-partition incremental streams can dynamically create child cursors which will pass their current
@@ -1485,6 +1485,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
if "stream_state" not in kwargs
else kwargs["stream_state"]
)
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)

component_type = component_definition.get("type")
if component_definition.get("type") != model_type.__name__:
@@ -1561,6 +1562,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
stream_state: MutableMapping[str, Any],
partition_router: PartitionRouter,
stream_state_migrations: Optional[List[Any]] = None,
attempt_to_create_cursor_if_not_provided: bool = False,
**kwargs: Any,
) -> ConcurrentPerPartitionCursor:
component_type = component_definition.get("type")
@@ -1631,6 +1633,7 @@
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
use_global_cursor=use_global_cursor,
attempt_to_create_cursor_if_not_provided=attempt_to_create_cursor_if_not_provided,
)

@staticmethod
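The stream_state_migrations plumbing added above means legacy state is rewritten before any cursor is built. A sketch consistent with the CDK's StateMigration protocol (should_migrate/migrate); the loop shape is an assumption inferred from how apply_stream_state_migrations is invoked in this diff.

from typing import Any, List, MutableMapping, Optional


def apply_stream_state_migrations(
    stream_state_migrations: Optional[List[Any]],
    stream_state: MutableMapping[str, Any],
) -> MutableMapping[str, Any]:
    # Each migration may rewrite an older state shape into the current one.
    for state_migration in stream_state_migrations or []:
        if state_migration.should_migrate(stream_state):
            stream_state = dict(state_migration.migrate(stream_state))
    return stream_state


class _RenameLegacyKey:
    # Hypothetical migration: moves a legacy field to the current cursor field.
    def should_migrate(self, state: MutableMapping[str, Any]) -> bool:
        return "cursor" in state

    def migrate(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        return {"updated_at": state["cursor"]}


assert apply_stream_state_migrations([_RenameLegacyKey()], {"cursor": "2024-01-01"}) == {
    "updated_at": "2024-01-01"
}
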
@@ -1931,30 +1934,17 @@ def create_declarative_stream(
and hasattr(model.incremental_sync, "is_data_feed")
and model.incremental_sync.is_data_feed
)
client_side_incremental_sync = None
if (
client_side_filtering_enabled = (
model.incremental_sync
and hasattr(model.incremental_sync, "is_client_side_incremental")
and model.incremental_sync.is_client_side_incremental
):
supported_slicers = (
DatetimeBasedCursor,
GlobalSubstreamCursor,
PerPartitionWithGlobalCursor,
)
if combined_slicers and not isinstance(combined_slicers, supported_slicers):
raise ValueError(
"Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead"
)
cursor = (
combined_slicers
if isinstance(
combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor)
)
else self._create_component_from_model(model=model.incremental_sync, config=config)
)
concurrent_cursor = None
if stop_condition_on_cursor or client_side_filtering_enabled:
stream_slicer = self._build_stream_slicer_from_partition_router(
model.retriever, config, stream_name=model.name
)

client_side_incremental_sync = {"cursor": cursor}
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)

if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
cursor_model = model.incremental_sync
@@ -2029,8 +2019,10 @@ def create_declarative_stream(
primary_key=primary_key,
stream_slicer=combined_slicers,
request_options_provider=request_options_provider,
stop_condition_on_cursor=stop_condition_on_cursor,
client_side_incremental_sync=client_side_incremental_sync,
stop_condition_cursor=concurrent_cursor,
client_side_incremental_sync={"cursor": concurrent_cursor}
if client_side_filtering_enabled
else None,
transformations=transformations,
file_uploader=file_uploader,
incremental_sync=model.incremental_sync,
@@ -2185,6 +2177,67 @@ def _build_incremental_cursor(
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
return None

def _build_concurrent_cursor(
self,
model: DeclarativeStreamModel,
stream_slicer: Optional[PartitionRouter],
config: Config,
) -> Optional[StreamSlicer]:
stream_state = self._connector_state_manager.get_stream_state(
stream_name=model.name or "", namespace=None
)

if model.incremental_sync and stream_slicer:
# FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for
# ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor
if model.state_migrations:
state_transformations = [
self._create_component_from_model(
state_migration, config, declarative_stream=model
)
for state_migration in model.state_migrations
]
else:
state_transformations = []

return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=model.incremental_sync.__dict__,
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state=stream_state,
stream_state_migrations=state_transformations,
partition_router=stream_slicer,
attempt_to_create_cursor_if_not_provided=True,
)
elif model.incremental_sync:
if type(model.incremental_sync) == IncrementingCountCursorModel:
return self.create_concurrent_cursor_from_incrementing_count_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
model_type=IncrementingCountCursorModel,
component_definition=model.incremental_sync.__dict__,
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state_migrations=model.state_migrations,
)
elif type(model.incremental_sync) == DatetimeBasedCursorModel:
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
model_type=type(model.incremental_sync),
component_definition=model.incremental_sync.__dict__,
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state_migrations=model.state_migrations,
attempt_to_create_cursor_if_not_provided=True,
)
else:
raise ValueError(
f"Incremental sync of type {type(model.incremental_sync)} is not supported"
)
return None

def _build_resumable_cursor(
self,
model: Union[
@@ -2285,7 +2338,7 @@ def create_default_paginator(
url_base: str,
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
decoder: Optional[Decoder] = None,
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
cursor_used_for_stop_condition: Optional[Cursor] = None,
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
if decoder:
if self._is_supported_decoder_for_pagination(decoder):
@@ -3146,7 +3199,7 @@ def create_simple_retriever(
primary_key: Optional[Union[str, List[str], List[List[str]]]],
stream_slicer: Optional[StreamSlicer],
request_options_provider: Optional[RequestOptionsProvider] = None,
stop_condition_on_cursor: bool = False,
stop_condition_cursor: Optional[Cursor] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
file_uploader: Optional[DefaultFileUploader] = None,
@@ -3277,15 +3330,14 @@ def _get_url() -> str:
),
)

cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
paginator = (
self._create_component_from_model(
model=model.paginator,
config=config,
url_base=_get_url(),
extractor_model=model.record_selector.extractor,
decoder=decoder,
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
cursor_used_for_stop_condition=stop_condition_cursor or None,
)
if model.paginator
else NoPagination(parameters={})
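Taken together, the factory changes in this file route every incremental stream to a concurrent cursor. A toy sketch of the dispatch in _build_concurrent_cursor, with empty stand-ins for the CDK's Pydantic models and strings in place of real cursor objects; the exact-type checks mirror the diff above.

from typing import Any, Optional


class DatetimeBasedCursorModel:
    ...


class IncrementingCountCursorModel:
    ...


def build_concurrent_cursor(incremental_sync: Any, partition_router: Any) -> Optional[str]:
    if incremental_sync and partition_router:
        return "per-partition concurrent cursor"
    if type(incremental_sync) is IncrementingCountCursorModel:
        return "incrementing-count concurrent cursor"
    if type(incremental_sync) is DatetimeBasedCursorModel:
        return "datetime-based concurrent cursor"
    if incremental_sync:
        raise ValueError(f"Incremental sync of type {type(incremental_sync)} is not supported")
    return None


assert build_concurrent_cursor(DatetimeBasedCursorModel(), None) == "datetime-based concurrent cursor"
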
@@ -7,11 +7,10 @@

import requests

from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
PaginationStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.types import Record


@@ -29,8 +28,7 @@ def is_met(self, record: Record) -> bool:
class CursorStopCondition(PaginationStopCondition):
def __init__(
self,
cursor: DeclarativeCursor
| ConcurrentCursor, # migrate to use both old and concurrent versions
cursor: Cursor,
):
self._cursor = cursor

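With the annotation above narrowed to the concurrent Cursor, the stop condition needs nothing beyond should_be_synced. A small sketch of how the pieces plug together; StubCursor is hypothetical, and is_met inverting should_be_synced is an assumption about the class shown here.

from typing import Any, Mapping


class StubCursor:
    # Hypothetical cursor: records at or below the saved id are already synced.
    def should_be_synced(self, record: Mapping[str, Any]) -> bool:
        return record["id"] > 100


class ToyCursorStopCondition:
    def __init__(self, cursor: StubCursor) -> None:
        self._cursor = cursor

    def is_met(self, record: Mapping[str, Any]) -> bool:
        # Stop paginating once records no longer need to be synced.
        return not self._cursor.should_be_synced(record)


assert ToyCursorStopCondition(StubCursor()).is_met({"id": 7})
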
@@ -311,3 +311,6 @@ def set_initial_state(self, value: StreamState) -> None:

def ensure_at_least_one_state_emitted(self) -> None:
self.emit_state_message()

def should_be_synced(self, record: Record) -> bool:
return True
@@ -81,3 +81,6 @@ def ensure_at_least_one_state_emitted(self) -> None:
self._stream_name, self._stream_namespace
)
self._message_repository.emit_message(state_message)

def should_be_synced(self, record: Record) -> bool:
return True
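
Both trailing additions give these cursors a permissive should_be_synced, so they satisfy the shared Cursor interface while never filtering anything. A one-class sketch of the effect (the class name is illustrative):

from typing import Any, Mapping


class AlwaysSyncCursor:
    def should_be_synced(self, record: Mapping[str, Any]) -> bool:
        # Every record passes: no client-side filtering, no early pagination stop.
        return True


assert AlwaysSyncCursor().should_be_synced({"any": "record"})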