Skip to content

Commit e4cbaaf

Browse files
authored
chore: remove cursor.is_greater_than_or_equal (#674)
1 parent a59d25f commit e4cbaaf

File tree

12 files changed

+3
-262
lines changed

12 files changed

+3
-262
lines changed

airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

Lines changed: 0 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -429,17 +429,6 @@ def _send_log(self, level: Level, message: str) -> None:
429429
)
430430
)
431431

432-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
433-
cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
434-
first_cursor_value = first.get(cursor_field)
435-
second_cursor_value = second.get(cursor_field)
436-
if first_cursor_value and second_cursor_value:
437-
return self.parse_date(first_cursor_value) >= self.parse_date(second_cursor_value)
438-
elif first_cursor_value:
439-
return True
440-
else:
441-
return False
442-
443432
def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
444433
"""
445434
Updates the lookback window based on a given number of seconds if the new duration

airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -338,12 +338,6 @@ def get_request_body_json(
338338
def should_be_synced(self, record: Record) -> bool:
339339
return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))
340340

341-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
342-
return self._stream_cursor.is_greater_than_or_equal(
343-
self._convert_record_to_cursor_record(first),
344-
self._convert_record_to_cursor_record(second),
345-
)
346-
347341
@staticmethod
348342
def _convert_record_to_cursor_record(record: Record) -> Record:
349343
return Record(

airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py

Lines changed: 0 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -315,21 +315,6 @@ def should_be_synced(self, record: Record) -> bool:
315315
self._convert_record_to_cursor_record(record)
316316
)
317317

318-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
319-
if not first.associated_slice or not second.associated_slice:
320-
raise ValueError(
321-
f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}"
322-
)
323-
if first.associated_slice.partition != second.associated_slice.partition:
324-
raise ValueError(
325-
f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}"
326-
)
327-
328-
return self._get_cursor(first).is_greater_than_or_equal(
329-
self._convert_record_to_cursor_record(first),
330-
self._convert_record_to_cursor_record(second),
331-
)
332-
333318
@staticmethod
334319
def _convert_record_to_cursor_record(record: Record) -> Record:
335320
return Record(

airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -195,6 +195,3 @@ def get_request_body_json(
195195

196196
def should_be_synced(self, record: Record) -> bool:
197197
return self._get_active_cursor().should_be_synced(record)
198-
199-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
200-
return self._global_cursor.is_greater_than_or_equal(first, second)

airbyte_cdk/sources/declarative/incremental/resumable_full_refresh_cursor.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -42,12 +42,6 @@ def should_be_synced(self, record: Record) -> bool:
4242
"""
4343
return True
4444

