-
Notifications
You must be signed in to change notification settings - Fork 30
fix: parameters propagation issues for concurrent cursors #726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: parameters propagation issues for concurrent cursors #726
Conversation
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/fix_parameter_propagation_for_concurrent_cursors#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/fix_parameter_propagation_for_concurrent_cursors Helpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
📝 WalkthroughWalkthroughCopy "parameters" into "$parameters" when absent for two concurrent-cursor factory paths before parsing into Pydantic models, ensuring both manifest-shaped and model.dict-shaped component inputs propagate parameters into the concurrent cursor construction. No public signatures changed. Changes
Sequence Diagram(s)sequenceDiagram
participant Manifest as Manifest / Model dict
participant Factory as ModelToComponentFactory
participant Pydantic as Pydantic Model Parser
rect #F0F9FF
Manifest->>Factory: provide component_definition (may have "parameters" or "$parameters")
end
rect #F7FFF0
Note right of Factory: If "$parameters" missing\ncopy "parameters" -> "$parameters"
Factory->>Factory: ensure "$parameters" present
end
Factory->>Pydantic: parse component_definition into model (uses $parameters)
Pydantic-->>Factory: parsed component instance
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Possibly related PRs
Suggested reviewers
Would you like me to draft a short comment suggesting unifying the two parameter input shapes inside the factory (so propagation consistently happens in one place), wdyt? Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1286-1286: Command 'poetry run mypy --config-file mypy.ini airbyte_cdk' failed with mypy error: Unsupported target for indexed assignment ("Mapping[str, Any]") [index]
[error] 1596-1596: Command 'poetry run mypy --config-file mypy.ini airbyte_cdk' failed with mypy error: Unsupported target for indexed assignment ("Mapping[str, Any]") [index]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
Their concern should be addressed in an upcoming PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (5)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (5)
4740-4742
: Strengthen the assertion by checking we’re actually on the concurrent path.Would you consider asserting the stream and cursor types before the final key check? That makes the intent explicit and guards against future refactors where streams() could return a different wrapper. Wdyt?
- streams = source.streams({}) - - assert streams[0].cursor.cursor_field.cursor_field_key == cursor_field_parameter_override + streams = source.streams({}) + # Extra safety to ensure we hit the concurrent path + assert isinstance(streams[0], DefaultStream) + assert isinstance(streams[0].cursor, ConcurrentCursor) + assert streams[0].name == "stream_name" + assert streams[0].cursor.cursor_field.cursor_field_key == cursor_field_parameter_override
4733-4741
: Optional: use _group_streams and select by name to avoid ordering assumptions.Most tests here validate concurrent constructs via _group_streams. Using it and selecting by name will make this test more resilient if stream ordering changes. Interested in switching? Wdyt?
- streams = source.streams({}) - - assert streams[0].cursor.cursor_field.cursor_field_key == cursor_field_parameter_override + concurrent_streams, _ = source._group_streams(config={}) + stream = next(s for s in concurrent_streams if s.name == "stream_name") + assert isinstance(stream, DefaultStream) + assert isinstance(stream.cursor, ConcurrentCursor) + assert stream.cursor.cursor_field.cursor_field_key == cursor_field_parameter_override
4654-4660
: Tiny readability bump: add a one-line docstring explaining the scenario.This helps future readers quickly understand the regression being covered. Add a brief why here? Wdyt?
def test_parameter_propagation_for_concurrent_cursor(): + """Ensures stream $parameters['cursor_field'] propagates into the concurrent DatetimeBased cursor."""
4654-4742
: Broaden coverage: add a per-partition concurrent cursor variant.The fix also touches the per-partition concurrent cursor factory path. Would you like to add a sibling test that sets up a partition_router and verifies the same $parameters override flows into that cursor too? I can help wire it if useful. Wdyt?
Example outline you could drop below this test:
def test_parameter_propagation_for_concurrent_perpartition_cursor(): override = "created_at" manifest = { "version": "5.0.0", "definitions": { "selector": {"type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": []}}, "requester": {"type": "HttpRequester", "url_base": "https://persona.metaverse.com", "http_method": "GET"}, "retriever": { "type": "SimpleRetriever", "record_selector": {"$ref": "#/definitions/selector"}, "paginator": {"type": "NoPagination"}, "requester": {"$ref": "#/definitions/requester"}, "partition_router": {"type": "ListPartitionRouter", "cursor_field": "partition_id", "values": ["A", "B"]}, }, "incremental_cursor": { "type": "DatetimeBasedCursor", "start_datetime": {"datetime": "2024-01-01"}, "end_datetime": {"datetime": "2024-12-31"}, "datetime_format": "%Y-%m-%d", "cursor_datetime_formats": ["%Y-%m-%d"], "cursor_granularity": "P1D", "step": "P400D", "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", "start_time_option": {"type": "RequestOption", "field_name": "start", "inject_into": "request_parameter"}, "end_time_option": {"type": "RequestOption", "field_name": "end", "inject_into": "request_parameter"}, }, "incremental_stream": { "retriever": {"$ref": "#/definitions/retriever", "requester": {"$ref": "#/definitions/requester"}}, "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, "$parameters": {"name": "perpartition", "primary_key": "id", "path": "/path", "cursor_field": override}, "schema_loader": {"type": "InlineSchemaLoader", "schema": {"$schema": "https://json-schema.org/draft-07/schema#", "type": "object", "properties": {"id": {"type": ["null", "string"]}}}}, }, }, "streams": ["#/definitions/incremental_stream"], "check": {"stream_names": ["perpartition"]}, "concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 5, "max_concurrency": 25}, } source = ConcurrentDeclarativeSource(source_config=manifest, config={}, catalog=create_catalog("perpartition"), state=None) concurrent_streams, _ = source._group_streams(config={}) stream = next(s for s in concurrent_streams if s.name == "perpartition") assert isinstance(stream.cursor, ConcurrentCursor) assert stream.cursor.cursor_field.cursor_field_key == override
4676-4678
: Use object form for end_datetime for consistencyI ran a search across our declarative tests and found that every other static example in this suite uses the object shape for
end_datetime
—only this one remains as a bare string. Would you mind switching it to the object form so it matches the rest? Wdyt?• Location:
unit_tests/sources/declarative/test_concurrent_declarative_source.py:4677
- "end_datetime": "2024-12-31", + "end_datetime": {"datetime": "2024-12-31"},
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)unit_tests/sources/declarative/test_concurrent_declarative_source.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
state
(137-157)cursor_field
(133-134)airbyte_cdk/sources/streams/concurrent/default_stream.py (2)
cursor
(92-93)cursor_field
(52-53)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
4654-4742
: Nice, focused regression test for parameter propagation into concurrent cursor.This directly validates that a stream-level $parameters override flows into the DatetimeBased concurrent cursor’s cursor_field. It aligns with the PR objective and should prevent a silent fallback to the default "updated_at". LGTM.
What
https://github.com/airbytehq/oncall/issues/8851
https://airbytehq-team.slack.com/archives/C09C4CK3BEW
How
Ensuring "$parameter" is populated from "parameters" value
Summary by CodeRabbit
Bug Fixes
Tests