-
Notifications
You must be signed in to change notification settings - Fork 30
fix(connector-builder): Re-revert "fix: revert remerge concurrent cdk builder change because of flaky test" #712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…flaky te…" This reverts commit 1c9049a.
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@revert-705-revert_concurrent_changes#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch revert-705-revert_concurrent_changes Helpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
PyTest Results (Fast)3 692 tests - 2 3 681 ✅ - 2 6m 26s ⏱️ -5s Results for commit c0b35ab. ± Comparison against base commit cd48741. This pull request removes 6 and adds 4 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
📝 WalkthroughWalkthroughSwaps connector-builder to construct and return a queue-backed ConcurrentDeclarativeSource with imported TestLimits, forces builder concurrency_level=1, wires a shared Queue and ConcurrentMessageRepository, adds PartitionLogger/SliceLogger and per-partition Cursor handling, propagates limits/slice/record caps, and updates call signatures and tests. Changes
Sequence Diagram(s)sequenceDiagram
participant CB as Connector Builder
participant CDS as ConcurrentDeclarativeSource
participant CS as ConcurrentSource
participant Q as Shared Queue
participant MR as ConcurrentMessageRepository
participant PR as PartitionReader
participant C as Cursor
CB->>CDS: create_source(config, limits, catalog, state)
CDS->>CS: create(..., queue=Q, message_repository=MR)
CB->>CDS: read()
CDS->>CS: read()
CS->>Q: enqueue Partition items
loop per Partition
CS->>PR: submit process_partition(partition, cursor=C)
PR->>MR: emit slice log via PartitionLogger (optional)
PR->>C: observe(record)
PR-->>Q: enqueue Record / AirbyteMessage
PR->>C: close_partition(partition)
PR-->>Q: enqueue PartitionCompleteSentinel
end
CS->>Q: enqueue PartitionGenerationCompletedSentinel
CDS->>CB: consume queue -> emit ordered AirbyteMessages
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Suggested reviewers
Would you like a compact comparison diagram contrasting the old ManifestDeclarativeSource flow with the new queue-based ConcurrentDeclarativeSource flow, wdyt? Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🔭 Outside diff range comments (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
149-157
: Callcursor()
before observing recordsSame issue here:
stream.cursor.observe(record)
accesses the method attribute instead of the Cursor instance. Can we callcursor()
to get the instance, then invokeobserve
? wdyt?- stream.cursor.observe(record) + stream.cursor().observe(record)
239-241
: Ensure we callcursor()
before invokingensure_at_least_one_state_emitted()
This is the same method-vs-property mistake. Please call the accessor.
- stream.cursor.ensure_at_least_one_state_emitted() + stream.cursor().ensure_at_least_one_state_emitted()
♻️ Duplicate comments (10)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (10)
3702-3709
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
3887-3894
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
3961-3968
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
4045-4054
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
4166-4173
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
4251-4258
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
4263-4271
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
4277-4283
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
4335-4341
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
4346-4352
: Same construction pattern duplication hereSame suggestion as above to DRY up repeated DeclarativePartition(...) calls via a helper, wdyt?
🧹 Nitpick comments (19)
airbyte_cdk/connector_builder/test_reader/message_grouper.py (1)
53-53
: Docstring still claims limit-based termination; propose updating to match behavior and document stream_name, wdyt?Given the loop now drains the iterator, the doc should no longer promise stopping at
limit
. Also,stream_name
isn’t documented. How about this tweak?@@ - This function iterates over the provided messages until the number of record messages processed reaches the specified limit. + This function iterates over the provided messages until the iterator is exhausted. + Note: Any record limits (e.g., for testing) are applied upstream by the caller; this function does not enforce `limit`. @@ - Parameters: + Parameters: messages (Iterator[AirbyteMessage]): An iterator yielding AirbyteMessage instances. schema_inferrer (SchemaInferrer): An instance used to infer and update schema based on record messages. datetime_format_inferrer (DatetimeFormatInferrer): An instance used to infer datetime formats from record messages. - limit (int): The maximum number of record messages to process before stopping. + limit (int): Deprecated/unused here; preserved for compatibility. Limits are enforced by the caller and may be removed in a future release. + stream_name (str): The target stream name; page HTTP requests for other streams are ignored.If you prefer to keep enforcement here (previous semantics), we can instead keep the original description and re-add the limit check per Option B above. Which direction do you prefer, wdyt?
Also applies to: 68-74
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
625-627
: Use MAX_SLICES to replace magic numbers for slice limitsDefining MAX_SLICES is great; could we replace the hard-coded 5s later in this file to avoid drift and centralize the default, wdyt?
For example:
- maximum_number_of_slices=self._limit_slices_fetched or 5, + maximum_number_of_slices=self._limit_slices_fetched or MAX_SLICES,and similarly in the AsyncRetriever path:
- maximum_number_of_slices=self._limit_slices_fetched or 5, + maximum_number_of_slices=self._limit_slices_fetched or MAX_SLICES,airbyte_cdk/connector_builder/main.py (1)
94-99
: Good call passing catalog/state into create_source; consider widening the type hint ofsource
and dropping ManifestDeclarativeSource importThe keyword-arg call into create_source aligns with the new ConcurrentDeclarativeSource API. Since create_source now returns a ConcurrentDeclarativeSource, would you switch the
handle_connector_builder_request
parameter type to the commonSource
and remove the now-superfluous ManifestDeclarativeSource import to keep typing accurate, wdyt?Proposed patch:
-from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +# (import removed) def handle_connector_builder_request( - source: ManifestDeclarativeSource, + source: Source, command: str, config: Mapping[str, Any], catalog: Optional[ConfiguredAirbyteCatalog], - state: List[AirbyteStateMessage], + state: List[AirbyteStateMessage], limits: TestLimits, ) -> AirbyteMessage:Optionally, if you want to reflect that state may be absent for non-
test_read
requests, we could widen that annotation toOptional[List[AirbyteStateMessage]]
as well. Do you want to go that route, or keep[]
as the “no state” sentinel, wdyt?unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
355-361
: Good call passing limits=TestLimits() to avoid HttpClient caching bleed-throughUsing limits=TestLimits() here prevents cross-test interference from HttpClient caching. For consistency and isolation, would you also want to pass limits=TestLimits() in test_dynamic_schema_loader_manifest_flow (Lines 275-277) to make both discovery tests deterministic, wdyt?
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
34-44
: QueueItem now includes AirbyteMessage; consider updating the docstring for clarityThe widened typing to include AirbyteMessage is sensible and matches the concurrent repository/queue routing changes. Tiny nit: could we update the nearby docstring to reflect that protocol messages are now valid queue entries, wdyt?
Apply this docstring tweak:
-""" -Typedef representing the items that can be added to the ThreadBasedConcurrentStream -""" +""" +Typedef representing the items that can be enqueued/consumed by the concurrent stream pipeline + (ThreadBasedConcurrentStream and friends). Now also includes protocol-level AirbyteMessage items. +"""unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
3616-3624
: Reduce repeated DeclarativePartition construction with a tiny helperYou repeatedly construct DeclarativePartition with the same argument shape (stream_name/json_schema/retriever/message_repository/max_records_limit=None). A small local helper (e.g., make_partition(slice)) would reduce duplication and the chance of mismatched args across tests. Worth it for readability and maintainability, wdyt?
airbyte_cdk/connector_builder/test_reader/helpers.py (2)
471-473
: Type shape mismatch: state list now may contain AirbyteStateMessage; widen the type hintsYou’re returning a list element that can be either Dict[str, Any] or a mutated AirbyteStateMessage (after converting its stream_state). The StreamReadSlices.state typing likely expects a JSON-serializable mapping list. Two small improvements:
- Widen handle_current_slice’s latest_state_message param to accept Union[AirbyteStateMessage, Dict[str, Any]].
- Make convert_state_blob_to_mapping’s return annotation Union[AirbyteStateMessage, Dict[str, Any]], to reflect reality.
Would you like to adopt the following signatures, wdyt?
# Outside the changed hunk: update handle_current_slice signature def handle_current_slice( current_slice_pages: List[StreamReadPages], current_slice_descriptor: Optional[Dict[str, Any]] = None, latest_state_message: Optional[Union[AirbyteStateMessage, Dict[str, Any]]] = None, auxiliary_requests: Optional[List[AuxiliaryRequest]] = None, ) -> StreamReadSlices: ...
725-743
: Return type should reflect actual behavior (and document in-place mutation)convert_state_blob_to_mapping can return either the mutated AirbyteStateMessage or a dict. The current annotation says Dict[str, Any], which is misleading. Can we update the return type and docstring accordingly, wdyt?
-def convert_state_blob_to_mapping( - state_message: Union[AirbyteStateMessage, Dict[str, Any]], -) -> Dict[str, Any]: +def convert_state_blob_to_mapping( + state_message: Union[AirbyteStateMessage, Dict[str, Any]], +) -> Union[AirbyteStateMessage, Dict[str, Any]]: @@ - The AirbyteStreamState stores state as an AirbyteStateBlob which deceivingly is not - a dictionary, but rather a list of kwargs fields. This in turn causes it to not be - properly turned into a dictionary when translating this back into response output - by the connector_builder_handler using asdict() + Converts an AirbyteStateBlob found in an AirbyteStateMessage.stream.stream_state into a plain dict, + in-place, so downstream serialization (e.g., asdict()/JSON) is stable. If the input is already a + mapping, it is returned unchanged. @@ - return state_message # type: ignore # See above, but when this is an AirbyteStateMessage we must convert AirbyteStateBlob to a Dict + return state_message # type: ignoreOptionally, would you prefer we avoid in-place mutation and return a pure dict snapshot instead to eliminate side effects (I can draft that version too), wdyt?
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)
78-149
: Great boundary tests for max_records_limit; consider centralizing the counter resetThe new tests covering the global total_record_counter behavior are valuable. Since the counter is global, would you like to reset it in a setUp/tearDown (or a pytest fixture) to guarantee isolation even if future tests forget to reset it inline, wdyt?
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
118-127
: Sentinel handling simplification looks goodRemoving per-partition cursor closing here is consistent with moving that responsibility into
PartitionReader.process_partition
. The remaining logic (tracking running partitions and marking the stream done when appropriate) reads cleanly. Would you like to add a debug log when a failed sentinel is observed (i.e.,sentinel.is_successful is False
) to aid future triage, or should we continue relying solely on the exception path? wdyt?airbyte_cdk/sources/message/concurrent_repository.py (1)
24-43
: Queue-backed repository wrapper reads well and matches the ordering goalPushing messages to the shared queue immediately after delegating to the decorated repository achieves the desired ordering guarantees for the main thread. This is a pragmatic design that keeps the wrapper thin. Would you like a small private helper (e.g.,
_drain_decorated_queue()
) to remove the duplication betweenemit_message
andlog_message
, or shall we keep it as-is for clarity? wdyt?airbyte_cdk/sources/utils/slice_logger.py (1)
14-17
: Comment clarifies intent; consider future TODO with link/issue?The note about eventually removing this abstraction in favor of direct MessageRepository writes is helpful. Do you want to add a TODO with an issue reference to track it, so it isn’t lost? wdyt?
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
28-43
: Lightweight PartitionLogger is a good fit; emits slice logs only when appropriateThe partition-scoped slice logging via
SliceLogger.should_log_slice_message
keeps noise down and preserves ordering by writing straight to the repository. Minor wording nit: “partitions message repository” could be “partition’s message repository” for readability—want me to send a tiny docstring tweak? wdyt?
63-87
: Cursor lifecycle is handled in the worker thread correctly
- Logging slice (if configured), streaming records, observing with the cursor, and closing the partition before emitting the success sentinel is the right flow.
- Exception path emits both the error and a failed sentinel, which the main thread can interpret accordingly.
Optionally, do you want to add a very narrow
except BaseException as e
guard forKeyboardInterrupt
-like cases to re-raise, while keepingexcept Exception
for operational errors? In long-running workers it can help termination behave as expected, but I’m fine either way. wdyt?unit_tests/sources/streams/concurrent/test_partition_reader.py (2)
10-12
: Consider using relative imports for consistencyThe import for
InMemoryMessageRepository
uses an absolute path whileStreamThreadException
uses a relative path fromairbyte_cdk.sources.concurrent_source
. Should we standardize these imports for better consistency? wdyt?-from airbyte_cdk import InMemoryMessageRepository +from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
56-57
: Verify cursor method invocations are completeThe test verifies that
cursor.observe
andcursor.close_partition
are called, which is great. However, should we also verify thatcursor.observe
was called with the correct record arguments for each of the records in_RECORDS
? This would ensure the cursor is properly tracking all emitted records. wdyt?-cursor.observe.assert_called() +assert cursor.observe.call_count == len(_RECORDS) +for i, record in enumerate(_RECORDS): + cursor.observe.assert_any_call(record) cursor.close_partition.assert_called_once()unit_tests/connector_builder/test_connector_builder_handler.py (3)
56-59
: Consolidate TestLimits import to avoid shadowing and reduce confusion
TestLimits
is imported twice (once from connector_builder_handler and again here), which shadows the earlier import and makes it unclear which class is used. Could we import it from a single module to keep things unambiguous, wdyt?One simple tweak is to keep the re-export from connector_builder_handler and drop the duplicate here:
Apply this diff:
-from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( - ConcurrentDeclarativeSource, - TestLimits, -) +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +)
533-535
: DRY: factor a helper/fixture for ConcurrentDeclarativeSource constructionThe same construction pattern for
ConcurrentDeclarativeSource(catalog=None, config=config, state=None, source_config=...)
appears multiple times in this file. Would you consider a small helper or fixture to centralize this and make future signature changes easier, wdyt?Example (new helper in this test module):
def make_source(manifest, config): return ConcurrentDeclarativeSource(catalog=None, config=config, state=None, source_config=manifest)Then here:
- source = ConcurrentDeclarativeSource( - catalog=None, config=config, state=None, source_config=MANIFEST - ) + source = make_source(MANIFEST, config)
1019-1025
: Tests assert on private internals; consider asserting behavior insteadAsserting
source._constructor._limit_pages_fetched_per_slice
andsource._constructor._limit_slices_fetched
reaches into private state, which can make tests brittle against refactors. Would you consider validating these via observable behavior (e.g., number of pages/slices produced in a small read) or exposing minimal public accessors in test-only paths, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (23)
airbyte_cdk/connector_builder/connector_builder_handler.py
(3 hunks)airbyte_cdk/connector_builder/main.py
(1 hunks)airbyte_cdk/connector_builder/test_reader/helpers.py
(4 hunks)airbyte_cdk/connector_builder/test_reader/message_grouper.py
(1 hunks)airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
(2 hunks)airbyte_cdk/sources/concurrent_source/concurrent_source.py
(7 hunks)airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(8 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(1 hunks)airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
(6 hunks)airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
(1 hunks)airbyte_cdk/sources/message/concurrent_repository.py
(1 hunks)airbyte_cdk/sources/streams/concurrent/partition_reader.py
(3 hunks)airbyte_cdk/sources/streams/concurrent/partitions/types.py
(2 hunks)airbyte_cdk/sources/utils/slice_logger.py
(1 hunks)unit_tests/connector_builder/test_connector_builder_handler.py
(16 hunks)unit_tests/connector_builder/test_message_grouper.py
(0 hunks)unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
(9 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py
(1 hunks)unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
(2 hunks)unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
(5 hunks)unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py
(1 hunks)unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
(2 hunks)unit_tests/sources/streams/concurrent/test_partition_reader.py
(5 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/connector_builder/test_message_grouper.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Applied to files:
unit_tests/connector_builder/test_connector_builder_handler.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
🧬 Code Graph Analysis (18)
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py (3)
streams
(65-79)NeverLogSliceLogger
(174-176)message_repository
(107-108)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (2)
airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1)
StreamSlicer
(27-39)airbyte_cdk/sources/types.py (1)
StreamSlice
(75-169)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (3)
airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)airbyte_cdk/sources/types.py (1)
Record
(21-72)airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
PartitionGenerationCompletedSentinel
(9-24)
airbyte_cdk/sources/message/concurrent_repository.py (3)
airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)airbyte_cdk/connector_builder/models.py (1)
LogMessage
(25-29)airbyte_cdk/sources/message/repository.py (1)
MessageRepository
(45-60)
airbyte_cdk/connector_builder/main.py (1)
airbyte_cdk/connector_builder/connector_builder_handler.py (1)
create_source
(65-89)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)
airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
DeclarativePartition
(53-105)stream_name
(101-102)airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
close_partition
(159-184)
airbyte_cdk/connector_builder/test_reader/helpers.py (2)
airbyte_cdk/models/airbyte_protocol.py (2)
AirbyteStateBlob
(15-50)AirbyteStateMessage
(67-75)unit_tests/connector_builder/test_message_grouper.py (1)
state_message
(846-854)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (6)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
cursor
(87-90)airbyte_cdk/sources/streams/concurrent/adapters.py (2)
cursor
(196-197)stream_name
(310-311)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
cursor
(94-95)airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
stream_name
(36-41)airbyte_cdk/sources/message/concurrent_repository.py (2)
emit_message
(28-31)consume_queue
(38-43)airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
process_partition
(63-86)
unit_tests/sources/streams/concurrent/test_partition_reader.py (4)
airbyte_cdk/sources/message/repository.py (1)
InMemoryMessageRepository
(74-95)airbyte_cdk/sources/streams/concurrent/cursor.py (7)
FinalStateCursor
(89-131)observe
(56-60)observe
(111-112)observe
(224-234)close_partition
(63-67)close_partition
(114-115)close_partition
(239-247)airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
process_partition
(63-86)airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
PartitionCompleteSentinel
(15-31)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)
airbyte_cdk/sources/types.py (4)
Record
(21-72)StreamSlice
(75-169)data
(35-36)partition
(99-104)airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (4)
DeclarativePartitionFactory
(22-50)stream_name
(101-102)create
(42-50)read
(71-96)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/message/concurrent_repository.py (1)
ConcurrentMessageRepository
(11-43)airbyte_cdk/sources/message/repository.py (2)
InMemoryMessageRepository
(74-95)MessageRepository
(45-60)airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
stream_name
(101-102)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (5)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
cursor
(87-90)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
cursor
(94-95)airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
on_partition
(89-105)airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
submit
(45-46)airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
process_partition
(63-86)
unit_tests/connector_builder/test_connector_builder_handler.py (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
ConcurrentDeclarativeSource
(82-565)TestLimits
(68-79)airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
resolved_manifest
(158-168)airbyte_cdk/connector_builder/connector_builder_handler.py (2)
resolve_manifest
(131-145)create_source
(65-89)
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
TestLimits
(68-79)ConcurrentDeclarativeSource
(82-565)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (10)
airbyte_cdk/sources/concurrent_source/stream_thread_exception.py (1)
StreamThreadException
(6-25)airbyte_cdk/sources/message/repository.py (1)
MessageRepository
(45-60)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
cursor
(87-90)airbyte_cdk/sources/streams/concurrent/cursor.py (7)
Cursor
(50-86)observe
(56-60)observe
(111-112)observe
(224-234)close_partition
(63-67)close_partition
(114-115)close_partition
(239-247)airbyte_cdk/sources/types.py (1)
partition
(99-104)airbyte_cdk/sources/streams/concurrent/partitions/partition.py (3)
Partition
(11-48)to_slice
(25-33)read
(17-22)airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
PartitionCompleteSentinel
(15-31)airbyte_cdk/sources/utils/slice_logger.py (5)
SliceLogger
(18-46)should_log_slice_message
(41-46)should_log_slice_message
(50-56)should_log_slice_message
(60-61)create_slice_log_message
(26-38)airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
to_slice
(98-99)read
(71-96)airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)
read
(103-130)
airbyte_cdk/connector_builder/connector_builder_handler.py (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
ConcurrentDeclarativeSource
(82-565)TestLimits
(68-79)unit_tests/connector_builder/test_connector_builder_handler.py (1)
manifest_declarative_source
(956-957)airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
ManifestDeclarativeSource
(97-610)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (4)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
PartitionLogger
(22-42)PartitionReader
(45-86)airbyte_cdk/sources/utils/slice_logger.py (2)
SliceLogger
(18-46)DebugSliceLogger
(49-56)airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)
PartitionEnqueuer
(16-64)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (24)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)
14-21
: LGTM: expanded import block is consistent with AirbyteMessage.record usageImporting AirbyteRecordMessage alongside AirbyteMessage and friends is aligned with the protocol types and doesn’t alter test behavior. All good here, wdyt?
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)
53-57
: LGTM: switching to keyword args for ConcurrentSource is correctThe constructor mapping (threadpool, logger, slice_logger, message_repository) looks accurate and mirrors the updated API. No issues spotted, wdyt?
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1)
7-11
: LGTM: import cleanupRemoving unused typing imports and StreamState keeps this module lean with no behavior change. Looks good, wdyt?
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
11-14
: Switching to TestLimits import aligns with the new ConcurrentDeclarativeSource APIBringing in TestLimits from concurrent_declarative_source is consistent with the PR’s direction and avoids pulling in lower-level factory details. Looks good to me.
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)
7-14
: Import and usage style looks solid, and switching to Record improves fidelityImporting the module as declarative_partition_generator and using Record objects in tests increases parity with runtime behavior. Nice improvement.
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
98-105
: Ignore the cursor‐as‐method suggestionIt turns out
cursor
is defined with an@property
on each stream class (e.g. indefault_stream.py
,abstract_stream.py
, etc.), sostream.cursor
already returns theCursor
instance. Callingcursor()
would actually break existing call sites. No change is needed here—please disregard the proposed diff. wdyt?Likely an incorrect or invalid review comment.
unit_tests/sources/streams/concurrent/test_partition_reader.py (3)
30-30
: PartitionReader initialization looks good!The updated initialization with queue and None for the partition logger aligns well with the production code changes where
PartitionReader
now accepts an optionalPartitionLogger
.
33-39
: Good test setup for FinalStateCursorThe test properly creates a
FinalStateCursor
with the required parameters, which aligns with the new cursor-based interaction pattern introduced in the production code.
75-91
: Excellent test coverage for exception handlingGreat addition of the test case for exceptions raised during
close_partition
! This ensures proper error handling and queue ordering when cursor operations fail.airbyte_cdk/sources/concurrent_source/concurrent_source.py (5)
46-46
: Well-designed queue injection for testingThe optional queue parameter is a clean way to allow external queue injection for testing while maintaining backward compatibility. Nice design choice!
97-101
: Queue size configuration looks reasonableThe hardcoded queue size of 10,000 seems reasonable for most use cases. The comment clearly explains the rationale behind this choice. Should we consider making this configurable in the future if different sources have varying memory constraints? Just a thought for future consideration.
115-118
: Good use of PartitionLogger wrapperThe introduction of
PartitionLogger
wrapper with slice logger, logger, and message repository provides a clean abstraction for partition-level logging. This separation of concerns is well done!
150-155
: Robust termination logic improvementExcellent fix for the queue consumption loop! Checking
queue.empty()
beforeis_done()
prevents missing items when exceptions occur. This addresses the race condition mentioned in the comment effectively.
173-174
: Necessary addition for AirbyteMessage handlingGood addition of the
AirbyteMessage
case to properly propagate these messages downstream. This ensures that all message types flow through the concurrent processing pipeline correctly.airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (3)
72-79
: Early return optimization for max_records_limitGood implementation of the pre-check to avoid unnecessary processing when the limit is already reached. The early return pattern is clean and efficient.
118-127
: Clean slice limit implementation with decorator patternNice use of the decorator pattern with
StreamSlicerTestReadDecorator
to wrap the stream slicer when slice limits are applied. The type casting is appropriate here.
95-96
: Consider atomic increment for thread safetySince we're in a concurrent context, the increment of
total_record_counter
after yielding the record could potentially lead to race conditions. Should we ensure the increment happens atomically or consider incrementing before yielding to guarantee the count is accurate? wdyt?if self._max_records_limit: - total_record_counter += 1 + with _counter_lock: # Assuming we add the lock as suggested above + total_record_counter += 1Likely an incorrect or invalid review comment.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
67-79
: Well-structured TestLimits dataclassThe
TestLimits
dataclass is well-designed with sensible defaults and the__test__
class variable to prevent pytest from treating it as a test class. Good attention to detail!
105-109
: Queue size consideration aligns with concurrent sourceThe queue initialization with maxsize 10,000 matches the concurrent source implementation, maintaining consistency across the codebase. The explanatory comment is helpful.
121-121
: Smart use of ConcurrentMessageRepository wrapperWrapping the
InMemoryMessageRepository
withConcurrentMessageRepository
is a clean way to ensure thread-safe message handling in the concurrent processing model. This decorator pattern works well here!
324-371
: Consistent keyword argument usage in partition factory creationGreat job converting all
DeclarativePartitionFactory
instantiations to use keyword arguments! This makes the code more readable and less prone to parameter ordering mistakes. The consistent pattern across all branches (lines 324-371, 395-405, 459-467) is well maintained.airbyte_cdk/connector_builder/connector_builder_handler.py (3)
65-70
: API signature evolution looks goodThe updated
create_source
signature properly accepts catalog and state parameters, aligning with theConcurrentDeclarativeSource
requirements. The return type annotation is also correctly updated.
73-78
: Smart concurrency level enforcement for builder messagesExcellent approach to enforce
concurrency_level = 1
for the Connector Builder! This ensures message ordering is preserved, which is critical for the grouping of builder message responses. The code handles both cases where the concurrency_level key exists or needs to be added.
36-43
: Clean transition to imported TestLimitsThe migration from local
TestLimits
constants to using the importedTestLimits
class defaults is well executed. The code maintains backward compatibility while leveraging the centralized limit definitions.
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
Show resolved
Hide resolved
PyTest Results (Full)3 695 tests - 2 3 684 ✅ - 2 9m 37s ⏱️ +3s Results for commit c0b35ab. ± Comparison against base commit cd48741. This pull request removes 6 and adds 4 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte_cdk/sources/streams/http/http_client.py (2)
146-149
: Remove duplicated comments for clarityThe “Use in-memory cache if cache_dir is not set” comment block is duplicated. Can we condense it to one succinct version, wdyt?
- # Use in-memory cache if cache_dir is not set - # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests - # Use in-memory cache if cache_dir is not set - # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests + # Use in-memory cache if cache_dir is not set. + # This non-obvious interface ensures we don't write SQLite files during unit tests.
162-166
: Drop redundantcache_name
when passing abackend
Since you’re already instantiating a
SQLiteCache
with the full path, thecache_name
argument will be ignored byinit_backend
when abackend
instance is provided. To keep things clear, could we removecache_name
here?File: airbyte_cdk/sources/streams/http/http_client.py
- return CachedLimiterSession( - cache_name=sqlite_path, - backend=backend, - api_budget=self._api_budget, - match_headers=True, - ) + return CachedLimiterSession( + backend=backend, + api_budget=self._api_budget, + match_headers=True, + )wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
airbyte_cdk/sources/streams/http/http_client.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (python)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (5)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
17-21
: Global record counter is shared state; thread-safety and scoping are risky—can we guard or narrow it, wdyt?This module-level
total_record_counter
will be updated by multiple partitions (and potentially multiple streams) when concurrency > 1, which can lead to race conditions and cross-stream interference. The intent seems limited to Connector Builder test reads (concurrency=1), but this file is more general-purpose.Would you consider adding a lock and helper funcs to make updates atomic (and optionally provide a reset/get API for tests), or scoping the counter per stream/partition generator instead, wdyt?
+import threading @@ -total_record_counter = 0 +_record_counter_lock = threading.Lock() +total_record_counter = 0 + +def _increment_total_record_counter() -> int: + global total_record_counter + with _record_counter_lock: + total_record_counter += 1 + return total_record_counter + +def _get_total_record_counter() -> int: + with _record_counter_lock: + return total_record_counter + +def _reset_total_record_counter() -> None: + global total_record_counter + with _record_counter_lock: + total_record_counter = 0Could you confirm all Connector Builder test-read paths still enforce
concurrency=1
(e.g., via create_source) so we’re safe short-term, and if so, do you want me to follow up with a lock-scoped implementation as above? wdyt?unit_tests/connector_builder/test_connector_builder_handler.py (4)
896-905
: Avoid mutating TEST_READ_CONFIG in-place—copy before editing to prevent order-dependent flakes.This test modifies the module-level
TEST_READ_CONFIG
, which can leak across tests. Shall we deep-copy before mutation, wdyt?- # Add backoff strategy to avoid default endless backoff loop - TEST_READ_CONFIG["__injected_declarative_manifest"]["definitions"]["retriever"]["requester"][ - "error_handler" - ] = {"backoff_strategies": [{"type": "ConstantBackoffStrategy", "backoff_time_in_seconds": 5}]} - - config = TEST_READ_CONFIG + # Add backoff strategy to avoid default endless backoff loop without mutating module-level state + config = copy.deepcopy(TEST_READ_CONFIG) + config["__injected_declarative_manifest"]["definitions"]["retriever"]["requester"]["error_handler"] = { + "backoff_strategies": [{"type": "ConstantBackoffStrategy", "backoff_time_in_seconds": 5}] + } @@ - source = create_source(config=config, limits=limits, catalog=catalog, state=None) + source = create_source(config=config, limits=limits, catalog=catalog, state=None)
1259-1264
: Don’t mutate MANIFEST across tests; deep-copy it before overriding url_base.Mutating
MANIFEST
in-place can cause test interference. Shall we deep-copy it first, wdyt?- test_manifest = MANIFEST - test_manifest["streams"][0]["$parameters"]["url_base"] = url_base + test_manifest = copy.deepcopy(MANIFEST) + test_manifest["streams"][0]["$parameters"]["url_base"] = url_base config = {"__injected_declarative_manifest": test_manifest} @@ - source = create_source(config=config, limits=limits, catalog=catalog, state=None) + source = create_source(config=config, limits=limits, catalog=catalog, state=None)
1277-1279
: Tighten assertion: check records list length, not the dict length.
page_records
is a dict;len(page_records)
counts keys, not records. Can we assert on the records list length instead, wdyt?- page_records = output_data["slices"][0]["pages"][0] - assert len(page_records) == len(MOCK_RESPONSE["result"]) + page_records = output_data["slices"][0]["pages"][0]["records"] + assert len(page_records) == len(MOCK_RESPONSE["result"])
1353-1360
: Avoid mutating MANIFEST in OAuth test as well—copy before editing.Same concern; can we deep-copy
MANIFEST
before injecting the authenticator, wdyt?- test_manifest = MANIFEST + test_manifest = copy.deepcopy(MANIFEST) test_manifest["definitions"]["retriever"]["requester"]["authenticator"] = ( oauth_authenticator_config ) config = {"__injected_declarative_manifest": test_manifest} @@ - source = create_source(config=config, limits=limits, catalog=catalog, state=None) + source = create_source(config=config, limits=limits, catalog=catalog, state=None)
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
125-130
:max_records_limit
constructor arg on StreamSlicerPartitionGenerator is unused—remove to avoid confusion?It’s accepted but never stored or referenced; the limit is enforced inside partitions via the factory. Would you like to drop it here for clarity, wdyt?
class StreamSlicerPartitionGenerator(PartitionGenerator): def __init__( self, partition_factory: DeclarativePartitionFactory, stream_slicer: StreamSlicer, - slice_limit: Optional[int] = None, - max_records_limit: Optional[int] = None, + slice_limit: Optional[int] = None, ) -> None:unit_tests/connector_builder/test_connector_builder_handler.py (1)
56-59
: Duplicate import of TestLimits—prefer a single source of truth to avoid confusion.You import
TestLimits
both from connector_builder_handler (Lines 19-24) and fromconcurrent_declarative_source
(Lines 56-59). To keep things unambiguous, shall we import it from only one place? Given the class lives inconcurrent_declarative_source
, I’d suggest removing it from the earlier import and keeping this one, wdyt?-from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( - ConcurrentDeclarativeSource, - TestLimits, -) +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSourceAnd then remove
TestLimits
from theconnector_builder_handler
import:# elsewhere in the file (not in this hunk) from airbyte_cdk.connector_builder.connector_builder_handler import ( create_source, get_limits, resolve_manifest, )Would you like me to sweep the file for any remaining
TestLimits
import ambiguity and update accordingly?airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
124-133
: Component factory wiring for limits and concurrent message repo looks sound—one question on toggles.Using
ConcurrentMessageRepository
and feeding limits into the factory is solid. One thought:disable_retries
/disable_cache
flip to True wheneverlimits
is provided—even when running under non-test conditions that still pass limits. Should we tie those toggles explicitly toemit_connector_builder_messages
instead, or keep as-is? wdyt?
111-119
: Minor nit: comment grammar might need a tweak for clarity.“there are less threads generating partitions that than are max number of workers” reads a bit awkwardly. Maybe “fewer threads generating partitions than the max number of workers” for clarity, wdyt? No code impact.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (7)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(8 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(1 hunks)airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
(6 hunks)unit_tests/connector_builder/test_connector_builder_handler.py
(15 hunks)unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
(14 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py
(1 hunks)unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
(6 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py
- unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Applied to files:
unit_tests/connector_builder/test_connector_builder_handler.py
🧬 Code Graph Analysis (3)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (6)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
Retriever
(13-54)airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
SchemaLoader
(11-17)airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1)
StreamSlicerTestReadDecorator
(14-28)airbyte_cdk/sources/message/repository.py (1)
MessageRepository
(45-60)airbyte_cdk/sources/streams/concurrent/partitions/partition_generator.py (1)
PartitionGenerator
(11-18)airbyte_cdk/utils/slice_hasher.py (2)
SliceHasher
(15-37)hash
(19-37)
unit_tests/connector_builder/test_connector_builder_handler.py (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
TestLimits
(74-85)ConcurrentDeclarativeSource
(88-574)airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
resolved_manifest
(159-169)airbyte_cdk/connector_builder/connector_builder_handler.py (2)
resolve_manifest
(131-145)create_source
(65-89)airbyte_cdk/connector_builder/main.py (1)
handle_connector_builder_request
(70-88)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/sources/message/concurrent_repository.py (1)
ConcurrentMessageRepository
(11-43)airbyte_cdk/sources/message/repository.py (2)
InMemoryMessageRepository
(74-95)MessageRepository
(45-60)airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
streams
(301-338)message_repository
(269-270)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
ModelToComponentFactory
(643-4169)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
73-86
: Nice: TestLimits encapsulates defaults cleanly.The dataclass + ClassVars are clear and make the defaults explicit. This looks good to me.
331-345
: Passing max_records_limit and slice_limit through to the partition generator/factory is consistent.This aligns with the new limit semantics and keeps enforcement close to where records are produced. LGTM.
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
1504-1517
: Make the exception expectation explicit and prevent false positives; also avoid recomputing statesAs written, if no exception is raised the test silently passes. Using
pytest.raises
makes the expectation explicit and avoids false positives. While here, computing the expected state count from the number of successful slices and reusing the already-collectedlocations_states
tightens the test and removes the magic number. Wdyt?- try: - for message in source.read( - logger=source.logger, config=_CONFIG, catalog=catalog, state=[] - ): - messages.append(message) - except AirbyteTracedException: - locations_states = get_states_for_stream(stream_name="locations", messages=messages) - assert len(locations_states) == 3 - assert ( - get_states_for_stream(stream_name="locations", messages=messages)[ - -1 - ].stream.stream_state.__dict__ - == expected_stream_state - ) + with pytest.raises(AirbyteTracedException): + for message in source.read( + logger=source.logger, config=_CONFIG, catalog=catalog, state=[] + ): + messages.append(message) + + locations_states = get_states_for_stream(stream_name="locations", messages=messages) + expected_states_count = len(location_slices) + 1 # one partial per successful slice + final aggregate + assert len(locations_states) == expected_states_count + assert locations_states[-1].stream.stream_state.__dict__ == expected_stream_state
🧹 Nitpick comments (5)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
1494-1498
: Avoid re-declaringlocation_slices
twice
location_slices
is defined earlier in this test with identical content. Reusing the first definition trims duplication and avoids accidental drift in future edits. Wdyt?- location_slices = [ - {"start": "2024-07-01", "end": "2024-07-31"}, - # missing slice `{"start": "2024-08-01", "end": "2024-08-31"}` here - {"start": "2024-09-01", "end": "2024-09-10"}, - ] + # reuse the 'location_slices' defined aboveairbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (4)
99-106
: Defensive check for Optional cursor and confirm thread-safety, wdyt?
PartitionReader.process_partition
requires a non-OptionalCursor
, butadapters.cursor()
can be Optional. PassingNone
would explode in the worker thread and be hard to trace. Could we fail fast here and also document/verify that the stream cursor is thread-safe for concurrentobserve/close_partition
calls?Suggested diff:
- cursor = self._stream_name_to_instance[stream_name].cursor + cursor = self._stream_name_to_instance[stream_name].cursor + if cursor is None: + raise RuntimeError( + f"Stream {stream_name} has no cursor; concurrent reads require a cursor." + ) if self._slice_logger.should_log_slice_message(self._logger): self._message_repository.emit_message( self._slice_logger.create_slice_log_message(partition.to_slice()) ) self._thread_pool_manager.submit( self._partition_reader.process_partition, partition, cursor )Also, since
PartitionReader.process_partition(...)
now invokescursor.observe(record)
(see streams/concurrent/partition_reader.py), we still invokestream.cursor.observe(record)
inon_record
(Line 157). Isobserve
intended to be called both pre-emit and post-emit, or could this double counting skew cursor semantics? If the latter, would you consider removing the call inon_record
or replacing it with an explicit “ack” method if available, wdyt?For reference (outside the changed hunk), this is the line to consider removing:
# in on_record(...) stream.cursor.observe(record) # consider removing if PartitionReader already observes
119-129
: Optional: Flush queued messages/exceptions before deciding stream completion, wdyt?Today you check “done” and then
consume_queue()
. While_on_stream_is_done
also flushes, processing the queue first here would guarantee that any exception emitted by the worker is accounted for before computing completion state. It’s a small ordering improvement.Suggested diff:
- partitions_running = self._streams_to_running_partitions[partition.stream_name()] - if partition in partitions_running: - partitions_running.remove(partition) - # If all partitions were generated and this was the last one, the stream is done - if ( - partition.stream_name() not in self._streams_currently_generating_partitions - and len(partitions_running) == 0 - ): - yield from self._on_stream_is_done(partition.stream_name()) - yield from self._message_repository.consume_queue() + partitions_running = self._streams_to_running_partitions[partition.stream_name()] + if partition in partitions_running: + partitions_running.remove(partition) + # Flush first so any exceptions/status emitted by the worker are accounted for + yield from self._message_repository.consume_queue() + # If all partitions were generated and this was the last one, the stream is done + if ( + partition.stream_name() not in self._streams_currently_generating_partitions + and len(partitions_running) == 0 + ): + yield from self._on_stream_is_done(partition.stream_name())
158-161
: Make test-only logging opt-in and less brittle (env flag + debug level), wdyt?Gating on a specific test filename in
PYTEST_CURRENT_TEST
is brittle. Would you switch to an explicit opt-in env flag and useDEBUG
level to avoid noisy logs, while avoidingmessage.__dict__
(which might contain unserializable fields)?Suggested diff:
- test_env = os.getenv("PYTEST_CURRENT_TEST") - if test_env and "test_concurrent_declarative_source.py" in test_env: - self._logger.info(f"Processing and emitting: {message.__dict__}") + # Developer-friendly opt-in trace for concurrent reads + if os.getenv("AB_DEBUG_CONCURRENT_SOURCE") == "1" and self._logger.isEnabledFor(logging.DEBUG): + self._logger.debug( + "Processing and emitting message", + extra={"stream": record.stream_name, "message_type": str(message.type)}, + )This keeps prod logs quiet by default, is explicit, and avoids relying on pytest internals. If you prefer, we can wire this to your existing PartitionLogger infra instead, wdyt?
5-5
: Nit: Keep os import local to the test-only path, wdyt?Since
os
is only used for a test/debug guard, would you consider moving the import intoon_record
right before the guard to keep top-level imports lean? It’s minor, but helps readability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
(4 hunks)airbyte_cdk/sources/message/concurrent_repository.py
(1 hunks)unit_tests/sources/declarative/test_concurrent_declarative_source.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/message/concurrent_repository.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (4)
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
cursor
(92-93)airbyte_cdk/sources/streams/concurrent/adapters.py (2)
cursor
(196-197)stream_name
(311-312)airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
submit
(45-46)airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
process_partition
(63-86)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
119-129
: LGTM: Simplified completion handling aligns with worker-owned responsibilities.Centralizing partition closure and exception handling in
PartitionReader
and making this method focus on lifecycle bookkeeping improves readability and reduces duplication. Nice cleanup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
99-106
: Guard against a missing Cursor and ensure the right object type is passed to PartitionReaderPartitionReader.process_partition expects a Cursor and calls observe/close_partition on it. Since adapters.cursor is typed Optional[Cursor], passing None would crash at runtime. Also, now that PartitionReader observes records, the stream cursor is observed in two places (here in the worker thread and again in on_record), which may double-update state unless observe is idempotent.
Would you add a light guard and pass a guaranteed Cursor, and confirm observe() idempotency to avoid double updates, wdyt?
- cursor = self._stream_name_to_instance[stream_name].cursor + cursor = self._stream_name_to_instance[stream_name].cursor + if cursor is None: + # Defensive: concurrent path requires a cursor to track partition/record progress + raise AirbyteTracedException( + message=f"Stream {stream_name} has no cursor configured for concurrent processing.", + internal_message="Missing cursor while submitting partition to PartitionReader", + failure_type=FailureType.config_error, + ) @@ - self._thread_pool_manager.submit( - self._partition_reader.process_partition, partition, cursor - ) + self._thread_pool_manager.submit( + self._partition_reader.process_partition, + partition, + cursor, + )Additionally, if observe-in-worker is intended to represent “read” and observe-in-main (on_record) represents “emitted,” should we make that explicit (e.g., separate methods) or keep one and rely on idempotence? Happy to draft either option, wdyt?
100-103
: Avoid duplicate slice logging (now logged both in on_partition and in PartitionReader via PartitionLogger)With PartitionLogger wired into PartitionReader, slices get logged there too. This block will produce duplicate slice logs.
Shall we remove this on_partition slice log and rely on PartitionReader.PartitionLogger for a single consistent slice log, wdyt?
- if self._slice_logger.should_log_slice_message(self._logger): - self._message_repository.emit_message( - self._slice_logger.create_slice_log_message(partition.to_slice()) - )airbyte_cdk/sources/concurrent_source/concurrent_source.py (3)
98-103
: Consider centralizing the queue maxsize constantYou hardcode 10_000 here while ThreadPoolManager also uses a 10_000 default. Unifying this (single constant or config) would avoid drift.
Would you like me to extract a DEFAULT_QUEUE_MAXSIZE constant or reuse ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE for clarity, wdyt?
141-159
: Leverage timeout_seconds (currently unused) to avoid indefinite blocking on queue.get()The docstring explains timeout_seconds, but _consume_from_queue blocks indefinitely. Using a timed get with Empty handling keeps behavior predictable and exercises the timeout.
Would this refactor work for you, wdyt?
-from queue import Queue +from queue import Queue, Empty @@ - def _consume_from_queue( + def _consume_from_queue( self, queue: Queue[QueueItem], concurrent_stream_processor: ConcurrentReadProcessor, ) -> Iterable[AirbyteMessage]: - while airbyte_message_or_record_or_exception := queue.get(): - test_env = os.getenv("PYTEST_CURRENT_TEST") - if test_env and "test_concurrent_declarative_source.py" in test_env: - self._logger.info(f"Processing and emitting {type(airbyte_message_or_record_or_exception)}: {airbyte_message_or_record_or_exception.__dict__}") - yield from self._handle_item( - airbyte_message_or_record_or_exception, - concurrent_stream_processor, - ) - # In the event that a partition raises an exception, anything remaining in - # the queue will be missed because is_done() can raise an exception and exit - # out of this loop before remaining items are consumed - if queue.empty() and concurrent_stream_processor.is_done(): - # all partitions were generated and processed. we're done here - break + while True: + try: + airbyte_message_or_record_or_exception = queue.get(timeout=self._timeout_seconds) + except Empty: + # Periodically check if we're done to avoid hanging when producers are finished + if concurrent_stream_processor.is_done(): + break + continue + + test_env = os.getenv("PYTEST_CURRENT_TEST") + if test_env and "test_concurrent_declarative_source.py" in test_env: + self._logger.debug( + "Processing and emitting %s", + type(airbyte_message_or_record_or_exception).__name__, + ) + + yield from self._handle_item( + airbyte_message_or_record_or_exception, concurrent_stream_processor + ) + + # Avoid missing items if an exception triggers is_done() to raise: check emptiness first + if queue.empty() and concurrent_stream_processor.is_done(): + break
147-149
: Lower verbosity and cost of the pytest-only debug logLogging the full dict at INFO can be noisy/heavy. Using DEBUG and logging the item type keeps this useful without spamming logs.
Would you be open to downgrading to DEBUG and logging only the item type as in the diff above, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
(3 hunks)airbyte_cdk/sources/concurrent_source/concurrent_source.py
(7 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (3)
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
cursor
(196-197)stream_name
(311-312)airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
submit
(45-46)airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
process_partition
(63-86)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (5)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
PartitionLogger
(22-42)PartitionReader
(45-86)airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (2)
ThreadPoolManager
(10-115)is_done
(81-82)airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
ConcurrentReadProcessor
(31-249)is_done
(204-229)airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)
PartitionEnqueuer
(16-64)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/concurrent_source/concurrent_source.py
[error] 146-154: poetry run ruff format --diff . failed: 1 file would be reformatted (airbyte_cdk/sources/concurrent_source/concurrent_source.py).
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (4)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
119-129
: Simplified completion flow reads wellRemoving per-partition exception/cursor handling here (now done inside PartitionReader) and centralizing stream-done conditions is cleaner and reduces duplicated responsibilities. Nice.
airbyte_cdk/sources/concurrent_source/concurrent_source.py (3)
41-71
: API extension looks sound (shared queue is optional and non-breaking)Adding an optional shared queue to create() and plumbing it through keeps existing call sites working while enabling testability and shared-queue scenarios. Looks good.
177-179
: Pass-through handling for AirbyteMessage is a good additionExplicitly yielding AirbyteMessage items ensures they propagate correctly through the shared queue path. Looks good.
146-154
: Fix the ruff formatting failure (CI)CI reports this file would be reformatted by ruff. Could you run the formatter to unblock the pipeline, wdyt?
You can run:
- poetry run ruff format airbyte_cdk/sources/concurrent_source/concurrent_source.py
- or simply poetry run ruff format .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
7-7
: Import for pytest gating is fine; consider isolating test instrumentation.Since
os
is only used to peek at PYTEST_CURRENT_TEST, would you prefer to centralize this pytest-specific check behind a small helper (e.g., _should_log_concurrent_debug()) or an explicit feature flag like AIRBYTE_DEBUG_CONCURRENT_CURSOR to avoid hard-coding a test filename and keep production code agnostic of pytest internals, wdyt?
241-245
: DRY the pytest check and switch to parameterized logging (avoid eager f-strings).This keeps behavior identical while:
- Avoiding duplicate substring checks
- Using parameterized logging to skip formatting when the log level is disabled
- Optionally making the level DEBUG to reduce test log noise (left at INFO below for parity)
Apply this focused refactor?
- test_env = os.getenv("PYTEST_CURRENT_TEST") - if test_env and "test_concurrent_declarative_source.py" in test_env: - LOGGER.info(f"Closing partition {partition.to_slice()}") - LOGGER.info(f"\tstate before is {self._concurrent_state}") + pytest_current_test = os.getenv("PYTEST_CURRENT_TEST", "") + should_log_for_test = "test_concurrent_declarative_source.py" in pytest_current_test + if should_log_for_test and LOGGER.isEnabledFor(logging.INFO): + LOGGER.info("Closing partition %s", partition.to_slice()) + LOGGER.info("\tstate before is %s", self._concurrent_state) @@ - if test_env and "test_concurrent_declarative_source.py" in test_env: - LOGGER.info(f"\tstate after merged partition is {self._concurrent_state}") + if should_log_for_test and LOGGER.isEnabledFor(logging.INFO): + LOGGER.info("\tstate after merged partition is %s", self._concurrent_state)As an optional follow-up, would you want these logs at DEBUG and to include stream identifiers for easier correlation (e.g., "[%s.%s] Closing partition …", self._stream_namespace, self._stream_name), wdyt?
Also applies to: 251-253
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
airbyte_cdk/sources/streams/concurrent/cursor.py
(2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
to_slice
(25-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
This reverts commit 1c3975a.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
267-276
: Reuse the computeddebug_logs_enabled
and (optionally) emit stable JSON for determinism.To avoid repeating the test-name check and to keep consistency with the above block, would you update this to reuse
debug_logs_enabled
? Optionally, emit the state as sorted JSON for stable test assertions, wdyt?- if test_env and "test_concurrent_declarative_source.py" in test_env: + if debug_logs_enabled: self._message_repository.emit_message( AirbyteMessage( type=Type.LOG, log=AirbyteLogMessage( level=Level.INFO, - message=f"\tstate after merged partition is {self._concurrent_state}", + message=f"\tstate after merged partition is {self._concurrent_state}", ), ) )If adopting deterministic JSON formatting:
- message=f"\tstate after merged partition is {self._concurrent_state}", + message=f"\tstate after merged partition is {json.dumps(self._concurrent_state, default=str, sort_keys=True)}",
242-260
: Decouple cursor debug logs from specific test filenames and introduce an explicit debug flagThe grep search didn’t find any tests referencing these log messages (only matches in
cursor.py
itself), so we can safely remove the hard-coded"test_concurrent_declarative_source.py"
check without breaking existing suites. I’d suggest:• Add an
AIRBYTE_DEBUG_CONCURRENT_CURSOR
env var to opt-in to these logs.
• Compute aself._cursor_debug_enabled
boolean once (e.g. in__init__
or via a lazy‐loaded attribute).
• Centralize the emit logic in a helper_emit_cursor_debug_log
.
• Optionally usejson.dumps(..., sort_keys=True, default=str)
for stable, deterministic state output in tests.Example diff:
airbyte_cdk/sources/streams/concurrent/cursor.py +import os +import json class ConcurrentCursor: + def __init__(self, ...): + ... + test_env = os.getenv("PYTEST_CURRENT_TEST", "") + self._cursor_debug_enabled = ( + os.getenv("AIRBYTE_DEBUG_CONCURRENT_CURSOR", "").lower() in {"1", "true", "yes"} + or "test_concurrent_declarative_source.py" in test_env + ) + def _emit_cursor_debug_log(self, msg: str) -> None: + if self._cursor_debug_enabled: + self._message_repository.emit_message( + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=msg), + ) + ) def some_method(self, partition, ...): - test_env = os.getenv("PYTEST_CURRENT_TEST") - if test_env and "test_concurrent_declarative_source.py" in test_env: - self._message_repository.emit_message( - AirbyteMessage(..., message=f"Closing partition {partition.to_slice()}") - ) - self._message_repository.emit_message( - AirbyteMessage(..., message=f"\tstate before is {self._concurrent_state}") - ) + self._emit_cursor_debug_log(f"Closing partition {partition.to_slice()}") + self._emit_cursor_debug_log( + f"\tstate before is {json.dumps(self._concurrent_state, sort_keys=True, default=str)}" + ) # … - self._message_repository.emit_message( - AirbyteMessage(..., message=f"\tstate after merged partition is {self._concurrent_state}") - ) + self._emit_cursor_debug_log( + f"\tstate after merged partition is {json.dumps(self._concurrent_state, sort_keys=True, default=str)}" + )Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
airbyte_cdk/sources/streams/concurrent/cursor.py
(3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)airbyte_cdk/sources/message/repository.py (4)
emit_message
(47-48)emit_message
(64-65)emit_message
(79-80)emit_message
(109-110)airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
to_slice
(25-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-intercom
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
7-7
: LGTM on addingos
import.Used for test-gated logging; no concerns on import scope.
21-21
: LGTM on importing Airbyte log/message types.These match the Airbyte protocol schema and are appropriate for emitting log messages via the MessageRepository.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)
88-90
: Docstring promises a timeout but code no longer uses it
timeout_seconds
is documented as a hard stop if no record is read in time, but_consume_from_queue
uses a blockingqueue.get()
with no timeout. This can hang indefinitely on empty queues. Can we restore timeout semantics to match the contract, wdyt?Suggested targeted change below (see the
_consume_from_queue
hunk).
141-156
: Reintroduce bounded wait using queue.get(timeout=…) to avoid indefinite hangsCurrently we block forever on
queue.get()
. Let’s use the configured timeout and exit cleanly with logs if nothing arrives, while preserving your “only call is_done() when the queue is empty” safeguard to avoid missing items. Wdyt?Apply this diff:
+from queue import Empty @@ - def _consume_from_queue( + def _consume_from_queue( self, queue: Queue[QueueItem], concurrent_stream_processor: ConcurrentReadProcessor, ) -> Iterable[AirbyteMessage]: - while airbyte_message_or_record_or_exception := queue.get(): - yield from self._handle_item( - airbyte_message_or_record_or_exception, - concurrent_stream_processor, - ) - # In the event that a partition raises an exception, anything remaining in - # the queue will be missed because is_done() can raise an exception and exit - # out of this loop before remaining items are consumed - if queue.empty() and concurrent_stream_processor.is_done(): - # all partitions were generated and processed. we're done here - break + while True: + try: + airbyte_message_or_record_or_exception = queue.get(timeout=self._timeout_seconds) + except Empty: + self._logger.warning( + f"No items received from queue within {self._timeout_seconds}s; stopping read." + ) + break + + yield from self._handle_item( + airbyte_message_or_record_or_exception, + concurrent_stream_processor, + ) + # In the event that a partition raises an exception, anything remaining in + # the queue will be missed because is_done() can raise an exception and exit + # out of this loop before remaining items are consumed + if queue.empty() and concurrent_stream_processor.is_done(): + # all partitions were generated and processed. we're done here + breakNote:
queue.empty()
isn’t perfect under concurrency, but combining it withis_done()
keeps behavior aligned with your intent to avoid missing items before the aggregated exception check.
♻️ Duplicate comments (1)
unit_tests/connector_builder/test_connector_builder_handler.py (1)
1259-1264
: Avoid mutating MANIFEST: fixed; tighten record-count assertion (repeat suggestion)Nice job deep-copying MANIFEST before edits—this addresses the previous leak. One remaining nit: page_records is the entire page dict; could we assert against the records key to make the check precise, wdyt?
- page_records = output_data["slices"][0]["pages"][0] - assert len(page_records) == len(MOCK_RESPONSE["result"]) + page_records = output_data["slices"][0]["pages"][0]["records"] + assert len(page_records) == len(MOCK_RESPONSE["result"])Also applies to: 1277-1279
🧹 Nitpick comments (13)
unit_tests/connector_builder/test_connector_builder_handler.py (7)
541-546
: Prefer create_source for consistency and to centralize defaults (concurrency=1, migration/normalization)Would you switch this test to use create_source so it benefits from the enforced concurrency_level=1 and any future wiring baked into the factory, wdyt?
Proposed minimal change:
- source = ConcurrentDeclarativeSource( - catalog=None, config=config, state=None, source_config=MANIFEST - ) - limits = TestLimits() + limits = TestLimits() + source = create_source(config=config, limits=limits, catalog=None, state=None)
704-706
: Optional: use create_source here too for parity with other testsYou instantiate ConcurrentDeclarativeSource directly here while other tests use create_source. Unifying on the factory would reduce future breakage surface if the constructor or defaults change, wdyt?
895-905
: Great fix: deep-copying config + using create_source eliminates test flake; make the call-count assertion resilientNice job copying TEST_READ_CONFIG before mutation and routing through create_source—this resolves the prior global-state flakiness concern. For the final assertion, would you key it off limits.max_slices instead of the hard-coded 5 so it won’t drift if defaults change, wdyt?
- assert mock_send.call_count == 5 + assert mock_send.call_count == limits.max_slicesAlso applies to: 911-911, 916-919
1032-1037
: create_source behavior checks are appropriate (but be aware of private attributes)Asserting limit flags through _constructor is effective here; just noting it’s private API and may be brittle across refactors. Keep as-is if that tradeoff is acceptable, wdyt?
1293-1300
: Brittleness risk: asserting on specific exception class names in logsThe “StreamThreadException” string may change with internal refactors. Would you consider matching on a stable substring/pattern (e.g., contains “thread” or validating error category/HTTP scheme validation) instead of the exact class name, wdyt?
1416-1419
: Optional: build source via create_source for full_resolve_manifest tooFor consistency and to reuse centralized wiring, would you switch to create_source here as well, wdyt?
- source = ConcurrentDeclarativeSource( - catalog=None, config=config, state=None, source_config=DYNAMIC_STREAM_MANIFEST - ) - limits = TestLimits(max_streams=2) + limits = TestLimits(max_streams=2) + source = create_source(config=config, limits=limits, catalog=None, state=None)
1489-1608
: Nit: reduce duplication by using _stream_url_base in expected_resolved_manifestThe expected_resolved_manifest repeats "https://api.sendgrid.com" many times; would you reference _stream_url_base (as you do earlier) to avoid drift if the base changes, wdyt? This keeps the expectation concise and consistent.
airbyte_cdk/sources/message/concurrent_repository.py (3)
2-11
: Prune unused imports to keep the module lean
os
andType as MessageType
are imported but never used. Can we drop them to reduce noise, wdyt?Apply this diff:
import logging -import os from queue import Queue from typing import Callable, Iterable from airbyte_cdk.models import AirbyteMessage, Level -from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message.repository import LogMessage, MessageRepository from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
12-13
: Remove unused module-level logger
logger = logging.getLogger("airbyte")
isn’t used in this module. Shall we remove it to avoid confusion, wdyt?-logger = logging.getLogger("airbyte") -
28-41
: Guard draining with a lock to avoid concurrent drains on the decorated repository
emit_message
andlog_message
both callconsume_queue()
on the decorated repository. If multiple worker threads use this wrapper simultaneously, they may interleave drains and cause subtle reordering or missed items if the underlying repository isn’t thread-safe. Would you add a lightweight lock around the “emit/log + drain” critical section, wdyt?Minimal targeted change:
+import threading @@ class ConcurrentMessageRepository(MessageRepository): @@ def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository): self._queue = queue self._decorated_message_repository = message_repository + self._drain_lock = threading.Lock() @@ def emit_message(self, message: AirbyteMessage) -> None: - self._decorated_message_repository.emit_message(message) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + with self._drain_lock: + self._decorated_message_repository.emit_message(message) + for drained in self._decorated_message_repository.consume_queue(): + self._queue.put(drained) @@ def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: - self._decorated_message_repository.log_message(level, message_provider) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + with self._drain_lock: + self._decorated_message_repository.log_message(level, message_provider) + for drained in self._decorated_message_repository.consume_queue(): + self._queue.put(drained)Small nit while we’re here: I renamed the loop variable from
message
todrained
to avoid shadowing the method parameter. wdyt?airbyte_cdk/sources/concurrent_source/concurrent_source.py (3)
35-36
: Fix typo in class docstring“The read is done when all partitions for all streams w ere generated and read.” → “were”. Shall we tidy this up, wdyt?
- The read is done when all partitions for all streams w ere generated and read. + The read is done when all partitions for all streams were generated and read.
98-103
: Queue sizing looks reasonable; consider making it configurable laterDefaulting to
maxsize=10_000
is sane. If we see sources with extreme fan-out, would you want to plumb this as an optional parameter oncreate()
for testability? Not blocking.
111-120
: Consolidate slice‐logging in one placeI noticed we’re logging each slice twice:
- In
ConcurrentReadProcessor.on_partition
(airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py:100–104), we emit a slice log before handing the partition off.- And again via the
PartitionLogger
passed intoPartitionReader
(airbyte_cdk/sources/concurrent_source/concurrent_source.py:116–120), which also emits a slice log.This will produce duplicate
"slice: …"
messages for every partition. Could we simplify by removing thePartitionLogger
from thePartitionReader
instantiation (and its internal log call), and rely solely on the centralized log inon_partition
for determinism on the main thread? wdyt?• Remove or replace
PartitionLogger(self._slice_logger, self._logger, self._message_repository)
in
airbyte_cdk/sources/concurrent_source/concurrent_source.py
(lines ~116–120)
• Confirm the sole slice‐log emission remains in
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
(lines ~100–104)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
airbyte_cdk/sources/concurrent_source/concurrent_source.py
(7 hunks)airbyte_cdk/sources/message/concurrent_repository.py
(1 hunks)airbyte_cdk/sources/streams/concurrent/cursor.py
(4 hunks)airbyte_cdk/sources/streams/http/http_client.py
(1 hunks)unit_tests/connector_builder/test_connector_builder_handler.py
(20 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/streams/http/http_client.py
- airbyte_cdk/sources/streams/concurrent/cursor.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Applied to files:
unit_tests/connector_builder/test_connector_builder_handler.py
🧬 Code Graph Analysis (3)
unit_tests/connector_builder/test_connector_builder_handler.py (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
TestLimits
(74-85)ConcurrentDeclarativeSource
(88-574)airbyte_cdk/connector_builder/connector_builder_handler.py (2)
resolve_manifest
(131-145)create_source
(65-89)airbyte_cdk/connector_builder/main.py (1)
handle_connector_builder_request
(70-88)
airbyte_cdk/sources/message/concurrent_repository.py (3)
airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)airbyte_cdk/connector_builder/models.py (1)
LogMessage
(25-29)airbyte_cdk/sources/message/repository.py (1)
MessageRepository
(45-60)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (5)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
PartitionLogger
(22-42)PartitionReader
(45-86)airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (2)
ThreadPoolManager
(10-115)is_done
(81-82)airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage
(79-88)airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
ConcurrentReadProcessor
(31-249)is_done
(204-229)airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)
PartitionEnqueuer
(16-64)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: preview_docs
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (9)
unit_tests/connector_builder/test_connector_builder_handler.py (7)
55-58
: Import shift to ConcurrentDeclarativeSource and TestLimits looks correctThe new import path and pulling in TestLimits align with the concurrent builder re-revert. No action needed.
692-699
: Error path coverage via MockConcurrentDeclarativeSource is spot onThis neatly validates resolve_manifest error handling without touching internals.
969-971
: Fixture spec updated to ConcurrentDeclarativeSourceRenaming aside, the spec switch to ConcurrentDeclarativeSource is correct and keeps mocks aligned with the new API.
995-1005
: get_limits assertions correctly track new TestLimits defaultsReferencing TestLimits.DEFAULT_* keeps the tests resilient to future changes.
1126-1127
: LGTM: end-to-end “read_source” stitched through create_sourceThis uses the factory and validates pagination/slicing behavior end-to-end.
1175-1176
: LGTM: single-page/slice path also routed through factoryGood parity with the multi-page test.
1353-1360
: OAuth path also deep-copies MANIFEST and uses create_sourceGood isolation and parity with the non-OAuth external-requests test.
airbyte_cdk/sources/message/concurrent_repository.py (1)
15-26
: Nice wrapper intent and doc—ordering rationale is clearThe class purpose and the “load onto the main-thread queue immediately” behavior are well documented and align with the concurrent queue flow. LGTM on the high-level design.
airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)
174-175
: AirbyteMessage passthrough on the shared queue looks goodHandling
AirbyteMessage
directly keeps the shared-queue flow cohesive, especially withConcurrentMessageRepository
now enqueuing messages. LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome! Thanks for debugging this very sneaky issue. Let's ship it!
Oh! One other comment that popped my mind: Do we need to do something about the in memory stuff for sqlite being set on some tests but applying on all of the tests that run after? |
maybe? but we haven't seen any issues w/ tests bonking into each other so maybe not a critical must have at the moment. I think this was just one of our original hypotheses when we were just trying to conceive what was happening |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
178-183
: Per-stream lock is the right move; consider clarifying scope and future lock orderingThis implements the “lock per stream” idea discussed earlier and should prevent lost updates to
_concurrent_state
. Two small nits:
- Would renaming
_lock
to_state_lock
make its responsibility clearer?- If we later introduce per-partition locks (as explored in the thread), should we document a lock acquisition order (e.g., always acquire
_state_lock
before any per-partition lock) to avoid deadlocks, wdyt?
232-243
: Good rationale for not lockingobserve
; please add a breadcrumb to track the “per-partition locks” follow-upThe comment addresses the previous concern about thread-safety and current invocation contexts. Would you add a TODO with the ticket/issue ID for the proposed per-partition locking, so future changes won’t miss this assumption, wdyt?
Example:
- TODO: Investigate per-partition locks for
observe()
once/if it’s invoked off the main thread (link to ticket).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
airbyte_cdk/sources/streams/concurrent/cursor.py
(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
_emit_state_message
(230-248)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
7-7
: Importthreading
addition looks goodBrings in
Lock
for concurrency control; no concerns here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
255-264
: Move and gate_has_closed_at_least_one_slice
update inside the lock to prevent a race (boundaryless mode).Today two concurrent
close_partition()
calls can both pass the boundaryless check inside_add_slice_to_state()
because_has_closed_at_least_one_slice
is set after the lock is released. It’s not a write–write conflict; it’s a read-after-unlock ordering issue: the second thread can acquire the lock before the first thread sets the flag, so both append a slice. Let’s set the flag under the same lock and only when a slice was actually appended (to avoid falsely tripping the invariant when no slice was added), wdyt?Minimal, targeted fix:
def close_partition(self, partition: Partition) -> None: - with self._lock: - slice_count_before = len(self._concurrent_state.get("slices", [])) - self._add_slice_to_state(partition) - if slice_count_before < len( - self._concurrent_state["slices"] - ): # only emit if at least one slice has been processed - self._merge_partitions() - self._emit_state_message() - self._has_closed_at_least_one_slice = True + with self._lock: + slice_count_before = len(self._concurrent_state.get("slices", [])) + self._add_slice_to_state(partition) + appended = slice_count_before < len(self._concurrent_state.get("slices", [])) + if appended: # only emit if at least one slice has been processed + self._merge_partitions() + self._emit_state_message() + # Ensure subsequent closes observe the updated flag while holding the same lock. + self._has_closed_at_least_one_slice = TrueOptional follow-up: to shrink the critical section, we could build the state message under the lock but emit it afterward (requires a tiny helper). Happy to draft if useful, wdyt?
🧹 Nitpick comments (6)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
178-183
: Clarify the lock rationale to call out the boundaryless single-slice invariant.The comment is good, but could we explicitly mention that the lock also protects the boundaryless-mode invariant enforced via
_has_closed_at_least_one_slice
? Calling this out will help future readers understand why reads/writes of bothself._concurrent_state["slices"]
and_has_closed_at_least_one_slice
must be serialized, wdyt?Proposed comment tweak:
- # A lock is required when closing a partition because updating the cursor's concurrent_state is - # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is - # possible for one partition to update concurrent_state after a second partition has already read - # the previous state. This can lead to the second partition overwriting the previous one's state. + # A lock is required when closing a partition because updating the cursor's concurrent_state is + # not thread safe. When multiple partitions close concurrently, one partition can update state after + # another has already read the previous state, causing lost updates. + # The lock also protects the boundaryless single-slice invariant (gated by `_has_closed_at_least_one_slice`): + # both the check and any update that would allow subsequent closes must happen under the same lock.
255-263
: Optional: avoid I/O under the lock by building the message inside and emitting outside.If you’d like, we can avoid holding the mutex during
emit_message()
by returning a pre-built message. Example (outside the selected range, shown as plain Python for clarity):# Inside ConcurrentCursor (new helper) def _build_state_message(self): self._connector_state_manager.update_state_for_stream( self._stream_name, self._stream_namespace, self.state ) return self._connector_state_manager.create_state_message( self._stream_name, self._stream_namespace ) # In close_partition() msg = None with self._lock: slice_count_before = len(self._concurrent_state.get("slices", [])) self._add_slice_to_state(partition) appended = slice_count_before < len(self._concurrent_state.get("slices", [])) if appended: self._merge_partitions() msg = self._build_state_message() self._has_closed_at_least_one_slice = True if msg is not None: self._message_repository.emit_message(msg)This keeps state construction consistent while minimizing time under lock. Want me to push this as a follow-up commit and add a concurrency unit test, wdyt?
232-240
: Verifiedobserve()
call sites—only invoked byPartitionReader.process_partition
(airbyte_cdk/sources/streams/concurrent/partition_reader.py:81)Great work documenting the thread-safety assumption in
cursor.py
(lines 232–240). To future-proof this, would you be open to:• Adding a brief note above the doc block in
airbyte_cdk/sources/streams/concurrent/cursor.py
stating that onlyPartitionReader.process_partition
should callobserve()
, and that any new call sites will require per-partition locks?
• Optionally, including a lightweight CI check (for example, the providedrg -nPU -C3 '\bobserve\s*\(' --type=py
script) to flag any newobserve()
usages outside ofPartitionReader
?wdyt?
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (3)
119-129
: Avoid double-draining the message repository when the last partition completes
_on_stream_is_done()
already callsconsume_queue()
. Immediately callingconsume_queue()
again here is redundant (at best) and may be confusing when tracing message flow. Shall we only drain in the non-terminal path, wdyt?Proposed tweak:
partitions_running = self._streams_to_running_partitions[partition.stream_name()] if partition in partitions_running: partitions_running.remove(partition) # If all partitions were generated and this was the last one, the stream is done if ( partition.stream_name() not in self._streams_currently_generating_partitions and len(partitions_running) == 0 ): yield from self._on_stream_is_done(partition.stream_name()) - yield from self._message_repository.consume_queue() + return + yield from self._message_repository.consume_queue()
132-139
: Docstring drift: observation now happens in PartitionReader, not hereStep 4 in this docstring says this method ensures the cursor is updated after successful emit, but the actual observation now occurs inside
PartitionReader.process_partition(...)
. Could we update the docstring to reflect the current behavior, wdyt?Example change:
""" This method is called when a record is read from a partition. 1. Convert the record to an AirbyteMessage 2. If this is the first record for the stream, mark the stream as RUNNING 3. Increment the record counter for the stream - 4. Ensures the cursor knows the record has been successfully emitted - 5. Emit the message - 6. Emit messages that were added to the message repository + 4. Emit the message + 5. Emit messages that were added to the message repository + + Note: Cursor observation is handled within PartitionReader.process_partition(). """
97-106
: Ensure non-None cursors and acknowledge built-in locking in concurrent cursors, wdyt?I dug through the code and confirmed:
• The
cursor
property on concurrent streams is declared asOptional[Cursor]
inairbyte_cdk/sources/streams/concurrent/adapters.py
(lines 196-199). In practice, every adapter’s__init__
sets_cursor
to a concreteCursor
instance—so you won’t actually observeNone
at runtime unless someone manually disables the cursor before using the concurrent source .
• File-based streams likewise declarecursor -> Optional[AbstractFileBasedCursor]
indefault_file_based_stream.py
(lines 80-83), but their constructors always assign a default cursor, so again it shouldn’t beNone
at runtime .
• All concurrent‐cursor implementations use locks around their state-mutation methods. For example,ConcurrentCursor
inairbyte_cdk/sources/streams/concurrent/cursor.py
createsself._lock = threading.Lock()
(around line 182) and uses it inclose_partition
to protect shared state, even ifobserve
remains unlocked by design with an explicit comment justifying why overlapping updates there are harmless .Given these findings:
- The guard against
cursor is None
is unlikely to ever trigger in normal use—but it would catch misconfigurations early and give a clearer error than an AttributeError.- Thread-safety is already addressed via per-cursor locks;
observe
is intentionally lock-free with a comment explaining the trade-off. If we wanted stronger guarantees, we could wrapobserve
inwith self._lock
, but that’d add runtime cost.Suggestions, wdyt?
• Add the
if cursor is None: raise RuntimeError(...)
guard as you proposed.
• Optionally wrapcursor.observe(...)
in a lock if you’d rather be absolutely safe.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
(3 hunks)airbyte_cdk/sources/streams/concurrent/cursor.py
(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (6)
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
cursor
(92-93)airbyte_cdk/sources/streams/concurrent/adapters.py (2)
cursor
(196-197)stream_name
(311-312)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
cursor
(87-90)airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
cursor
(80-81)cursor
(84-89)airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
submit
(45-46)airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
process_partition
(63-86)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
_emit_state_message
(230-248)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
7-7
: Import of threading looks appropriate.Bringing in
threading
is warranted by the new per-cursor lock usage. LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (7)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
3376-3379
: Replace magic number with the new constant for consistency, wdyt?This makes the fallback source-of-truth explicit and avoids silent divergence if the default ever changes.
- StreamSlicerTestReadDecorator( - wrapped_slicer=stream_slicer, - maximum_number_of_slices=self._limit_slices_fetched or 5, - ), + StreamSlicerTestReadDecorator( + wrapped_slicer=stream_slicer, + maximum_number_of_slices=self._limit_slices_fetched or MAX_SLICES, + ),
3644-3645
: Ditto here: use the constant instead of 5, wdyt?- maximum_number_of_slices=self._limit_slices_fetched or 5, + maximum_number_of_slices=self._limit_slices_fetched or MAX_SLICES,unit_tests/sources/streams/test_stream_read.py (5)
593-596
: Clearer intent for intermediate vs final state would help future readersDefining two expected state messages (after partition "1" and after partitions "1" and "2") makes the test intent explicit. Would you consider adding a short inline comment or slightly more descriptive variable names (e.g., expected_state_after_p1 / expected_state_after_p2) to make the distinction obvious to future readers, wdyt?
624-631
: Triggering state via PartitionReader is correct; consider asserting queue interactions to strengthen the testManually driving PartitionReader to hit Cursor.close_partition() is a solid way to validate the concurrent path. Would you like to also assert the queue interactions (number of puts and presence of sentinels) to make the test more robust and guard against regressions in PartitionReader, wdyt?
Suggested change in this block to keep a reference to the queue mock:
- partition_reader = PartitionReader(queue=Mock(spec=Queue)) + queue_mock = Mock(spec=Queue) + partition_reader = PartitionReader(queue=queue_mock) assert isinstance(stream, StreamFacade) abstract_stream = stream._abstract_stream for partition in abstract_stream.generate_partitions(): partition_reader.process_partition(partition=partition, cursor=cursor)Then, outside this hunk (right after Line 633), you could add assertions like:
# Optional strengthening assertions # Expect 4 record puts + 2 partition-complete sentinels assert queue_mock.put.call_count == 6 # Sanity check: last two puts are partition completion sentinels last_two = [call.args[0].__class__.__name__ for call in queue_mock.put.call_args_list[-2:]] assert all(name == "PartitionCompleteSentinel" for name in last_two)
611-619
: Unused ConcurrentReadProcessor instantiationIt looks like handler is created but never used in this test. Shall we drop it to reduce noise, wdyt?
- handler = ConcurrentReadProcessor( - [stream], - Mock(spec=PartitionEnqueuer), - Mock(spec=ThreadPoolManager), - logger, - slice_logger, - message_repository, - Mock(spec=PartitionReader), - )
85-96
: Test isolation: class-level _state in _MockIncrementalStream can leak between testsAlthough not part of this change, _MockIncrementalStream keeps state in a class attribute (_state), so state can persist across tests if not reset. Would you consider making it instance-scoped or resetting it in a fixture to avoid subtle, order-dependent flakes, wdyt?
Option A — make it instance-scoped:
class _MockIncrementalStream(_MockStream, CheckpointMixin): - _state = {} + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._state = {}Option B — add a module-level autouse fixture to reset it:
import pytest @pytest.fixture(autouse=True) def _reset_mock_incremental_stream_state(): _MockIncrementalStream._state = {} yield _MockIncrementalStream._state = {}If you’d like, I can add the autouse fixture and run a quick grep to ensure no tests rely on the current cross-test persistence.
636-644
: Prefer model serialization over__dict__
for state comparisonsComparing internal
__dict__
snapshots can be fragile as our state blobs evolve. Instead, we can serialize each state model (using.dict()
on V1,.model_dump()
on V2, or falling back to__dict__
) to decouple from implementation details—wdyt?• File:
unit_tests/sources/streams/test_stream_read.py
, around lines 636–644
• Replace the two__dict__
comparisons with a small helper that calls the appropriate serializer, for example:- assert len(actual_state) == 2 - assert ( - actual_state[0].state.stream.stream_state.__dict__ - == expected_state_1.state.stream.stream_state.__dict__ - ) - assert ( - actual_state[1].state.stream.stream_state.__dict__ - == expected_state_2.state.stream.stream_state.__dict__ - ) + assert len(actual_state) == 2 + # serialize state depending on Pydantic/dataclass API + to_dict = lambda blob: ( + blob.dict() if hasattr(blob, "dict") + else blob.model_dump() if hasattr(blob, "model_dump") + else blob.__dict__ + ) + states = [to_dict(m.state.stream.stream_state) for m in actual_state] + assert states[0] == to_dict(expected_state_1.state.stream.stream_state) + assert states[1] == to_dict(expected_state_2.state.stream.stream_state)If you’d like to guard against ordering changes in partition enumeration, we could also assert presence rather than positional equality—happy to sketch that variant too, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(1 hunks)unit_tests/sources/streams/test_stream_read.py
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/streams/test_stream_read.py (2)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
PartitionReader
(45-86)process_partition
(63-86)airbyte_cdk/sources/message/repository.py (4)
consume_queue
(59-60)consume_queue
(70-71)consume_queue
(93-95)consume_queue
(118-119)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
634-637
: Great, let's locate where that default is defined and see how it's used inConcurrentDeclarativeSource
. Once we have the results, we can determine if adding theMAX_SLICES: int = 5
annotation and replacing other literals with the constant makes sense across the codebase.unit_tests/sources/streams/test_stream_read.py (1)
7-7
: LGTM: importing Queue to spec the mock queueBringing in Queue to use as a Mock spec keeps the PartitionReader dependency surface honest. This looks good to me, wdyt?
Hello darkness my old friend.
Reverts #705
Let's see how flaky this test is since can't really replicate this locally
Summary by CodeRabbit
New Features
Improvements
Bug Fixes
Tests