45-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
46-
"""
47-
RFR record don't have ordering to be compared between one another.
48-
"""
49-
return False
50-
5145
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
5246
# A top-level RFR cursor only manages the state of a single partition
5347
return self._cursor

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 2 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@
4141
from airbyte_cdk.sources.declarative.requesters.requester import Requester
4242
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
4343
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
44-
from airbyte_cdk.sources.http_logger import format_http_message
4544
from airbyte_cdk.sources.source import ExperimentalClassWarning
4645
from airbyte_cdk.sources.streams.core import StreamData
4746
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
@@ -528,35 +527,13 @@ def read_records(
528527
if self.cursor and current_record:
529528
self.cursor.observe(_slice, current_record)
530529

531-
# Latest record read, not necessarily within slice boundaries.
532-
# TODO Remove once all custom components implement `observe` method.
533-
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
534-
most_recent_record_from_slice = self._get_most_recent_record(
535-
most_recent_record_from_slice, current_record, _slice
536-
)
537530
yield stream_data
538531

539532
if self.cursor:
540-
self.cursor.close_slice(_slice, most_recent_record_from_slice)
533+
self.cursor.close_slice(_slice)
541534
return
542535

543-
def _get_most_recent_record(
544-
self,
545-
current_most_recent: Optional[Record],
546-
current_record: Optional[Record],
547-
stream_slice: StreamSlice,
548-
) -> Optional[Record]:
549-
if self.cursor and current_record:
550-
if not current_most_recent:
551-
return current_record
552-
else:
553-
return (
554-
current_most_recent
555-
if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
556-
else current_record
557-
)
558-
else:
559-
return None
536+
# FIXME based on the comment above in SimpleRetriever.read_records, it seems like we can tackle https://github.com/airbytehq/airbyte-internal-issues/issues/6955 and remove this
560537

561538
def _extract_record(
562539
self, stream_data: StreamData, stream_slice: StreamSlice

airbyte_cdk/sources/streams/checkpoint/cursor.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -62,12 +62,6 @@ def should_be_synced(self, record: Record) -> bool:
6262
Evaluating if a record should be synced allows for filtering and stop condition on pagination
6363
"""
6464

65-
@abstractmethod
66-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
67-
"""
68-
Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
69-
"""
70-
7165
@abstractmethod
7266
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
7367
"""

airbyte_cdk/sources/streams/checkpoint/resumable_full_refresh_cursor.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -40,12 +40,6 @@ def should_be_synced(self, record: Record) -> bool:
4040
"""
4141
return True
4242

43-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
44-
"""
45-
RFR record don't have ordering to be compared between one another.
46-
"""
47-
return False
48-
4943
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
5044
# A top-level RFR cursor only manages the state of a single partition
5145
return self._cursor

airbyte_cdk/sources/streams/checkpoint/substream_resumable_full_refresh_cursor.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -89,12 +89,6 @@ def should_be_synced(self, record: Record) -> bool:
8989
"""
9090
return True
9191

92-
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
93-
"""
94-
RFR record don't have ordering to be compared between one another.
95-
"""
96-
return False
97-
9892
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
9993
if not stream_slice:
10094
raise ValueError("A partition needs to be provided in order to extract a state")

unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py

Lines changed: 0 additions & 52 deletions
Original file line number | Diff line number | Diff line change
@@ -1205,57 +1205,5 @@ def test_given_record_without_cursor_value_when_should_be_synced_then_return_tru
12051205
assert cursor.should_be_synced(Record({"record without cursor value": "any"}, ANY_SLICE))
12061206

12071207

1208-
def test_given_first_greater_than_second_then_return_true():
1209-
cursor = DatetimeBasedCursor(
1210-
start_datetime=MinMaxDatetime("3000-01-01", parameters={}),
1211-
cursor_field="cursor_field",
1212-
datetime_format="%Y-%m-%d",
1213-
config=config,
1214-
parameters={},
1215-
)
1216-
assert cursor.is_greater_than_or_equal(
1217-
Record({"cursor_field": "2023-01-01"}, {}), Record({"cursor_field": "2021-01-01"}, {})
1218-
)
1219-
1220-
1221-
def test_given_first_lesser_than_second_then_return_false():
1222-
cursor = DatetimeBasedCursor(
1223-
start_datetime=MinMaxDatetime("3000-01-01", parameters={}),
1224-
cursor_field="cursor_field",
1225-
datetime_format="%Y-%m-%d",
1226-
config=config,
1227-
parameters={},
1228-
)
1229-
assert not cursor.is_greater_than_or_equal(
1230-
Record({"cursor_field": "2021-01-01"}, {}), Record({"cursor_field": "2023-01-01"}, {})
1231-
)
1232-
1233-
1234-
def test_given_no_cursor_value_for_second_than_second_then_return_true():
1235-
cursor = DatetimeBasedCursor(
1236-
start_datetime=MinMaxDatetime("3000-01-01", parameters={}),
1237-
cursor_field="cursor_field",
1238-
datetime_format="%Y-%m-%d",
1239-
config=config,
1240-
parameters={},
1241-
)
1242-
assert cursor.is_greater_than_or_equal(
1243-
Record({"cursor_field": "2021-01-01"}, {}), Record({}, {})
1244-
)
1245-
1246-
1247-
def test_given_no_cursor_value_for_first_than_second_then_return_false():
1248-
cursor = DatetimeBasedCursor(
1249-
start_datetime=MinMaxDatetime("3000-01-01", parameters={}),
1250-
cursor_field="cursor_field",
1251-
datetime_format="%Y-%m-%d",
1252-
config=config,
1253-
parameters={},
1254-
)
1255-
assert not cursor.is_greater_than_or_equal(
1256-
Record({}, {}), Record({"cursor_field": "2021-01-01"}, {})
1257-
)
1258-
1259-
12601208
if __name__ == "__main__":
12611209
unittest.main()

0 commit comments

Comments
 (0)