Skip to content

Conversation

brianjlai
Copy link
Contributor

@brianjlai brianjlai commented Sep 9, 2025

The existing abstract_source.py had functionality that would emit an INCOMPLETE status if a stream in the incoming configured catalog did not exist in the source. This PR adds this into the concurrent declarative source to emulate the same behavior.

A few things to note:

  • With this change we will always emit INCOMPLETE for all missing streams, but the sync will continue for other streams. This is a slight change from the legacy abstract source where the sync will end.
  • The existing behavior relied on raise_exception_on_missing_stream field to also surface a config error for missing streams. This was only set to true on 2 source (source-facebook-marketing, source-google-ads).
  • This change will only emit INCOMPLETE for declarative sources. I briefly looked into this for Python concurrent sources, but there a few iffy things about that approach because the underlying queue/message repository is private. I can look into this more, but right now we're targeting unblocking google ads

Summary by CodeRabbit

  • Bug Fixes

    • When a catalog references a stream that the source doesn't provide, the sync now emits an INCOMPLETE stream status message (instead of failing), improving error visibility.
  • Tests

    • Added a unit test that verifies an INCOMPLETE stream-status trace is emitted when a catalog contains a missing stream.

@brianjlai brianjlai requested review from maxi297 and tolik0 September 9, 2025 02:10
@github-actions github-actions bot added the enhancement New feature or request label Sep 9, 2025
Copy link

github-actions bot commented Sep 9, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@brian/missing_stream_emit_incomplete_status#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch brian/missing_stream_emit_incomplete_status

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

Copy link
Contributor

coderabbitai bot commented Sep 9, 2025

📝 Walkthrough

Walkthrough

Selects configured catalog streams against available streams using an instance method; when a configured stream is not present, the source emits an INCOMPLETE StreamStatus trace message (instead of raising) and returns matched streams.

Changes

Cohort / File(s) Summary
Core source logic
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Converted _select_streams from @staticmethod to instance method (self); on unmatched configured streams emit an INCOMPLETE AirbyteStreamStatus via message repository (using as_airbyte_message & StreamDescriptor) and continue returning matched streams; added imports for protocol dataclasses and stream status utilities.
Unit tests
unit_tests/sources/declarative/test_concurrent_declarative_source.py
Added test_catalog_contains_missing_stream_in_source (time-frozen) asserting the source queue contains a TRACE StreamStatus INCOMPLETE message for a catalog stream named missing; imported protocol dataclasses used in assertions.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Runner as test / runner
  participant CDS as ConcurrentDeclarativeSource
  participant Selector as _select_streams(self)
  participant Repo as MessageRepository / Queue

  Runner->>CDS: read(config, configured_catalog)
  CDS->>Selector: select_streams(available_streams, configured_catalog)
  alt configured stream matches available stream
    Selector-->>CDS: return matched stream
  else configured stream missing
    Selector-->>Repo: emit as_airbyte_message(StreamDescriptor, AirbyteStreamStatus.INCOMPLETE)
    Note right of Repo #DDEBF7: INCOMPLETE trace enqueued\n(uses as_airbyte_message)
    Selector-->>CDS: continue without that stream
  end
  CDS-->>Runner: yield messages (including emitted INCOMPLETE trace)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested reviewers

  • tolik0 — wdyt?
  • maxi297 — wdyt?

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "feat(concurrent cdk): Emit incomplete status for missing streams" clearly and concisely summarizes the primary change — adding emission of an INCOMPLETE status for streams present in the catalog but missing from concurrent CDK sources — and follows conventional commit style without extraneous noise. It accurately reflects the changes in the diff and is specific enough for a reviewer scanning history to understand the main intent.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch brian/missing_stream_emit_incomplete_status

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (6)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

21-26: Avoid mixing protocol model flavors for Level

You’re importing Level from airbyte_protocol_dataclasses while most of this module uses airbyte_cdk.models types. To prevent subtle enum mismatches across repos, would you switch Level to the cdk models’ enum, wdyt?

-from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
+from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, StreamDescriptor
+from airbyte_cdk.models import Level

644-659: Include namespace in StreamDescriptor for precision

When emitting the config_error trace, could we include the namespace so downstream UIs/logs disambiguate streams with the same name across namespaces, wdyt?

-                stream_descriptor=StreamDescriptor(name=configured_stream.stream.name),
+                stream_descriptor=StreamDescriptor(
+                    name=configured_stream.stream.name,
+                    namespace=configured_stream.stream.namespace,
+                ),

639-659: Emit missing-stream signals once per name

If a catalog accidentally lists the same missing stream multiple times, you’ll emit duplicate INCOMPLETE + error traces. Want to guard with a small seen set, wdyt?

 def _select_streams(
     self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
 ) -> List[AbstractStream]:
     stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
     abstract_streams: List[AbstractStream] = []
+    seen_missing: set[str] = set()
     for configured_stream in configured_catalog.streams:
         stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
         if stream_instance:
             abstract_streams.append(stream_instance)
         else:
-            self._message_repository.emit_message(
-                as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
-            )
+            if configured_stream.stream.name not in seen_missing:
+                self._message_repository.emit_message(
+                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
+                )
 
-            missing_stream_exception = AirbyteTracedException(
+                missing_stream_exception = AirbyteTracedException(
                     message="A stream listed in your configuration was not found in the source. Please check the logs for more "
                     "details.",
                     internal_message=(
                         f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
                         f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
                     ),
                     failure_type=FailureType.config_error,
-                    stream_descriptor=StreamDescriptor(name=configured_stream.stream.name),
+                    stream_descriptor=StreamDescriptor(
+                        name=configured_stream.stream.name,
+                        namespace=configured_stream.stream.namespace,
+                    ),
                 )
-            self._message_repository.emit_message(missing_stream_exception.as_airbyte_message())
+                self._message_repository.emit_message(missing_stream_exception.as_airbyte_message())
+                seen_missing.add(configured_stream.stream.name)
 
     return abstract_streams
unit_tests/sources/declarative/test_concurrent_declarative_source.py (3)

22-27: Prefer one model family in tests to avoid equality surprises

You’re mixing airbyte_cdk.models.AirbyteMessage with dataclasses AirbyteTraceMessage/TraceType. To avoid model-flavor drift in equality checks, would you build both expected messages using the same helper used in prod (as_airbyte_message) and keep everything consistent, wdyt?

-from airbyte_protocol_dataclasses.models import (
-    AirbyteStreamStatus,
-    AirbyteStreamStatusTraceMessage,
-    AirbyteTraceMessage,
-    TraceType,
-)
+from airbyte_protocol_dataclasses.models import AirbyteStreamStatus
+from airbyte_cdk.utils.stream_status_utils import as_airbyte_message

1907-1931: Construct the first expected message via the same helper as prod

This avoids hand-rolling timestamps and model mismatches. Also makes the test resilient to any future serializer tweaks. Wdyt?

-    expected_messages = [
-        AirbyteMessage(
-            type=Type.TRACE,
-            trace=AirbyteTraceMessage(
-                type=TraceType.STREAM_STATUS,
-                stream_status=AirbyteStreamStatusTraceMessage(
-                    stream_descriptor=StreamDescriptor(name="missing"),
-                    status=AirbyteStreamStatus.INCOMPLETE,
-                ),
-                emitted_at=1735689600000.0,
-            ),
-        ),
+    expected_messages = [
+        as_airbyte_message(
+            AirbyteStream(name="missing", json_schema={}),
+            AirbyteStreamStatus.INCOMPLETE,
+        ),
         AirbyteTracedException(
             message="A stream listed in your configuration was not found in the source. Please check the logs for more "
             "details.",
             internal_message=(
                 "The stream 'missing' in your connection configuration was not found in the source. Refresh the schema in your replication settings and remove this stream from future sync attempts."
             ),
             failure_type=FailureType.config_error,
             stream_descriptor=StreamDescriptor(name="missing"),
         ).as_airbyte_message(stream_descriptor=StreamDescriptor(name="missing")),
     ]

1951-1954: Optional: assert no extra messages

After verifying the two expected messages, do you want to assert queue.empty() to ensure nothing else leaked into the queue, wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ae0e8aa and a0ece6b.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/utils/traced_exception.py (3)
  • as_airbyte_message (52-81)
  • emit_message (93-100)
  • AirbyteTracedException (25-145)
airbyte_cdk/sources/message/repository.py (5)
  • emit_message (47-48)
  • emit_message (64-65)
  • emit_message (79-80)
  • emit_message (107-109)
  • emit_message (129-130)
airbyte_cdk/sources/message/concurrent_repository.py (1)
  • emit_message (32-35)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • FailureType (743-746)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (4)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (79-88)
airbyte_cdk/utils/traced_exception.py (2)
  • AirbyteTracedException (25-145)
  • as_airbyte_message (52-81)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • FailureType (743-746)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • streams (388-420)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Manifest Server Docker Image Build
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

91-91: LGTM: centralizing stream status emission

Importing as_airbyte_message is a good call and keeps status emission consistent. 👍


635-637: Signature change is reasonable

Converting _select_streams to an instance method to access the message repository makes sense.

Copy link

github-actions bot commented Sep 9, 2025

PyTest Results (Fast)

3 747 tests  +1   3 735 ✅ +1   6m 12s ⏱️ -6s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit fe55075. ± Comparison against base commit 9a075a1.

♻️ This comment has been updated with latest results.

Copy link

github-actions bot commented Sep 9, 2025

PyTest Results (Full)

3 750 tests  +1   3 738 ✅ +1   10m 56s ⏱️ -3s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit fe55075. ± Comparison against base commit 9a075a1.

♻️ This comment has been updated with latest results.

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one concern regarding how different the output would be from the non concurrent solution. Can we confirm this?

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved!

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

372-381: Missing-stream traces won’t be emitted when zero streams match; drain the queue or we’ll silently drop them

If all configured streams are missing, you enqueue INCOMPLETE traces during selection but skip self._concurrent_source.read(...) (to avoid the noted infinite loop). Nothing then drains self._concurrent_source._queue, so the platform never sees those traces. Could we drain the queue when selected_concurrent_streams is empty, as a minimal fix, and consider a proper flush() API on ConcurrentSource later, wdyt?

Suggested minimal fix:

- from queue import Queue
+ from queue import Queue, Empty
@@
         selected_concurrent_streams = self._select_streams(
             streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
             configured_catalog=catalog,
         )
 
-        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
-        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
-        if len(selected_concurrent_streams) > 0:
+        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
+        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
+        if len(selected_concurrent_streams) > 0:
             yield from self._concurrent_source.read(selected_concurrent_streams)
+        else:
+            # Drain any traces enqueued during selection (e.g., INCOMPLETE for missing streams)
+            while True:
+                try:
+                    yield self._concurrent_source._queue.get_nowait()
+                except Empty:
+                    break

Longer-term, would you like a small follow-up to add a flush_messages() on ConcurrentSource so we don’t reach into a private _queue, wdyt?

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

637-643: Avoid name-only matching; include namespace to disambiguate homonymous streams?

If two streams share the same name across namespaces, name-only matching can select the wrong stream. Would you consider keying by (name, namespace) with a name-only fallback for backward compatibility, wdyt?

-        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
+        # Prefer precise matching on (name, namespace); fall back to name-only for compatibility.
+        stream_by_name: Mapping[str, AbstractStream] = {s.name: s for s in streams}
+        stream_by_name_ns: Mapping[tuple[str, Optional[str]], AbstractStream] = {
+            (s.as_airbyte_stream().name, s.as_airbyte_stream().namespace): s for s in streams
+        }
@@
-            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
+            key = (configured_stream.stream.name, configured_stream.stream.namespace)
+            stream_instance = stream_by_name_ns.get(key) or stream_by_name.get(configured_stream.stream.name)

644-651: Emitting INCOMPLETE for missing streams is aligned with the stated intent; consider adding a reason and using a descriptor

This is great. Two optional tweaks:

  • Include a short human-readable reason to aid triage (if as_airbyte_message supports it).
  • Pass a StreamDescriptor(name, namespace) explicitly to avoid relying on implicit extraction. You already import it.

Would you like to fold in either of these for clarity, wdyt?


647-648: Tiny comment nit

“behavior,we” is missing a space after the comma. Want to fix while you’re here, wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a0ece6b and fe55075.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
airbyte_cdk/sources/message/repository.py (5)
  • emit_message (47-48)
  • emit_message (64-65)
  • emit_message (79-80)
  • emit_message (107-109)
  • emit_message (129-130)
airbyte_cdk/sources/message/concurrent_repository.py (1)
  • emit_message (32-35)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

23-23: Imports for protocol enums look right

Brings in the exact types needed for status traces and log levels. LGTM.


91-91: Good call using the helper to build a stream-status trace

Centralizes protocol shape via as_airbyte_message; fewer chances to drift. LGTM.


634-636: Safe to convert — no external static call sites found

rg shows only an instance call (self._select_streams) at airbyte_cdk/sources/declarative/concurrent_declarative_source.py:372 and the method definition at ~lines 634–636; no ConcurrentDeclarativeSource._select_streams(...) usages were found. Safe to proceed, wdyt?

@brianjlai brianjlai merged commit 069a06e into main Sep 13, 2025
29 checks passed
@brianjlai brianjlai deleted the brian/missing_stream_emit_incomplete_status branch September 13, 2025 01:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants