Skip to content

Conversation

aldogonzalez8
Copy link
Contributor

@aldogonzalez8 aldogonzalez8 commented May 15, 2025

Initial scope...

Pass state transformations to concurrent cursor flow creation for async retriever with stream slicer flow.

Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/13019

But Also!

If we try to apply a transformation to the stream_state that already represents the cursor for the partition we can end up with things like happening here:

From

{'TimePeriod': '2025-05-12T08:00:00+00:00'}

To:


{
"states": [
      {
         'cursor': '2025-05-12T08:00:00+00:00', 
         'partition': {'account_id': 'TimePeriod'}
      }
   ]
}

Then I'm falling False if the internal stream_state items value is not a dict, according to the docstrings:

        The expected state format is
        "<parent_key_id>" : {
          "<cursor_field>" : "<cursor_value>"
        }

Summary by CodeRabbit

Summary by CodeRabbit

  • Bug Fixes

    • Improved validation for state migration to ensure only correctly formatted partitioned states are processed.
  • New Features

    • Added support for applying state migrations to streams using async retrievers in addition to simple retrievers.
    • Enhanced configuration options for partition routing by supporting hierarchical (parent-child) stream relationships.
  • Tests

    • Expanded test coverage for state migration logic, including new scenarios for both legacy and newest state formats, and updated error message assertions for retriever compatibility.

@aldogonzalez8 aldogonzalez8 self-assigned this May 15, 2025
@github-actions github-actions bot added the bug Something isn't working label May 15, 2025
@aldogonzalez8 aldogonzalez8 requested a review from tolik0 May 15, 2025 20:04
@aldogonzalez8 aldogonzalez8 changed the title fix(cdk): pass transformations to concurrent cursor and fix transofmation fix(cdk): pass transformations to concurrent cursor and fix transformation May 15, 2025
Copy link
Contributor

coderabbitai bot commented May 15, 2025

📝 Walkthrough

Walkthrough

The updates introduce stricter validation for state migration logic, expand support for async retrievers in migration components, and enhance test coverage for partitioned state handling. Configuration files and tests are updated to reflect more complex partition routing and to verify correct migration and cursor behaviors under both legacy and modern state formats.

Changes

Files / Group Change Summary
airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py Tightened validation in should_migrate to require all state values to be dictionaries, explicitly returning False for non-dict entries and empty states.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Allowed LegacyToPerPartitionStateMigration to work with both SimpleRetrieverModel and AsyncRetrieverModel. Updated error messages and ensured state migrations are passed to cursor creation for async retrievers.
unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py Added test cases to ensure migration does not occur for flat dictionary states or empty states. Updated error assertion to mention both SimpleRetriever and AsyncRetriever as valid types.
unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml Added state_migrations to the stream config and replaced the partition router with a nested SubstreamPartitionRouter using a parent-child stream relationship for partitioning.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py Introduced a parameterized test to verify stream and cursor behavior with both legacy and newest state formats when using an async retriever and partition router.

Sequence Diagram(s)

sequenceDiagram
    participant Test as Test Suite
    participant Factory as ModelToComponentFactory
    participant Stream as DeclarativeStream
    participant Retriever as AsyncRetriever
    participant Cursor as ConcurrentCursor
    participant StateMgr as ConnectorStateManager

    Test->>Factory: create_stream(manifest, config, state_manager)
    Factory->>Stream: Instantiate DeclarativeStream
    Stream->>Retriever: Instantiate AsyncRetriever
    Retriever->>Cursor: create_concurrent_cursor_from_perpartition_cursor(state_migrations)
    Cursor->>StateMgr: Access state (legacy or newest format)
    Cursor-->>Retriever: Return concurrent cursor with correct state
    Retriever-->>Stream: Attach cursor
    Stream-->>Test: Return configured stream
Loading

Would you like me to add a comparison diagram showing differences between legacy and newest state flows as well, or is this overview sufficient for your needs? Wdyt?

Note

⚡️ AI Code Reviews for VS Code, Cursor, Windsurf

CodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback.
Learn more here.


Note

⚡️ Faster reviews with caching

CodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure Review - Disable Cache at either the organization or repository level. If you prefer to disable all data retention across your organization, simply turn off the Data Retention setting under your Organization Settings.
Enjoy the performance boost—your workflow just got faster.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between f359981 and 9d45658.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py (1 hunks)
  • unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py
  • airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)

959-963: Consider enhancing the docstring to better explain state formats?

The docstring explains the test's purpose, but perhaps it could include brief descriptions of what constitutes "legacy" vs "newest" state formats for better context. For example, mentioning that legacy uses an account ID as the key, while newest uses a structured object with a states array. Wdyt?


965-966: Small typo in the file path.

There's a typo in the file path - "aync" instead of "async".

- "resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml"
+ "resources/stream_with_incremental_and_async_retriever_with_partition_router.yaml"
unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1)

67-112: Consider abstracting the SubstreamPartitionRouter parent‐stream config to improve reuse? wdyt?
Right now we inline a full ParentStreamConfig+DeclarativeStream block—perhaps we could pull this into a shared anchor or a top‐level reusable definition to reduce duplication and ease maintenance.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between db07eff and f359981.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
  • unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py (2 hunks)
  • unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (12)
unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py (2)

192-195: Good test case addition for flat dictionary state!

This new test case verifies that the migrator doesn't attempt migration when the state is a flat dictionary with a single cursor field key rather than the expected nested format. This helps prevent potential state format issues when handling incorrectly formatted states, aligning with the PR objective of ensuring consistent state formats.


