fix: do not break on bad cursor value in perpartition cursor #758
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/fix_per_partition_global_cursor_error#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/fix_per_partition_global_cursor_error
Helpful Resources
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
- /autofix
📝 Walkthrough
Adds defensive parsing in ConcurrentPerPartitionCursor.observe: cursor-value parsing is wrapped in try/except, a warning is logged once on ValueError, and state updates are skipped for that record. A unit test verifies that the sync does not break on a bad global cursor value.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant SRC as Source/Stream
participant CPC as ConcurrentPerPartitionCursor
participant GCur as Global Cursor
participant PCur as Per-Partition Cursor
participant Log as Logger
SRC->>CPC: observe(record)
rect rgba(230,240,255,0.6)
note right of CPC: attempt parse/format record cursor
CPC->>CPC: parse/format cursor_value
alt Parse fails (ValueError)
opt first failure
CPC->>Log: warn("failed to parse cursor value ...")
end
CPC-->>SRC: return (skip state updates)
else Parse succeeds
CPC->>CPC: record_cursor := formatted value\n_synced_some_data := True
CPC->>GCur: update global cursor with record_cursor
alt using per-partition cursors
CPC->>PCur: PCur.observe(record)
end
CPC-->>SRC: return
end
end
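The flow in the diagram can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions, not the CDK's actual code: `SketchCursor`, the injected `parse` callable, and the `_logged_parse_error` flag are hypothetical names, and the per-partition branch is omitted.

```python
import logging
from datetime import datetime
from typing import Any, Callable, Mapping, Optional

logger = logging.getLogger("cursor_sketch")


class SketchCursor:
    """Hypothetical stand-in for the cursor; not the CDK class."""

    def __init__(self, cursor_field: str, parse: Callable[[str], datetime]) -> None:
        self._cursor_field = cursor_field
        self._parse = parse  # expected to raise ValueError on bad input
        self._logged_parse_error = False  # hypothetical one-time-warning flag
        self.new_global_cursor: Optional[datetime] = None

    def observe(self, record: Mapping[str, Any]) -> None:
        raw = record.get(self._cursor_field)
        try:
            value = self._parse(str(raw))
        except ValueError:
            # Warn on the first failure only, then skip state updates for this record.
            if not self._logged_parse_error:
                logger.warning("Could not parse cursor value %r; skipping state update", raw)
                self._logged_parse_error = True
            return
        # Parse succeeded: advance the global cursor if the value is newer.
        if self.new_global_cursor is None or value > self.new_global_cursor:
            self.new_global_cursor = value
```

Passing the parser in as a callable keeps the sketch independent of any particular state converter; a bad value leaves `new_global_cursor` untouched, and later well-formed records still advance it.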
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Would you like me to propose a short docstring update for the new flag and the observe branch to address the docstring coverage warning, wdyt?
Pre-merge checks (2 passed, 1 warning)
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🧹 Nitpick comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
4406-4433: Tighten the new test and align name with behavior.
- Unused variable cursor_factory_mock.
- The test name mentions “global state” but the cursor runs with the default per-partition mode. Should we set use_global_cursor=True to match intent and prevent regressions, wdyt?
- Optional: assert no state update occurred (e.g., _new_global_cursor is None) and/or capture the single warning via caplog.
-def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_break_sync():
-    cursor_factory_mock = MagicMock()
-    cursor_factory_mock.create.side_effect = [_make_inner_cursor("2024-01-01T00:00:00Z")]
+def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_break_sync():
     cursor = ConcurrentPerPartitionCursor(
-        cursor_factory=MagicMock(),
+        cursor_factory=MagicMock(),
         partition_router=ListPartitionRouter(values=["1"], cursor_field="partition_id", config={}, parameters={}),
         stream_name="test_stream",
         stream_namespace=None,
         stream_state={},
         message_repository=MagicMock(),
         connector_state_manager=MagicMock(),
         connector_state_converter=CustomFormatConcurrentStreamStateConverter(
             datetime_format="%Y-%m-%dT%H:%M:%SZ",
             input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"],
             is_sequential_state=True,
             cursor_granularity=timedelta(0),
         ),
         cursor_field=CursorField(cursor_field_key="updated_at"),
+        use_global_cursor=True,
     )
     cursor.observe(
         Record(
             data={"updated_at": ""},
             stream_name="test_stream",
             associated_slice=StreamSlice(partition={"partition_id": "1"}, cursor_slice={})
         )
     )
+    # No exception thrown and no global state advancement
+    assert cursor._new_global_cursor is None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4)
airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py (1)
- ListPartitionRouter (18-115)
airbyte_cdk/sources/types.py (7)
- StreamSlice (75-169)
- Record (21-72)
- values (143-144)
- data (35-36)
- associated_slice (39-40)
- partition (99-104)
- cursor_slice (107-112)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
- cursor_field (191-192)
- CursorField (40-48)
airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py (1)
- CustomFormatConcurrentStreamStateConverter (193-223)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[error] 525-531: Ruff format would reformat this file. Run 'poetry run ruff format .' to apply formatting changes.
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[error] 4408-4429: Ruff format would reformat this file. Run 'poetry run ruff format .' to apply formatting changes.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)
23-23: Import looks good. ListPartitionRouter is used below; no concerns.
33-34: Import looks good. Record is used in the new test; no concerns.
4408-4429: Use direct ruff/black commands to format
CI is flagging this hunk. Since poetry isn’t available, could you try running: ruff --fix . && ruff . --format=github && black . && pytest -q to get the suite green, wdyt?
Code-wise this looks good, but is there any concern over thread safety, since observe() is now called in PartitionReader()?
If our only exposure is that we might accidentally log this more than once during a race condition, that's probably okay, but I wanted to bring it up in case it's worth discussing.
That was my guess, and I'm fine with logging multiple times. Honestly, it probably doesn't happen very often, given that we've only seen this in CI for source-bing-ads and only for one record. So unless the datetime_formats are not configured properly, this should be straightforward.
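To make the race in this thread concrete: with a plain boolean flag, two threads can both read it as unset before either sets it, so the warning may be emitted more than once. The thread accepts that outcome. If duplicate warnings ever did matter, one option would be to guard the check-and-set with a lock, as in this minimal sketch (`WarnOnce` and `run_demo` are hypothetical names, not part of the CDK):

```python
import logging
import threading

logger = logging.getLogger("warn_once_sketch")


class WarnOnce:
    """Hypothetical helper: emits a warning at most once, even across threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._warned = False
        self.emitted = 0  # counter kept only so the demo can inspect the result

    def warn(self, message: str) -> None:
        # The lock makes the check-and-set atomic, so only one thread gets through.
        with self._lock:
            if self._warned:
                return
            self._warned = True
            self.emitted += 1
        logger.warning(message)


def run_demo(workers: int = 8) -> int:
    """Call warn() from several threads and return how many warnings were emitted."""
    helper = WarnOnce()
    threads = [
        threading.Thread(target=helper.warn, args=("bad cursor value",))
        for _ in range(workers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return helper.emitted
```

The lock adds a small cost on every failed parse, which is likely why a benign duplicate log line is an acceptable trade-off here.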
What
Following this discussion on our Slack about source-bing-ads CI failing (it does not happen in prod, so there is something odd in our sandbox account), it seems that we don't fail the sync when ConcurrentCursor can't parse a datetime, but we do with the per-partition cursor.
This PR aligns the behavior to never fail. The drawback is that if a cursor value is misconfigured, we won't get paged.
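For context, the new unit test feeds the cursor an empty `updated_at` string with the format `%Y-%m-%dT%H:%M:%SZ`. Assuming the state converter ultimately relies on `strptime`-style parsing (an assumption about CDK internals that this page does not confirm), the standard library shows why such a value raises `ValueError`. `try_parse` is a hypothetical helper used only for illustration:

```python
from datetime import datetime
from typing import Optional

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # format used in the PR's unit test


def try_parse(value: str) -> Optional[datetime]:
    """Return the parsed datetime, or None when the value does not match the format."""
    try:
        return datetime.strptime(value, DATETIME_FORMAT)
    except ValueError:
        # An empty or malformed string does not match the format.
        return None
```

A caller that treats `None` as "skip this record's state update" gets the never-fail behavior described above, at the cost of silently tolerating misconfigured formats.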