Skip to content

Commit 5797a2f

Browse files
authored
feat(low-code CDK): Add ConditionalStreams component to low-code framework (#592)
1 parent 795a896 commit 5797a2f

File tree

7 files changed

+398
-14
lines changed

7 files changed

+398
-14
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,4 @@ repos:
7474
args: [--config-file=mypy.ini, --show-column-numbers]
7575
files: ^airbyte_cdk/
7676
pass_filenames: true
77+
additional_dependencies: ["types-requests", "types-PyYAML"]

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def _group_streams(
202202

203203
# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
204204
# and this is validated during the initialization of the source.
205-
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
205+
streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs(
206206
self._source_config, config
207207
)
208208

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ properties:
2525
type: array
2626
items:
2727
anyOf:
28+
- "$ref": "#/definitions/ConditionalStreams"
2829
- "$ref": "#/definitions/DeclarativeStream"
2930
- "$ref": "#/definitions/StateDelegatingStream"
3031
dynamic_streams:
@@ -424,6 +425,36 @@ definitions:
424425
$parameters:
425426
type: object
426427
additionalProperties: true
428+
ConditionalStreams:
429+
title: Conditional Streams
430+
description: Streams that are only available while performing a connector operation when the condition is met.
431+
type: object
432+
required:
433+
- type
434+
- streams
435+
- condition
436+
properties:
437+
type:
438+
type: string
439+
enum: [ConditionalStreams]
440+
condition:
441+
title: Condition
442+
description: Condition that will be evaluated to determine if a set of streams should be available.
443+
type: string
444+
interpolation_context:
445+
- config
446+
- parameters
447+
examples:
448+
- "{{ config['is_sandbox'] }}"
449+
streams:
450+
title: Streams
451+
description: Streams that will be used during an operation based on the condition.
452+
type: array
453+
items:
454+
"$ref": "#/definitions/DeclarativeStream"
455+
$parameters:
456+
type: object
457+
additionalProperties: true
427458
ConstantBackoffStrategy:
428459
title: Constant Backoff
429460
description: Backoff strategy with a constant backoff interval.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from copy import deepcopy
99
from importlib import metadata
1010
from types import ModuleType
11-
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set
11+
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
1212

1313
import orjson
1414
import yaml
@@ -35,6 +35,10 @@
3535
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
3636
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
3737
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
38+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
39+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
40+
ConditionalStreams as ConditionalStreamsModel,
41+
)
3842
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3943
DeclarativeStream as DeclarativeStreamModel,
4044
)
@@ -300,7 +304,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
300304
}
301305
)
302306

303-
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
307+
stream_configs = (
308+
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
309+
)
304310

305311
api_budget_model = self._source_config.get("api_budget")
306312
if api_budget_model:
@@ -319,7 +325,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
319325
)
320326
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
321327
]
322-
323328
return source_streams
324329

325330
@staticmethod
@@ -373,7 +378,6 @@ def update_with_cache_parent_configs(
373378
)
374379
else:
375380
stream_config["retriever"]["requester"]["use_cache"] = True
376-
377381
return stream_configs
378382

379383
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
@@ -477,12 +481,27 @@ def _parse_version(
477481
# No exception
478482
return parsed_version
479483

480-
def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
484+
def _stream_configs(
485+
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
486+
) -> List[Dict[str, Any]]:
481487
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
482-
stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
483-
for s in stream_configs:
484-
if "type" not in s:
485-
s["type"] = "DeclarativeStream"
488+
stream_configs = []
489+
for current_stream_config in manifest.get("streams", []):
490+
if (
491+
"type" in current_stream_config
492+
and current_stream_config["type"] == "ConditionalStreams"
493+
):
494+
interpolated_boolean = InterpolatedBoolean(
495+
condition=current_stream_config.get("condition"),
496+
parameters={},
497+
)
498+
499+
if interpolated_boolean.eval(config=config):
500+
stream_configs.extend(current_stream_config.get("streams", []))
501+
else:
502+
if "type" not in current_stream_config:
503+
current_stream_config["type"] = "DeclarativeStream"
504+
stream_configs.append(current_stream_config)
486505
return stream_configs
487506

488507
def _dynamic_stream_configs(

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2168,7 +2170,7 @@ class Config:
21682170

21692171
type: Literal["DeclarativeSource"]
21702172
check: Union[CheckStream, CheckDynamicStream]
2171-
streams: List[Union[DeclarativeStream, StateDelegatingStream]]
2173+
streams: List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]
21722174
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
21732175
version: str = Field(
21742176
...,
@@ -2201,7 +2203,9 @@ class Config:
22012203

22022204
type: Literal["DeclarativeSource"]
22032205
check: Union[CheckStream, CheckDynamicStream]
2204-
streams: Optional[List[Union[DeclarativeStream, StateDelegatingStream]]] = None
2206+
streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = (
2207+
None
2208+
)
22052209
dynamic_streams: List[DynamicDeclarativeStream]
22062210
version: str = Field(
22072211
...,
@@ -2280,6 +2284,22 @@ class Config:
22802284
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22812285

22822286

2287+
class ConditionalStreams(BaseModel):
2288+
type: Literal["ConditionalStreams"]
2289+
condition: str = Field(
2290+
...,
2291+
description="Condition that will be evaluated to determine if a set of streams should be available.",
2292+
examples=["{{ config['is_sandbox'] }}"],
2293+
title="Condition",
2294+
)
2295+
streams: List[DeclarativeStream] = Field(
2296+
...,
2297+
description="Streams that will be used during an operation based on the condition.",
2298+
title="Streams",
2299+
)
2300+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2301+
2302+
22832303
class FileUploader(BaseModel):
22842304
type: Literal["FileUploader"]
22852305
requester: Union[HttpRequester, CustomRequester] = Field(
@@ -2936,6 +2956,7 @@ class DynamicDeclarativeStream(BaseModel):
29362956
DeclarativeSource1.update_forward_refs()
29372957
DeclarativeSource2.update_forward_refs()
29382958
SelectiveAuthenticator.update_forward_refs()
2959+
ConditionalStreams.update_forward_refs()
29392960
FileUploader.update_forward_refs()
29402961
DeclarativeStream.update_forward_refs()
29412962
SessionTokenAuthenticator.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3150,12 +3150,12 @@ def _get_url() -> str:
31503150
This is needed because the URL is not set until the requester is created.
31513151
"""
31523152

3153-
_url = (
3153+
_url: str = (
31543154
model.requester.url
31553155
if hasattr(model.requester, "url") and model.requester.url is not None
31563156
else requester.get_url()
31573157
)
3158-
_url_base = (
3158+
_url_base: str = (
31593159
model.requester.url_base
31603160
if hasattr(model.requester, "url_base") and model.requester.url_base is not None
31613161
else requester.get_url_base()

0 commit comments

Comments
 (0)