284-284: Updated error message acknowledges AsyncRetriever support, nice!

The error message now correctly indicates that LegacyToPerPartitionStateMigrations can be applied to streams with either a SimpleRetriever or AsyncRetriever. This change aligns with the PR objective of ensuring state transformations work correctly with async retrievers.

airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py (1)

91-93: Great defensive check for dictionary value types!

This addition provides stronger validation by ensuring each value in the state is actually a dictionary, as expected per the docstring. This directly addresses the issue mentioned in the PR objectives where incorrect state formats could occur when the internal stream_state items were expected to be dictionaries but were instead strings or other types.

The comment above the check is also helpful for explaining why this validation is necessary.

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (5)

947-957: Well-structured parameterized test setup.

The test is parameterized to run with both newest and legacy state formats, which is a good approach to verify compatibility with different state structures.


974-976: LGTM!

Good setup of the account ID and partition key structures needed for the test.


977-981: LGTM!

Clear definition of legacy state and state entry structures.


982-994: Good state preparation logic with clear structure.

This section properly handles both legacy and newest state formats while maintaining proper structure across both cases. The additional fields added to the newest state format provide good coverage of potential real-world use cases.


994-1002: Clean state setup based on test parameter.

The code selects the appropriate state format and sets it up in the connector state manager correctly.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

931-934: Great update to support AsyncRetrievers with state migrations!

This change correctly expands the type check to include AsyncRetrieverModel, allowing LegacyToPerPartitionStateMigration to work with both SimpleRetriever and AsyncRetriever. The error message was also updated to match the new supported types.


2002-2011: Good addition of state transformation support for async retrievers!

This change builds a list of state migration components from the model's state_migrations attribute when creating a concurrent cursor for async retrievers. This ensures state migrations are applied during cursor creation for async retrievers, aligning with the behavior for other retriever types.


2021-2021: Perfect - passing the state transformations to the concurrent cursor creation!

This is the key fix that ensures state migrations are correctly applied when creating concurrent cursors for async retrievers, addressing the issue where state formats could be incorrect when transformations weren't properly applied.

unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1)

59-60:

❓ Verification inconclusive

Have we verified that the new state_migrations block is correctly picked up by the parser for AsyncRetriever streams? wdyt?
This state_migrations entry is crucial to ensure legacy cursor state gets converted per partition—let’s confirm our ModelToComponentFactory logic has been updated to consume it under AsyncRetrieverModel.


🏁 Script executed:

#!/bin/bash
# Verify ModelToComponentFactory supports state_migrations for AsyncRetrieverModel
rg -A3 "class ModelToComponentFactory" -n src/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | rg "AsyncRetrieverModel"

Length of output: 256


Let’s first locate the factory and check for any AsyncRetrieverModel handling:


🏁 Script executed:

#!/bin/bash
# Locate the ModelToComponentFactory class definition anywhere in the repo
rg -n "class ModelToComponentFactory" .

# Search within that file (and across the repo) for AsyncRetrieverModel references
rg -n "AsyncRetrieverModel" .

Length of output: 1371


Let’s verify that create_async_retriever actually consumes state_migrations:


🏁 Script executed:

#!/bin/bash
# Locate the create_async_retriever method
rg -n "def create_async_retriever" -n airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

# Dump its body to check for state_migrations handling
rg -A50 -n "def create_async_retriever" -n airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

# Search for any state_migrations references in the factory
rg -n "state_migrations" -n airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Length of output: 3764


Let’s pull in the full create_async_retriever body and check whether it wires through state_migrations from the model into the AsyncRetriever constructor:

Would love to confirm this wiring—wdyt?


🏁 Script executed:

#!/bin/bash
# Extract create_async_retriever and look for AsyncRetriever instantiation
rg -A200 -n "def create_async_retriever" -n airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py \
  | rg -n "AsyncRetriever("

# Within that same block, search for any mention of state_migrations
rg -A200 -n "def create_async_retriever" -n airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py \
  | rg -n "state_migrations"

Length of output: 363


Let’s pull out the AsyncRetriever instantiation and see if state_migrations is passed in:


🏁 Script executed:

#!/bin/bash
# Grab the create_async_retriever block
start=3288
end=$((start + 300))
sed -n "${start},${end}p" airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py > create_async_retriever_block.txt

# Look for where AsyncRetriever is constructed
grep -n "AsyncRetriever\s*(" create_async_retriever_block.txt

# Check around any AsyncRetriever calls for state_migrations
grep -n -C3 "AsyncRetriever\s*(" create_async_retriever_block.txt | grep -C3 "state_migrations"

Length of output: 308


Verify state_migrations forwarding in create_async_retriever
I didn’t spot any state_migrations argument being passed into the AsyncRetriever(...) call in create_async_retriever. Could you confirm that model.state_migrations is forwarded into the constructor? wdyt?

Points to check:

  • File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py around def create_async_retriever (lines ~3288–3588)
  • Ensure return AsyncRetriever(...) includes state_migrations=model.state_migrations

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

@aldogonzalez8 aldogonzalez8 merged commit d6a087c into main May 16, 2025
30 of 32 checks passed
@aldogonzalez8 aldogonzalez8 deleted the ac8/fix-legacy-partition-migration-component branch May 16, 2025 09:56
@coderabbitai coderabbitai bot mentioned this pull request Aug 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants