Skip to content

Commit e7db857

Browse files
maxi297 and octavia-squidington-iii authored
fix: do not break on bad cursor value in perpartition cursor (#758)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent ae0e8aa commit e7db857

File tree

2 files changed

+50
-4
lines changed

2 files changed

+50
-4
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def __init__(
189189
# FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
190190
self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
191191
self._synced_some_data = False
192+
self._logged_regarding_datetime_format_error = False
192193

193194
@property
194195
def cursor_field(self) -> CursorField:
@@ -518,10 +519,23 @@ def observe(self, record: Record) -> None:
518519
except ValueError:
519520
return
520521

522+
try:
523+
record_cursor = self._connector_state_converter.output_format(
524+
self._connector_state_converter.parse_value(record_cursor_value)
525+
)
526+
except ValueError as exception:
527+
if not self._logged_regarding_datetime_format_error:
528+
logger.warning(
529+
"Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
530+
self._stream_name,
531+
self._cursor_field.cursor_field_key,
532+
record_cursor_value,
533+
exception,
534+
)
535+
self._logged_regarding_datetime_format_error = True
536+
return
537+
521538
self._synced_some_data = True
522-
record_cursor = self._connector_state_converter.output_format(
523-
self._connector_state_converter.parse_value(record_cursor_value)
524-
)
525539
self._update_global_cursor(record_cursor)
526540
if not self._use_global_cursor:
527541
self._cursor_per_partition[

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
ConcurrentDeclarativeSource,
2121
)
2222
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
23+
from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter
2324
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader
2425
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
2526
DeclarativePartition,
@@ -29,7 +30,7 @@
2930
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
3031
CustomFormatConcurrentStreamStateConverter,
3132
)
32-
from airbyte_cdk.sources.types import StreamSlice
33+
from airbyte_cdk.sources.types import Record, StreamSlice
3334
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
3435
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
3536

@@ -4400,3 +4401,34 @@ def test_duplicate_partition_while_processing():
44004401
assert len(cursor._processing_partitions_indexes) == 0
44014402
assert len(cursor._partition_key_to_index) == 0
44024403
assert len(cursor._partitions_done_generating_stream_slices) == 0
4404+
4405+
4406+
def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_break_sync():
    """Observing a record whose cursor value cannot be parsed must not raise.

    The record carries an empty string in its cursor field. The expectation is
    that the state converter rejects it with a ``ValueError`` and that
    ``observe`` swallows the error, flags that the warning was logged and
    leaves the cursor state untouched.
    """
    cursor = ConcurrentPerPartitionCursor(
        # The per-partition cursor factory is not expected to be exercised on
        # this path: ``observe`` should return before touching any partition
        # cursor, so a plain mock is enough.
        cursor_factory=MagicMock(),
        partition_router=ListPartitionRouter(
            values=["1"], cursor_field="partition_id", config={}, parameters={}
        ),
        stream_name="test_stream",
        stream_namespace=None,
        stream_state={},
        message_repository=MagicMock(),
        connector_state_manager=MagicMock(),
        connector_state_converter=CustomFormatConcurrentStreamStateConverter(
            datetime_format="%Y-%m-%dT%H:%M:%SZ",
            input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"],
            is_sequential_state=True,
            cursor_granularity=timedelta(0),
        ),
        cursor_field=CursorField(cursor_field_key="updated_at"),
    )

    # Must not raise even though "" does not match the configured format.
    cursor.observe(
        Record(
            data={"updated_at": ""},
            stream_name="test_stream",
            associated_slice=StreamSlice(partition={"partition_id": "1"}, cursor_slice={}),
        )
    )

    # The bad value is reported once and the record is not counted as synced data.
    assert cursor._logged_regarding_datetime_format_error
    assert not cursor._synced_some_data

0 commit comments

Comments
 (0)