Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions airbyte_cdk/sources/streams/concurrent/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,18 @@
import functools
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple
from typing import (
Any,
Callable,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Protocol,
Tuple,
Union,
)

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
Expand Down Expand Up @@ -175,7 +186,9 @@ def __init__(
self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
self._lookback_window = lookback_window
self._slice_range = slice_range
self._most_recent_cursor_value_per_partition: MutableMapping[StreamSlice, Any] = {}
self._most_recent_cursor_value_per_partition: MutableMapping[
Union[StreamSlice, Mapping[str, Any], None], Any
] = {}
self._has_closed_at_least_one_slice = False
self._cursor_granularity = cursor_granularity
# Flag to track if the logger has been triggered (per stream)
Expand Down Expand Up @@ -216,10 +229,13 @@ def observe(self, record: Record) -> None:
most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
record.associated_slice
)
cursor_value = self._extract_cursor_value(record)
try:
cursor_value = self._extract_cursor_value(record)

if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
except ValueError:
self._log_for_record_without_cursor_value()

def _extract_cursor_value(self, record: Record) -> Any:
return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
Expand Down Expand Up @@ -459,10 +475,13 @@ def should_be_synced(self, record: Record) -> bool:
try:
record_cursor_value: CursorValueType = self._extract_cursor_value(record) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
except ValueError:
if not self._should_be_synced_logger_triggered:
LOGGER.warning(
f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record. The incremental sync will assume it needs to be synced"
)
self._should_be_synced_logger_triggered = True
self._log_for_record_without_cursor_value()
return True
return self.start <= record_cursor_value <= self._end_provider()

def _log_for_record_without_cursor_value(self) -> None:
if not self._should_be_synced_logger_triggered:
LOGGER.warning(
f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
)
self._should_be_synced_logger_triggered = True
14 changes: 14 additions & 0 deletions unit_tests/sources/streams/concurrent/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor:
_NO_LOOKBACK_WINDOW,
)

def test_given_no_cursor_value_when_observe_then_do_not_raise(self) -> None:
cursor = self._cursor_with_slice_boundary_fields()
partition = _partition(_NO_SLICE)

cursor.observe(
Record(
data={"record_with_A_CURSOR_FIELD_KEY": "any value"},
associated_slice=partition.to_slice(),
stream_name=_A_STREAM_NAME,
)
)

# did not raise

def test_given_boundary_fields_when_close_partition_then_emit_state(self) -> None:
cursor = self._cursor_with_slice_boundary_fields()
cursor.close_partition(
Expand Down
Loading