
Commit 9f743f2

Author: Oleksandr Bazarnov (committed)
Merge remote-tracking branch 'origin/main' into baz/cdk/add-deprecations-module
2 parents: c94363e + b148ca5

File tree: 8 files changed (+201, -111 lines)


.github/workflows/connector-tests.yml

Lines changed: 3 additions & 3 deletions
@@ -75,11 +75,11 @@ jobs:
           # Chargebee is being flaky:
           # - connector: source-chargebee
           #   cdk_extra: n/a
-          # These two are behind in CDK updates and can't be used as tests until they are updated:
-          # - connector: source-s3
-          #   cdk_extra: file-based
+          # This one is behind in CDK updates and can't be used as tests until it is updated:
           # - connector: destination-pinecone
           #   cdk_extra: vector-db-based
+          - connector: source-google-drive
+            cdk_extra: file-based
           - connector: destination-motherduck
             cdk_extra: sql
           # ZenDesk currently failing (as of 2024-12-02)

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 6 additions & 2 deletions
@@ -19,7 +19,10 @@
 from airbyte_cdk.sources.declarative.extractors.record_filter import (
     ClientSideIncrementalRecordFilterDecorator,
 )
-from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
+from airbyte_cdk.sources.declarative.incremental import (
+    ConcurrentPerPartitionCursor,
+    GlobalSubstreamCursor,
+)
 from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
     PerPartitionWithGlobalCursor,
@@ -361,7 +364,8 @@ def _group_streams(
                 == DatetimeBasedCursorModel.__name__
                 and hasattr(declarative_stream.retriever, "stream_slicer")
                 and isinstance(
-                    declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
+                    declarative_stream.retriever.stream_slicer,
+                    (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
                 )
             ):
                 stream_state = self._connector_state_manager.get_stream_state(
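
The broadened isinstance check means streams whose retriever slicer is either a GlobalSubstreamCursor or a PerPartitionWithGlobalCursor are now routed onto the concurrent per-partition cursor path. A minimal sketch of that dispatch, using hypothetical stand-in classes rather than the real airbyte_cdk types:

# Sketch only: stand-in classes, not the real airbyte_cdk cursor/slicer types.
class GlobalSubstreamCursor: ...


class PerPartitionWithGlobalCursor: ...


class SomeOtherSlicer: ...


def takes_concurrent_perpartition_path(stream_slicer: object) -> bool:
    # Passing a tuple of types to isinstance matches either one,
    # so GlobalSubstreamCursor streams are now included as well.
    return isinstance(stream_slicer, (GlobalSubstreamCursor, PerPartitionWithGlobalCursor))


print(takes_concurrent_perpartition_path(GlobalSubstreamCursor()))         # True (newly included)
print(takes_concurrent_perpartition_path(PerPartitionWithGlobalCursor()))  # True (as before)
print(takes_concurrent_perpartition_path(SomeOtherSlicer()))               # False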

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 1 deletion
@@ -1471,7 +1471,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
         stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
 
         # Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
-        use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)
+        use_global_cursor = isinstance(
+            partition_router, GroupingPartitionRouter
+        ) or component_definition.get("global_substream_cursor", False)
 
         # Return the concurrent cursor and state converter
         return ConcurrentPerPartitionCursor(
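
Together with the import change above, the factory now also forces the global cursor when the component definition carries a global_substream_cursor flag, not only when a GroupingPartitionRouter is used. A hedged sketch of that lookup on a manifest fragment written as a plain Python dict; the fragment mirrors what the new unit test below sets on post_comments_stream, while the other field values are illustrative and the real factory receives the full parsed component definition:

# Sketch only: an incremental_sync fragment as a plain dict.
component_definition = {
    "type": "DatetimeBasedCursor",
    "cursor_field": "updated_at",
    "global_substream_cursor": True,  # the opt-in flag read by the factory
}

# Stand-in for `isinstance(partition_router, GroupingPartitionRouter)` in the real method.
is_grouping_partition_router = False

use_global_cursor = is_grouping_partition_router or component_definition.get(
    "global_substream_cursor", False
)
print(use_global_cursor)  # True: the flag alone is enough to force global state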

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 2 additions & 0 deletions
@@ -52,8 +52,10 @@ def get_request_property_chunks(
         chunk_size = 0
         for property_field in property_fields:
             # If property_limit_type is not defined, we default to property_count which is just an incrementing count
+            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
             property_field_size = (
                 len(property_field)
+                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                 if self.property_limit_type == PropertyLimitType.characters
                 else 1
             )
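
The new + 3 accounts for the URL-encoded comma (%2C) that separates properties in the request, so character-based limits are measured against the encoded query value rather than the raw field names alone. A standalone sketch of that accounting (not the CDK's PropertyChunking class itself), reproducing the new test case further down where a limit of 17 forces one property per chunk:

from typing import Iterable, List

DELIMITER_OVERHEAD = 3  # "%2C", the URL-encoded comma placed between properties


def chunk_properties(properties: Iterable[str], character_limit: int) -> List[List[str]]:
    """Greedy character-based chunking that counts the encoded delimiter per field (sketch)."""
    chunks: List[List[str]] = []
    current: List[str] = []
    size = 0
    for prop in properties:
        field_size = len(prop) + DELIMITER_OVERHEAD
        if current and size + field_size > character_limit:
            chunks.append(current)
            current, size = [], 0
        current.append(prop)
        size += field_size
    if current:
        chunks.append(current)
    return chunks


# "laurie%2Cjaclyn%2C" already counts as 18 characters, so a limit of 17
# yields one field per chunk, matching test_property_chunking_includes_extra_delimiter.
print(chunk_properties(["laurie", "jaclyn", "kaitlin"], 17))
# [['laurie'], ['jaclyn'], ['kaitlin']]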

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 106 additions & 101 deletions
@@ -10,6 +10,7 @@
 from typing import (
     Any,
     Callable,
+    Dict,
     Iterable,
     List,
     Mapping,
@@ -367,14 +368,65 @@ def _read_pages(
             {"next_page_token": initial_token} if initial_token is not None else None
         )
         while not pagination_complete:
-            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+            property_chunks: List[List[str]] = (
+                list(
+                    self.additional_query_properties.get_request_property_chunks(
+                        stream_slice=stream_slice
+                    )
+                )
+                if self.additional_query_properties
+                else [
+                    []
+                ]  # A single empty property chunk represents the case where property chunking is not configured
+            )
 
+            merged_records: MutableMapping[str, Any] = defaultdict(dict)
             last_page_size = 0
             last_record: Optional[Record] = None
-            for record in records_generator_fn(response):
-                last_page_size += 1
-                last_record = record
-                yield record
+            response: Optional[requests.Response] = None
+            for properties in property_chunks:
+                if len(properties) > 0:
+                    stream_slice = StreamSlice(
+                        partition=stream_slice.partition or {},
+                        cursor_slice=stream_slice.cursor_slice or {},
+                        extra_fields={"query_properties": properties},
+                    )
+
+                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+                for current_record in records_generator_fn(response):
+                    if (
+                        current_record
+                        and self.additional_query_properties
+                        and self.additional_query_properties.property_chunking
+                    ):
+                        merge_key = (
+                            self.additional_query_properties.property_chunking.get_merge_key(
+                                current_record
+                            )
+                        )
+                        if merge_key:
+                            _deep_merge(merged_records[merge_key], current_record)
+                        else:
+                            # We should still emit records even if the record did not have a merge key
+                            last_page_size += 1
+                            last_record = current_record
+                            yield current_record
+                    else:
+                        last_page_size += 1
+                        last_record = current_record
+                        yield current_record
+
+            if (
+                self.additional_query_properties
+                and self.additional_query_properties.property_chunking
+            ):
+                for merged_record in merged_records.values():
+                    record = Record(
+                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
+                    )
+                    last_page_size += 1
+                    last_record = record
+                    yield record
 
             if not response:
                 pagination_complete = True
@@ -449,110 +501,43 @@ def read_records(
         :param stream_slice: The stream slice to read data for
         :return: The records read from the API source
         """
-
-        property_chunks = (
-            list(
-                self.additional_query_properties.get_request_property_chunks(
-                    stream_slice=stream_slice
-                )
-            )
-            if self.additional_query_properties
-            else []
-        )
-        records_without_merge_key = []
-        merged_records: MutableMapping[str, Any] = defaultdict(dict)
-
         _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
+
         most_recent_record_from_slice = None
+        record_generator = partial(
+            self._parse_records,
+            stream_slice=stream_slice,
+            stream_state=self.state or {},
+            records_schema=records_schema,
+        )
 
-        if self.additional_query_properties:
-            for properties in property_chunks:
-                _slice = StreamSlice(
-                    partition=_slice.partition or {},
-                    cursor_slice=_slice.cursor_slice or {},
-                    extra_fields={"query_properties": properties},
-                )  # None-check
-
-                record_generator = partial(
-                    self._parse_records,
-                    stream_slice=_slice,
-                    stream_state=self.state or {},
-                    records_schema=records_schema,
-                )
+        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
+            stream_state = self.state
 
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
+            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
+            # fetch more records. The platform deletes stream state for full refresh streams before starting a
+            # new job, so we don't need to worry about this value existing for the initial attempt
+            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
+                return
 
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
+            yield from self._read_single_page(record_generator, stream_state, _slice)
+        else:
+            for stream_data in self._read_pages(record_generator, self.state, _slice):
+                current_record = self._extract_record(stream_data, _slice)
+                if self.cursor and current_record:
+                    self.cursor.observe(_slice, current_record)
+
+                # Latest record read, not necessarily within slice boundaries.
+                # TODO Remove once all custom components implement `observe` method.
+                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
+                most_recent_record_from_slice = self._get_most_recent_record(
+                    most_recent_record_from_slice, current_record, _slice
+                )
+                yield stream_data
 
-                    if current_record and self.additional_query_properties.property_chunking:
-                        merge_key = (
-                            self.additional_query_properties.property_chunking.get_merge_key(
-                                current_record
-                            )
-                        )
-                        if merge_key:
-                            merged_records[merge_key].update(current_record)
-                        else:
-                            # We should still emit records even if the record did not have a merge key
-                            records_without_merge_key.append(current_record)
-                    else:
-                        yield stream_data
             if self.cursor:
                 self.cursor.close_slice(_slice, most_recent_record_from_slice)
-
-            if len(merged_records) > 0:
-                yield from [
-                    Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
-                    for merged_record in merged_records.values()
-                ]
-            if len(records_without_merge_key) > 0:
-                yield from records_without_merge_key
-        else:
-            _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
-
-            most_recent_record_from_slice = None
-            record_generator = partial(
-                self._parse_records,
-                stream_slice=stream_slice,
-                stream_state=self.state or {},
-                records_schema=records_schema,
-            )
-
-            if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
-                stream_state = self.state
-
-                # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
-                # fetch more records. The platform deletes stream state for full refresh streams before starting a
-                # new job, so we don't need to worry about this value existing for the initial attempt
-                if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
-                    return
-
-                yield from self._read_single_page(record_generator, stream_state, _slice)
-            else:
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
-
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
-                    yield stream_data
-
-                if self.cursor:
-                    self.cursor.close_slice(_slice, most_recent_record_from_slice)
-            return
+        return
 
     def _get_most_recent_record(
         self,
@@ -639,6 +624,26 @@ def _to_partition_key(to_serialize: Any) -> str:
     return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
 
 
+def _deep_merge(
+    target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]
+) -> None:
+    """
+    Recursively merge two dictionaries, combining nested dictionaries instead of overwriting them.
+
+    :param target: The dictionary to merge into (modified in place)
+    :param source: The dictionary to merge from
+    """
+    for key, value in source.items():
+        if (
+            key in target
+            and isinstance(target[key], MutableMapping)
+            and isinstance(value, MutableMapping)
+        ):
+            _deep_merge(target[key], value)
+        else:
+            target[key] = value
+
+
 @dataclass
 class SimpleRetrieverTestReadDecorator(SimpleRetriever):
     """
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 45 additions & 0 deletions
@@ -3449,3 +3449,48 @@ def test_semaphore_cleanup():
     assert '{"id":"2"}' not in cursor._semaphore_per_partition
     assert len(cursor._partition_parent_state_map) == 0  # All parent states should be popped
     assert cursor._parent_state == {"parent": {"state": "state2"}}  # Last parent state
+
+
+def test_given_global_state_when_read_then_state_is_not_per_partition() -> None:
+    manifest = deepcopy(SUBSTREAM_MANIFEST)
+    manifest["definitions"]["post_comments_stream"]["incremental_sync"][
+        "global_substream_cursor"
+    ] = True
+    manifest["streams"].remove({"$ref": "#/definitions/post_comment_votes_stream"})
+    record = {
+        "id": 9,
+        "post_id": 1,
+        "updated_at": COMMENT_10_UPDATED_AT,
+    }
+    mock_requests = [
+        (
+            f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}",
+            {
+                "posts": [
+                    {"id": 1, "updated_at": POST_1_UPDATED_AT},
+                ],
+            },
+        ),
+        # Fetch the first page of comments for post 1
+        (
+            "https://api.example.com/community/posts/1/comments?per_page=100",
+            {
+                "comments": [record],
+            },
+        ),
+    ]
+
+    run_mocked_test(
+        mock_requests,
+        manifest,
+        CONFIG,
+        "post_comments",
+        {},
+        [record],
+        {
+            "lookback_window": 1,
+            "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}},
+            "state": {"updated_at": "2024-01-25T00:00:00Z"},
+            "use_global_cursor": True,  # ensures that it is running the Concurrent CDK version as this is not populated in the declarative implementation
+        },  # this state does not have per-partition entries, which would be under `states`
+    )
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py

Lines changed: 9 additions & 1 deletion
@@ -43,10 +43,18 @@
             ["kate", "laurie", "jaclyn"],
             None,
             PropertyLimitType.characters,
-            10,
+            20,
             [["kate", "laurie"], ["jaclyn"]],
             id="test_property_chunking_limit_characters",
         ),
+        pytest.param(
+            ["laurie", "jaclyn", "kaitlin"],
+            None,
+            PropertyLimitType.characters,
+            17,  # laurie%2Cjaclyn%2C == 18, so this will create separate chunks
+            [["laurie"], ["jaclyn"], ["kaitlin"]],
+            id="test_property_chunking_includes_extra_delimiter",
+        ),
         pytest.param(
             [],
             None,