chore: migrate client side filtering to concurrent cursor #679
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/use-concurrent-cursor-for-client-side-filtering#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/use-concurrent-cursor-for-client-side-filtering
📝 Walkthrough
This change updates cursor and filtering logic for concurrent and partitioned stream processing. It introduces a standardized
Sequence Diagram(s)
sequenceDiagram
participant Stream as DeclarativeStream
participant Factory as ModelToComponentFactory
participant Cursor as ConcurrentCursor
participant Record as Record
Stream->>Factory: create_declarative_stream()
Factory->>Factory: _build_concurrent_cursor(...)
Factory->>Cursor: ConcurrentCursor.__init__(...)
Note right of Cursor: Cursor initialized with state and config
Record->>Cursor: should_be_synced(record)
Cursor-->>Record: True/False (based on start cursor value)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (0 hunks)
airbyte_cdk/sources/declarative/extractors/record_filter.py (3 hunks)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (2 hunks)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (1 hunks)
airbyte_cdk/sources/streams/concurrent/cursor.py (4 hunks)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1 hunks)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
unit_tests/sources/streams/concurrent/test_cursor.py (2 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧰 Additional context used
🧠 Learnings (6)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)
Learnt from: aaronsteers
PR: #174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
airbyte_cdk/sources/declarative/extractors/record_filter.py (3)
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
Learnt from: aaronsteers
PR: #174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Learnt from: aaronsteers
PR: #174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
Learnt from: aaronsteers
PR: #174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/extractors/record_filter.py
[error] 4-9: Ruff: Import block is un-sorted or un-formatted. Organize imports. Fixable with '--fix' option.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2184-2184: mypy: Argument "stream_name" to "get_stream_state" of "ConnectorStateManager" has incompatible type "str | None"; expected "str" [arg-type]
[error] 2202-2202: mypy: Argument "stream_name" to "create_concurrent_cursor_from_perpartition_cursor" of "ModelToComponentFactory" has incompatible type "str | None"; expected "str" [arg-type]
[error] 2212-2212: mypy: Incompatible return value type (got "ConcurrentCursor", expected "StreamSlicer | None") [return-value]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-google-drive
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (26)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (1)
85-86: LGTM! Implementation aligns perfectly with the cursor's purpose.
The `should_be_synced` method returning `True` unconditionally makes sense for a `FileBasedFinalStateCursor`, since this cursor is designed to guarantee at least one state message is emitted and doesn't need to filter records. The consistent interface with other cursor implementations is a nice touch for the concurrent filtering migration. wdyt?
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
1427-1427: LGTM! This cursor type change aligns perfectly with the concurrent migration.
The update from `PerPartitionWithGlobalCursor` to `ConcurrentPerPartitionCursor` makes complete sense given the PR's objective to migrate client-side filtering to concurrent cursor handling. This reflects the enhanced concurrent processing capabilities mentioned in the AI summary, wdyt?
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (2)
31-31: LGTM on the formatting improvement!
The blank line after the class declaration helps with readability by clearly separating the class definition from its constants.
316-317: Good default implementation for the standardized interface!
The `should_be_synced` method provides a sensible default by returning `True` for all records, maintaining backward compatibility while conforming to the new cursor interface. This aligns well with the PR's goal of standardizing cursor filtering logic, wdyt?
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
1894-1894: Nice test adaptation to the cursor state changes!
The updated assertion properly validates the cursor state values instead of comparing internal cursor objects, which aligns well with the cursor state handling refactoring. This makes the test more focused on the actual behavior we care about - that the cursor contains the expected state value, wdyt?
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
84-84: Nice addition for migration flexibility!
The new parameter enables graceful cursor creation during the migration. The naming is clear and descriptive. WDYT about the approach for handling this transition?
129-130: Good practice documenting the temporary nature.
I appreciate the FIXME comment clearly indicating this is migration-specific code. This helps future maintainers understand the context and timeline for cleanup.
520-530: Clean implementation of the cursor creation fallback.
The logic correctly handles both scenarios:
- Old behavior: raise ValueError when cursor not found and flag is False
- New behavior: create cursor dynamically when flag is True
The implementation aligns well with the migration objectives. The cursor creation uses the same parameters as the existing logic, which ensures consistency.
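The two scenarios listed above can be sketched roughly as follows. This is an illustration under stated assumptions, not the CDK's code: `PartitionCursorRegistry`, `cursor_factory`, and the dictionary storage are hypothetical names; only the flag name `attempt_to_create_cursor_if_not_provided` and the `ValueError` on a missing cursor come from the review.

```python
from typing import Callable, Dict


class PartitionCursorRegistry:
    """Hypothetical stand-in for a per-partition cursor lookup."""

    def __init__(
        self,
        cursor_factory: Callable[[str], object],
        attempt_to_create_cursor_if_not_provided: bool = False,
    ) -> None:
        self._cursor_factory = cursor_factory
        self._attempt_to_create = attempt_to_create_cursor_if_not_provided
        self._cursors: Dict[str, object] = {}

    def get_cursor(self, partition_key: str) -> object:
        cursor = self._cursors.get(partition_key)
        if cursor is not None:
            return cursor
        if not self._attempt_to_create:
            # Old behavior: a missing cursor is treated as an error.
            raise ValueError(f"No cursor found for partition {partition_key!r}")
        # New behavior: build the cursor on demand and remember it.
        cursor = self._cursor_factory(partition_key)
        self._cursors[partition_key] = cursor
        return cursor
```

With the flag left at its default, a lookup for an unknown partition raises; with the flag set, the same lookup returns a freshly created cursor and later lookups reuse it.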
airbyte_cdk/sources/declarative/extractors/record_filter.py (2)
58-58: Great simplification of the cursor type!
Moving from a union of specific cursor types to the generic `Cursor` interface is a clean architectural improvement. This aligns well with the cursor abstraction introduced in the concurrent cursor framework. WDYT about how this impacts type safety?
76-76: Nice catch on the comment improvement.
The comment is now clearer and more grammatically correct. Small improvements like this enhance code readability.
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
77-79: Excellent addition of the abstract method to the interface!
This ensures all cursor implementations provide consistent filtering behavior. The method signature is clear and the abstract decorator properly enforces implementation. Great architectural improvement for the migration!
130-131: Clean implementation in FinalStateCursor.
Returning `True` makes sense for this cursor type since it's used to guarantee state messages rather than filtering records. Simple and correct!
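As a rough illustration of what the abstract method enforces, the sketch below uses simplified stand-ins rather than the CDK's classes: `Cursor`, `AlwaysSyncCursor`, and `IncompleteCursor` are hypothetical here, and records are plain dicts. A subclass that omits `should_be_synced` cannot be instantiated, while a final-state style cursor satisfies the interface by returning `True`.

```python
from abc import ABC, abstractmethod
from typing import Any, Mapping


class Cursor(ABC):
    """Simplified stand-in for a cursor interface (not the CDK class)."""

    @abstractmethod
    def should_be_synced(self, record: Mapping[str, Any]) -> bool:
        """Return True if the record should be emitted."""


class AlwaysSyncCursor(Cursor):
    """Mirrors the final-state idea: no filtering, every record passes."""

    def should_be_synced(self, record: Mapping[str, Any]) -> bool:
        return True


class IncompleteCursor(Cursor):
    """Omits should_be_synced, so instantiation raises TypeError."""
```

Calling `IncompleteCursor()` fails at construction time, which is the early feedback the review comment refers to.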
202-213: Smart improvement to the state extraction logic.
The new approach to extract the cursor value from the first slice (using either MOST_RECENT_RECORD_KEY or END_KEY as fallback) is more robust than the previous implementation. The logic correctly handles the case where slices exist and falls back appropriately. WDYT about this approach for state initialization?
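The fallback chain described in this comment could look roughly like the sketch below. It is an assumption-laden illustration: the key strings `"most_recent_cursor_value"` and `"end"`, the `"slices"` state layout, and the `default` parameter are guesses made for the example, not values confirmed by the PR.

```python
from typing import Any, Mapping

# Assumed key names; in the PR they are constants on the state converter.
MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"
END_KEY = "end"


def extract_start_cursor_value(state: Mapping[str, Any], default: Any = None) -> Any:
    """Pick the filtering baseline from the first slice of a state mapping."""
    slices = state.get("slices") or []
    if not slices:
        # No slices recorded yet: fall back to the caller-provided default.
        return default
    first_slice = slices[0]
    # Prefer the most recent record's cursor value, then the slice's end.
    return first_slice.get(MOST_RECENT_RECORD_KEY, first_slice.get(END_KEY, default))
```

Preferring the most recent record value over the slice end keeps the baseline as close as possible to what was actually observed, with the slice boundary as the fallback.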
489-489: Question about removing the upper boundary check.
I notice the `<= self._end_provider()` check was removed from the should_be_synced logic. This seems like a significant behavioral change - now records are only filtered by the lower boundary. Was this intentional for the migration? Could you help me understand the reasoning behind removing the upper boundary filtering?
The change means records with cursor values beyond the end boundary will now be synced, whereas before they would have been filtered out. WDYT about the implications of this change?
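To make the behavioral difference concrete, here is a small sketch that compares the two checks. The function names, the dict-based records, and the inclusive lower bound are assumptions made for the illustration; only the removal of the `<= self._end_provider()` comparison comes from the comment.

```python
from typing import Any, Callable, Mapping


def should_be_synced_with_upper_bound(
    record: Mapping[str, Any],
    cursor_field: str,
    start: Any,
    end_provider: Callable[[], Any],
) -> bool:
    """Previous check as described: lower and upper boundary."""
    return start <= record[cursor_field] <= end_provider()


def should_be_synced_lower_bound_only(
    record: Mapping[str, Any],
    cursor_field: str,
    start: Any,
) -> bool:
    """New check as described: only the lower boundary is applied."""
    return start <= record[cursor_field]
```

A record whose cursor value lies past the end boundary is rejected by the first function and accepted by the second; records below the start are rejected by both.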
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
1563-1564: LGTM! Clean parameter addition
The new optional parameter `attempt_to_create_cursor_if_not_provided` is well-named and maintains backward compatibility with its default value.
1635-1636: Parameter correctly propagated
Good to see the `attempt_to_create_cursor_if_not_provided` parameter being passed through to `ConcurrentPerPartitionCursor` as intended.
1942-1946: Nice refactoring to centralize cursor creation
Good move extracting the concurrent cursor creation logic into `_build_concurrent_cursor`. This improves code organization and makes the intent clearer.
2220-2229: Additional type safety fix needed
Line 2224 also needs the same null check:
-stream_name=model.name or "",
+stream_name=model.name or "",
Note: The `attempt_to_create_cursor_if_not_provided=True` on lines 2208 and 2228 appears intentional for client-side incremental sync scenarios.
Likely an incorrect or invalid review comment.
unit_tests/sources/streams/concurrent/test_cursor.py (8)
1782-1782: Nice refactoring with the constant!
Adding `_SHOULD_BE_SYNCED_START` centralizes the test value and makes the tests more maintainable. The naming is clear and consistent with the file's conventions.
1783-1837: LGTM on the parameterized test updates!
The refactoring to use `_SHOULD_BE_SYNCED_START` constant makes the test data more readable and maintainable. The function name update also better reflects what's being tested.
1840-1858: Solid test coverage for simple state filtering!
This test nicely validates the basic filtering behavior where records are filtered based on their cursor value relative to the stored state value. The assertions clearly verify the expected boundary behavior.
1859-1879: Good edge case coverage!
This test handles the scenario where partitioned state exists but is empty - using zero as the baseline filter value is a sensible default. The test validates this fallback behavior correctly.
1881-1901: Perfect handling of the start time scenario!
When partitioned state has a start time but no slices, using the start time as the filtering baseline makes perfect sense. The test validates this behavior thoroughly.
1903-1926: Excellent test for most_recent_cursor_value priority!
This test correctly validates that when a slice has a `most_recent_cursor_value`, it takes precedence for filtering decisions. The logic and assertions are spot on.
1928-1951: Smart fallback logic testing!
When `most_recent_cursor_value` isn't available, falling back to the slice's upper boundary for filtering is a good design choice. The test validates this fallback behavior correctly.
1953-1980: Thoughtful handling of multiple slice scenarios!
This test covers the complex case of multiple slices nicely. I particularly appreciate the comment explaining why records within later slice boundaries are still synced - it's a good defensive approach since API queries should handle the slice boundaries anyway. The test validates both the primary filtering logic and this safety behavior, wdyt?
📝 Walkthrough
This set of changes refactors the cursor and state management logic for concurrent and client-side incremental streams in the Airbyte CDK. It introduces a unified cursor interface, updates cursor creation and retrieval mechanisms, adds and adapts filtering logic, and expands test coverage for partitioned and non-partitioned cursor scenarios.
Sequence Diagram(s)
sequenceDiagram
participant Stream as DeclarativeStream
participant Factory as ModelToComponentFactory
participant Cursor as ConcurrentCursor/ConcurrentPerPartitionCursor
participant StateMgr as ConnectorStateManager
Stream->>Factory: create_declarative_stream()
Factory->>StateMgr: get_stream_state()
Factory->>Factory: _build_concurrent_cursor(model, stream_slicer, config)
alt Incremental with Partition Router
Factory->>Cursor: create_concurrent_cursor_from_perpartition_cursor(..., attempt_to_create_cursor_if_not_provided=True)
else Incremental with Count/Datetime
Factory->>Cursor: create_concurrent_cursor_from_incrementing_count_cursor or create_concurrent_cursor_from_datetime_based_cursor(..., attempt_to_create_cursor_if_not_provided=True)
end
Factory-->>Stream: Stream with concurrent cursor
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 3
🧹 Nitpick comments (7)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (1)
85-86: LGTM! The implementation looks correct for a final state cursor.
The unconditional `True` return makes perfect sense here since this cursor is designed to guarantee at least one state message is emitted and shouldn't filter out any records. I noticed other cursor implementations in the codebase follow similar patterns.
One small suggestion - would you consider adding a brief docstring to make the intent explicit? Something like `"""All records should be synced as this cursor doesn't perform filtering."""` - wdyt?
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1)
315-317: Redundant override – can we rely on the base implementation instead?
`ConcurrentCursor` already provides a concrete `should_be_synced()` that unconditionally returns `True`. Re-defining it here adds one more place to keep in sync without changing behaviour. Removing the override would keep the class leaner while preserving functionality, wdyt?
- def should_be_synced(self, record: Record) -> bool:
-     return True
+ # inherits should_be_synced(record) from parent – no override required
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
1894-1894: Assertion could avoid ordering assumptions
`list(cursor.state.values()) == [expected]` relies on insertion order for a single-key dict. If an extra key is ever added, or insertion order changes, the test will break even though the value is still present. Would asserting membership be more future-proof, e.g. `assert expected_cursor_value in record_filter._cursor.state.values()`? wdyt?
- assert list(record_filter._cursor.state.values()) == [expected_cursor_value]
+ assert expected_cursor_value in record_filter._cursor.state.values()
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
129-130: Consider tracking this temporary field for cleanup.
The FIXME comment clearly indicates this is temporary for the migration. Would it be worth creating a tracking issue to ensure this gets cleaned up once the migration is complete? wdyt?
airbyte_cdk/sources/declarative/extractors/record_filter.py (1)
58-58: Minor type annotation cleanup needed.
Since you're using a single cursor type now, the `Union[Cursor]` can be simplified to just `Cursor`. The union wrapper isn't needed for a single type. wdyt?
- cursor: Union[Cursor],
+ cursor: Cursor,
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
202-213: Good state extraction logic, minor readability suggestion.
The cursor value extraction logic is well thought out - preferring the most recent record over the end key makes sense. The fallback chain handles edge cases nicely.
One small suggestion for readability - could we extract the slice value logic into a helper method? Something like:
def _extract_cursor_from_slice(self, slice_data):
    return slice_data.get(
        self._connector_state_converter.MOST_RECENT_RECORD_KEY,
        slice_data.get(self._connector_state_converter.END_KEY),
    )
wdyt? Would make the main method a bit cleaner.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2211-2231: Consider using `isinstance()` for type checking.
Using `type() ==` for type checking doesn't handle inheritance properly. Would it be better to use `isinstance()` instead?
- if type(model.incremental_sync) == IncrementingCountCursorModel:
+ if isinstance(model.incremental_sync, IncrementingCountCursorModel):
      return self.create_concurrent_cursor_from_incrementing_count_cursor(
          # ...
      )
- elif type(model.incremental_sync) == DatetimeBasedCursorModel:
+ elif isinstance(model.incremental_sync, DatetimeBasedCursorModel):
      return self.create_concurrent_cursor_from_datetime_based_cursor(
          # ...
      )
This would make the code more robust to future subclasses, wdyt?
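A short sketch of the difference between the two checks. The classes below are empty stand-ins defined only for this example; `CustomDatetimeCursorModel` in particular is a hypothetical subclass, not something that exists in the CDK.

```python
class DatetimeBasedCursorModel:
    """Empty stand-in for the real model class."""


class CustomDatetimeCursorModel(DatetimeBasedCursorModel):
    """Hypothetical subclass used to show the inheritance case."""


def matches_exact_type(model: object) -> bool:
    # Exact type comparison: subclasses are not recognized.
    return type(model) == DatetimeBasedCursorModel


def matches_isinstance(model: object) -> bool:
    # isinstance accepts the class and any of its subclasses.
    return isinstance(model, DatetimeBasedCursorModel)
```

Both functions agree on a plain `DatetimeBasedCursorModel()`; they diverge on `CustomDatetimeCursorModel()`, where only the `isinstance` variant matches. Whether subclasses should take the same branch is a design decision for the factory.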
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (0 hunks)
airbyte_cdk/sources/declarative/extractors/record_filter.py (3 hunks)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (2 hunks)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (1 hunks)
airbyte_cdk/sources/streams/concurrent/cursor.py (4 hunks)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1 hunks)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
unit_tests/sources/streams/concurrent/test_cursor.py (2 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1)
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
Learnt from: aaronsteers
PR: #174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
airbyte_cdk/sources/declarative/extractors/record_filter.py (3)
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)
Learnt from: aaronsteers
PR: #174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
Learnt from: ChristoGrab
PR: #58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the YamlDeclarativeSource class in airbyte_cdk/sources/declarative/yaml_declarative_source.py, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in airbyte_cdk/cli/source_declarative_manifest/, including _run.py, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Learnt from: aaronsteers
PR: #174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py file, the strict module name checks in _get_class_from_fully_qualified_class_name (requiring module_name to be "components" and module_name_full to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Learnt from: aaronsteers
PR: #58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in airbyte_cdk/cli/source_declarative_manifest/ is being imported from another repository, avoid suggesting modifications to it during the import process.
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the declarative_component_schema.py file is auto-generated from declarative_component_schema.yaml and should be ignored in the recommended reviewing order.
🧬 Code Graph Analysis (6)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (3)
- airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (1): should_be_synced (85-86)
- airbyte_cdk/sources/streams/concurrent/cursor.py (3): should_be_synced (78-79), should_be_synced (130-131), should_be_synced (477-489)
- airbyte_cdk/sources/types.py (1): Record (21-72)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
- airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1): _create_cursor (212-215)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (3)
- airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1): should_be_synced (316-317)
- airbyte_cdk/sources/streams/concurrent/cursor.py (3): should_be_synced (78-79), should_be_synced (130-131), should_be_synced (477-489)
- airbyte_cdk/sources/types.py (1): Record (21-72)
airbyte_cdk/sources/streams/concurrent/cursor.py (8)
- airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1): should_be_synced (316-317)
- airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (2): should_be_synced (85-86), state (44-45)
- airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py (1): should_be_synced (384-399)
- airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1): should_be_synced (338-339)
- airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1): should_be_synced (313-316)
- airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (1): should_be_synced (196-197)
- airbyte_cdk/sources/types.py (2): Record (21-72), get (146-147)
- airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1): deserialize (67-78)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (4)
- airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1): state (70-71)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1): state (137-157)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2): state (573-574), state (577-580)
- unit_tests/sources/mock_server_tests/mock_source_fixture.py (4): state (70-71), state (74-75), state (320-321), state (324-325)
unit_tests/sources/streams/concurrent/test_cursor.py (3)
- airbyte_cdk/sources/types.py (2): Record (21-72), data (35-36)
- airbyte_cdk/sources/streams/concurrent/cursor.py (5): should_be_synced (78-79), should_be_synced (130-131), should_be_synced (477-489), ConcurrentCursor (134-496), CursorField (39-47)
- airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py (1): EpochValueConcurrentStreamStateConverter (114-142)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/extractors/record_filter.py
[error] 4-9: Ruff: Import block is un-sorted or un-formatted. Organize imports. 1 error found, fixable with '--fix' option.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2184-2184: mypy: Argument "stream_name" to "get_stream_state" of "ConnectorStateManager" has incompatible type "str | None"; expected "str" [arg-type]
[error] 2202-2202: mypy: Argument "stream_name" to "create_concurrent_cursor_from_perpartition_cursor" of "ModelToComponentFactory" has incompatible type "str | None"; expected "str" [arg-type]
[error] 2212-2212: mypy: Incompatible return value type (got "ConcurrentCursor", expected "StreamSlicer | None") [return-value]
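The two `arg-type` errors above come from passing a `str | None` value where `str` is required. One common way to satisfy mypy is to narrow the value before the call. The sketch below is only an illustration: `get_stream_state` here is a hypothetical stand-in with the same parameter name, not the CDK's `ConnectorStateManager` method, and `build_state` is an invented helper.

```python
from typing import Optional


def get_stream_state(stream_name: str) -> dict:
    """Hypothetical stand-in for a function that requires a non-optional name."""
    return {"stream": stream_name}


def build_state(stream_name: Optional[str]) -> dict:
    # Narrow `str | None` to `str` explicitly. After this guard, mypy treats
    # `stream_name` as `str`, so the call below type-checks.
    if stream_name is None:
        raise ValueError("stream_name is required to look up stream state")
    return get_stream_state(stream_name=stream_name)
```

Whether raising, defaulting to an empty string, or changing the upstream type is the right fix depends on whether a missing name is actually possible at that call site.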
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (18)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
1427-1427: LGTM! Test updated to reflect cursor refactoring.
This test update correctly reflects the migration from PerPartitionWithGlobalCursor to ConcurrentPerPartitionCursor for client-side incremental filtering with partition routers. The assertion now matches the expected cursor type after the concurrent cursor refactoring, wdyt?
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
84-84: LGTM on the new parameter addition!
The parameter name clearly communicates its intent and the default value maintains backward compatibility. Nice work!
514-530: Clean method structure and error handling.
The conditional logic is well-structured and maintains backward compatibility. The error messages are descriptive and help with debugging. Good work on preserving the original behavior when the flag is disabled!
airbyte_cdk/sources/declarative/extractors/record_filter.py (2)
52-52: Clean docstring update.
Good job updating the parameter documentation to match the new cursor type! The documentation stays consistent with the code changes.
76-76: Nice comment clarification!
The improved comment better explains why the stream name is empty. These small documentation improvements really help with code readability!
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
77-79: Excellent interface standardization!
Adding should_be_synced as an abstract method is a great way to ensure all cursor implementations provide consistent filtering behavior. This makes the cursor interface much cleaner!
130-131: Appropriate implementation for FinalStateCursor.
Returning True for all records makes perfect sense for a final state cursor. Clean and straightforward!
489-489: Upper-boundary check removal in ConcurrentCursor.should_be_synced is by design.
Unit tests in unit_tests/sources/streams/concurrent/test_cursor.py confirm that should_be_synced only enforces the lower bound (self.start ≤ record_cursor_value), and intentionally leaves upper-boundary filtering to the slice-based API queries. No further changes are required here. wdyt?
unit_tests/sources/streams/concurrent/test_cursor.py (8)
1782-1782: Nice addition of the constant!
This improves readability and maintainability by eliminating the magic number. Good refactoring!
1783-1837: Great refactoring of the parameterized test!
The use of the _SHOULD_BE_SYNCED_START constant makes the test parameters much clearer, and the function rename to test_should_be_synced_non_partitioned_state_no_state provides better context about what scenario is being tested. The test IDs also reflect the changes nicely.
1840-1858: Solid test coverage for simple state filtering!
This test clearly validates that should_be_synced correctly filters records based on the cursor value in the state. The test logic is sound: records with cursor values less than the state value should be filtered out (False), while equal or greater values should be synced (True).
1859-1879: Good edge case coverage!
This test handles the scenario where partitioned state exists but has no slices or start timestamp. Using zero as the baseline filter value is a reasonable default behavior, and the test correctly validates this edge case.
1881-1901: Excellent coverage of the start timestamp scenario!
This test properly validates that when partitioned state has no slices but does have a start value, the start timestamp is correctly used as the baseline for filtering. The assertion logic is spot-on.
1903-1926: Perfect test for the most_recent_cursor_value scenario!
This test correctly validates that when a slice contains a most_recent_cursor_value, that value takes precedence for filtering decisions. This is the expected behavior since it represents the actual progress within that slice.
1928-1951: Great fallback behavior test!
This test properly validates the fallback scenario where a slice exists but doesn't have a most_recent_cursor_value. Using the slice's upper boundary (end) as the filter baseline is logical and the test correctly verifies this behavior.
1953-1980: Excellent comprehensive test for multiple slices!
This test thoroughly validates the complex scenario with multiple slices. I particularly appreciate the explanatory comment on lines 1978-1980: it clearly explains why records beyond the first slice boundary are still synced (conservative approach to avoid missing data). The test logic covers all the important cases including the gap between slices.
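The scenarios listed in these test comments amount to a precedence order for choosing the value a record's cursor is compared against. The following is a rough sketch of that order as described in the comments, not the CDK's implementation: `resolve_lower_bound` is a hypothetical helper, and the state keys (`slices`, `start`, `end`, `most_recent_cursor_value`) and the default cursor field name `updated_at` are assumptions made for illustration.

```python
from typing import Any, Mapping, Optional


def resolve_lower_bound(
    state: Optional[Mapping[str, Any]],
    default_start: int,
    cursor_field: str = "updated_at",  # assumed field name
) -> int:
    """Pick the lower bound used for filtering (illustrative only)."""
    if not state:
        # No state at all: fall back to the configured start.
        return default_start
    if "slices" not in state:
        # Simple state: the cursor value is stored under the cursor field.
        return int(state.get(cursor_field, default_start))
    slices = state["slices"]
    if not slices:
        # Sliced state without slices: use its start value, or zero if absent.
        return int(state.get("start", 0))
    # With one or more slices, only the first slice is considered, so records
    # past its boundary are still synced (the conservative choice).
    first = slices[0]
    return int(first.get("most_recent_cursor_value", first["end"]))
```

A record would then be kept when its cursor value is greater than or equal to the returned bound, matching the "equal or greater values should be synced" behavior described above.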
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1563-1564: LGTM! Good addition for conditional cursor creation.
The new attempt_to_create_cursor_if_not_provided parameter with a default value of False maintains backward compatibility while enabling the new behavior when needed.
1942-1946: Clean refactoring of client-side incremental sync logic.
The extraction of cursor creation logic to _build_concurrent_cursor improves code organization. This aligns well with the PR objective of migrating client-side filtering to concurrent cursor.
PyTest Results (Fast)
3 695 tests +6, 3 684 ✅ +6, 6m 31s ⏱️ -3s
Results for commit 62ff4df. ± Comparison against base commit e4cbaaf.
This pull request removes 4 and adds 10 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
PyTest Results (Full)
3 698 tests +6, 3 687 ✅ +6, 11m 47s ⏱️ +2s
Results for commit 62ff4df. ± Comparison against base commit e4cbaaf.
This pull request removes 4 and adds 10 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
/autofix
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2020-2024: Parameter renaming looks good, but typo propagates.
The renaming from stop_condition_on_cursor to stop_condition_cursor improves consistency. However, the typo in client_side_filetering_enabled from line 1935 propagates here too. Once you fix the variable name above, this should be updated as well, wdyt?
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2178-2237: Great centralization, but consider using isinstance() instead of type().
The new _build_concurrent_cursor method nicely centralizes the concurrent cursor creation logic! However, I noticed you're using type() == Class comparisons on lines 2213 and 2222. Would you consider using isinstance() instead for better inheritance support, wdyt?
- if type(model.incremental_sync) == IncrementingCountCursorModel:
+ if isinstance(model.incremental_sync, IncrementingCountCursorModel):
      # ...
- elif type(model.incremental_sync) == DatetimeBasedCursorModel:
+ elif isinstance(model.incremental_sync, DatetimeBasedCursorModel):
Otherwise, the method structure and logic flow look solid!
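For readers weighing this suggestion, the practical difference between the two checks only shows up when a subclass is involved. A minimal sketch follows, using hypothetical stand-in classes rather than the CDK's generated models:

```python
class DatetimeBasedCursorModel:
    """Hypothetical stand-in for the generated model class."""


class CustomDatetimeCursorModel(DatetimeBasedCursorModel):
    """Hypothetical subclass, e.g. a connector-specific extension."""


model = CustomDatetimeCursorModel()

# Exact type comparison ignores inheritance, so a subclass does not match.
exact_match = type(model) == DatetimeBasedCursorModel

# isinstance() follows the class hierarchy, so a subclass does match.
subclass_aware = isinstance(model, DatetimeBasedCursorModel)
```

An exact `type()` comparison can still be a deliberate choice when subclasses must not take the same branch, so the swap is not purely cosmetic.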
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
unit_tests/sources/declarative/extractors/test_record_filter.py (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/extractors/test_record_filter.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-google-drive
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
606-606: LGTM! Import addition supports cursor unification.
Adding the Cursor import aligns perfectly with the PR's goal to standardize cursor handling around the generic Cursor class.
1563-1563: Good addition for flexible cursor creation.
The attempt_to_create_cursor_if_not_provided parameter provides the transitional functionality described in the PR objectives, allowing cursor creation when slices exist but cursors don't. The default False value maintains backward compatibility, wdyt?
3330-3330: Check the logic for cursor_used_for_stop_condition assignment.
The logic cursor_used_for_stop_condition = cursor if stop_condition_cursor else None seems potentially incorrect. Should this be cursor_used_for_stop_condition = stop_condition_cursor or cursor? The current logic assigns cursor when stop_condition_cursor is truthy, but ignores the actual stop_condition_cursor value. Could you clarify the intended behavior here, wdyt?
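To make the question in this comment concrete, the two expressions can be compared side by side. This is only a sketch: `FakeCursor`, `select_current`, and `select_suggested` are hypothetical names, and which behavior is intended is not settled in this thread.

```python
from typing import Optional


class FakeCursor:
    """Hypothetical placeholder for a cursor object."""

    def __init__(self, name: str) -> None:
        self.name = name


def select_current(
    cursor: FakeCursor, stop_condition_cursor: Optional[FakeCursor]
) -> Optional[FakeCursor]:
    # Expression quoted from the PR: the main cursor is used only when a
    # stop-condition cursor was requested; otherwise nothing is returned.
    return cursor if stop_condition_cursor else None


def select_suggested(
    cursor: FakeCursor, stop_condition_cursor: Optional[FakeCursor]
) -> FakeCursor:
    # Expression proposed in the review: prefer the stop-condition cursor and
    # fall back to the main cursor, so a cursor is always returned.
    return stop_condition_cursor or cursor
```

The two differ in both branches: with no stop-condition cursor, the first returns `None` while the second still returns the main cursor, which would enable a stop condition even when none was requested.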
A few things I wanted to ask about but otherwise everything else makes sense to me!
🚚 💨
Go home coderabbitai, you are drunk
What
We are starting to migrate more to the concurrent cursors. The last usages of the declarative cursors are:
This PR addresses the first two. A PR will follow for the substream stuff.
How
- should_be_synced in concurrent cursors
- should_be_synced. This should be the case once the stream slices are generated from the concurrent cursors, but as a transition step we've added attempt_to_create_cursor_if_not_provided
- observe and close_slice
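As a rough picture of what client-side filtering through a cursor looks like, the sketch below has a filter delegate the keep-or-drop decision to any object exposing `should_be_synced`. The names `SupportsShouldBeSynced`, `StartBoundCursor`, and `filter_records` are hypothetical and much simpler than the CDK's `ConcurrentCursor` and record filter; records are plain mappings here rather than `Record` objects.

```python
from typing import Any, Iterable, Iterator, Mapping, Protocol


class SupportsShouldBeSynced(Protocol):
    """Anything that can decide whether a record belongs in the sync."""

    def should_be_synced(self, record: Mapping[str, Any]) -> bool: ...


class StartBoundCursor:
    """Hypothetical cursor that enforces only a lower bound."""

    def __init__(self, start: int, cursor_field: str) -> None:
        self.start = start
        self.cursor_field = cursor_field

    def should_be_synced(self, record: Mapping[str, Any]) -> bool:
        # Keep records whose cursor value is at or after the start value.
        return record[self.cursor_field] >= self.start


def filter_records(
    records: Iterable[Mapping[str, Any]], cursor: SupportsShouldBeSynced
) -> Iterator[Mapping[str, Any]]:
    # The filter holds no comparison logic of its own; it asks the cursor.
    return (record for record in records if cursor.should_be_synced(record))
```

For example, with `StartBoundCursor(start=10, cursor_field="updated_at")`, records whose `updated_at` is 5, 10, and 15 would be reduced to the last two.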
Expected Impact
None for the user
The code may be less performant and use a bit more memory because we will compute the cursors multiple times for client side fields and data feed, but that should be it.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests