Skip to content

Commit 2a9e49f

Browse files
committed
Merge branch 'main' into brian/property_chunking
2 parents 9c41d67 + caa1e7d commit 2a9e49f

File tree

14 files changed

+832
-125
lines changed

14 files changed

+832
-125
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
from dataclasses import asdict, dataclass, field
7-
from typing import Any, List, Mapping
7+
from typing import Any, Dict, List, Mapping
88

99
from airbyte_cdk.connector_builder.test_reader import TestReader
1010
from airbyte_cdk.models import (
@@ -27,30 +27,34 @@
2727
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
2828
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
2929
DEFAULT_MAXIMUM_RECORDS = 100
30+
DEFAULT_MAXIMUM_STREAMS = 100
3031

3132
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
3233
MAX_SLICES_KEY = "max_slices"
3334
MAX_RECORDS_KEY = "max_records"
35+
MAX_STREAMS_KEY = "max_streams"
3436

3537

3638
@dataclass
37-
class TestReadLimits:
39+
class TestLimits:
3840
max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
3941
max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
4042
max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
43+
max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS)
4144

4245

43-
def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
46+
def get_limits(config: Mapping[str, Any]) -> TestLimits:
4447
command_config = config.get("__test_read_config", {})
4548
max_pages_per_slice = (
4649
command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
4750
)
4851
max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
4952
max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
50-
return TestReadLimits(max_records, max_pages_per_slice, max_slices)
53+
max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS
54+
return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)
5155

5256

53-
def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
57+
def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
5458
manifest = config["__injected_declarative_manifest"]
5559
return ManifestDeclarativeSource(
5660
config=config,
@@ -71,7 +75,7 @@ def read_stream(
7175
config: Mapping[str, Any],
7276
configured_catalog: ConfiguredAirbyteCatalog,
7377
state: List[AirbyteStateMessage],
74-
limits: TestReadLimits,
78+
limits: TestLimits,
7579
) -> AirbyteMessage:
7680
try:
7781
test_read_handler = TestReader(
@@ -117,13 +121,23 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
117121
return error.as_airbyte_message()
118122

119123

120-
def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
124+
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
121125
try:
122126
manifest = {**source.resolved_manifest}
123127
streams = manifest.get("streams", [])
124128
for stream in streams:
125129
stream["dynamic_stream_name"] = None
126-
streams.extend(source.dynamic_streams)
130+
131+
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
132+
for stream in source.dynamic_streams:
133+
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
134+
135+
if len(generated_streams) < limits.max_streams:
136+
generated_streams += [stream]
137+
138+
for generated_streams_list in mapped_streams.values():
139+
streams.extend(generated_streams_list)
140+
127141
manifest["streams"] = streams
128142
return AirbyteMessage(
129143
type=Type.RECORD,

airbyte_cdk/connector_builder/main.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from airbyte_cdk.connector import BaseConnector
1212
from airbyte_cdk.connector_builder.connector_builder_handler import (
13-
TestReadLimits,
13+
TestLimits,
1414
create_source,
1515
full_resolve_manifest,
1616
get_limits,
@@ -73,7 +73,7 @@ def handle_connector_builder_request(
7373
config: Mapping[str, Any],
7474
catalog: Optional[ConfiguredAirbyteCatalog],
7575
state: List[AirbyteStateMessage],
76-
limits: TestReadLimits,
76+
limits: TestLimits,
7777
) -> AirbyteMessage:
7878
if command == "resolve_manifest":
7979
return resolve_manifest(source)
@@ -83,7 +83,7 @@ def handle_connector_builder_request(
8383
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
8484
return read_stream(source, config, catalog, state, limits)
8585
elif command == "full_resolve_manifest":
86-
return full_resolve_manifest(source)
86+
return full_resolve_manifest(source, limits)
8787
else:
8888
raise ValueError(f"Unrecognized command {command}.")
8989

airbyte_cdk/sources/declarative/checks/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
from pydantic.v1 import BaseModel
88

99
from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
10-
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
10+
from airbyte_cdk.sources.declarative.checks.check_stream import (
11+
CheckStream,
12+
DynamicStreamCheckConfig,
13+
)
1114
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1215
from airbyte_cdk.sources.declarative.models import (
1316
CheckDynamicStream as CheckDynamicStreamModel,
@@ -21,4 +24,4 @@
2124
"CheckDynamicStream": CheckDynamicStreamModel,
2225
}
2326

24-
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
27+
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 113 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,23 @@
55
import logging
66
import traceback
77
from dataclasses import InitVar, dataclass
8-
from typing import Any, List, Mapping, Tuple
8+
from typing import Any, Dict, List, Mapping, Optional, Tuple
99

1010
from airbyte_cdk import AbstractSource
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1212
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1313

1414

15+
@dataclass(frozen=True)
16+
class DynamicStreamCheckConfig:
17+
"""Defines the configuration for checking a dynamic stream during a connection check. This class specifies
18+
which dynamic stream to check, identified by `dynamic_stream_name`, and the maximum number of its generated
19+
streams (`stream_count`) to check for availability."""
20+
21+
dynamic_stream_name: str
22+
stream_count: int = 0
23+
24+
1525
@dataclass
1626
class CheckStream(ConnectionChecker):
1727
"""
@@ -23,34 +33,126 @@ class CheckStream(ConnectionChecker):
2333

2434
stream_names: List[str]
2535
parameters: InitVar[Mapping[str, Any]]
36+
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
2637

2738
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2839
self._parameters = parameters
40+
if self.dynamic_streams_check_configs is None:
41+
self.dynamic_streams_check_configs = []
42+
43+
def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
44+
"""Logs an error and returns a formatted error message."""
45+
error_message = f"Encountered an error while {action}. Error: {error}"
46+
logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True)
47+
return False, error_message
2948

3049
def check_connection(
3150
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3251
) -> Tuple[bool, Any]:
33-
streams = source.streams(config=config)
52+
"""Checks the connection to the source and its streams."""
53+
try:
54+
streams = source.streams(config=config)
55+
if not streams:
56+
return False, f"No streams to connect to from source {source}"
57+
except Exception as error:
58+
return self._log_error(logger, "discovering streams", error)
59+
3460
stream_name_to_stream = {s.name: s for s in streams}
35-
if len(streams) == 0:
36-
return False, f"No streams to connect to from source {source}"
3761
for stream_name in self.stream_names:
38-
if stream_name not in stream_name_to_stream.keys():
62+
if stream_name not in stream_name_to_stream:
3963
raise ValueError(
40-
f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}."
64+
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
4165
)
4266

67+
stream_availability, message = self._check_stream_availability(
68+
stream_name_to_stream, stream_name, logger
69+
)
70+
if not stream_availability:
71+
return stream_availability, message
72+
73+
should_check_dynamic_streams = (
74+
hasattr(source, "resolved_manifest")
75+
and hasattr(source, "dynamic_streams")
76+
and self.dynamic_streams_check_configs
77+
)
78+
79+
if should_check_dynamic_streams:
80+
return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
81+
82+
return True, None
83+
84+
def _check_stream_availability(
85+
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
86+
) -> Tuple[bool, Any]:
87+
"""Checks if the stream named `stream_name` is available."""
88+
availability_strategy = HttpAvailabilityStrategy()
89+
try:
4390
stream = stream_name_to_stream[stream_name]
44-
availability_strategy = HttpAvailabilityStrategy()
91+
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
92+
if not stream_is_available:
93+
message = f"Stream {stream_name} is not available: {reason}"
94+
logger.warning(message)
95+
return stream_is_available, message
96+
except Exception as error:
97+
return self._log_error(logger, f"checking availability of stream {stream_name}", error)
98+
return True, None
99+
100+
def _check_dynamic_streams_availability(
101+
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
102+
) -> Tuple[bool, Any]:
103+
"""Checks the availability of dynamic streams."""
104+
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest attribute is checked before calling this method
105+
dynamic_stream_name_to_dynamic_stream = {
106+
ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
107+
}
108+
generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams attribute is checked before calling this method
109+
110+
for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
111+
if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
112+
return (
113+
False,
114+
f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
115+
)
116+
117+
generated = generated_streams.get(check_config.dynamic_stream_name, [])
118+
stream_availability, message = self._check_generated_streams_availability(
119+
generated, stream_name_to_stream, logger, check_config.stream_count
120+
)
121+
if not stream_availability:
122+
return stream_availability, message
123+
124+
return True, None
125+
126+
def _map_generated_streams(
127+
self, dynamic_streams: List[Dict[str, Any]]
128+
) -> Dict[str, List[Dict[str, Any]]]:
129+
"""Maps dynamic stream names to their corresponding generated streams."""
130+
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
131+
for stream in dynamic_streams:
132+
mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
133+
return mapped_streams
134+
135+
def _check_generated_streams_availability(
136+
self,
137+
generated_streams: List[Dict[str, Any]],
138+
stream_name_to_stream: Dict[str, Any],
139+
logger: logging.Logger,
140+
max_count: int,
141+
) -> Tuple[bool, Any]:
142+
"""Checks availability of generated dynamic streams."""
143+
availability_strategy = HttpAvailabilityStrategy()
144+
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
145+
stream = stream_name_to_stream[declarative_stream["name"]]
45146
try:
46147
stream_is_available, reason = availability_strategy.check_availability(
47148
stream, logger
48149
)
49150
if not stream_is_available:
50-
return False, reason
151+
message = f"Dynamic Stream {stream.name} is not available: {reason}"
152+
logger.warning(message)
153+
return False, message
51154
except Exception as error:
52-
logger.error(
53-
f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
155+
return self._log_error(
156+
logger, f"checking availability of dynamic stream {stream.name}", error
54157
)
55-
return False, f"Unable to connect to stream {stream_name} - {error}"
56158
return True, None

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,6 @@ definitions:
316316
type: object
317317
required:
318318
- type
319-
- stream_names
320319
properties:
321320
type:
322321
type: string
@@ -330,6 +329,28 @@ definitions:
330329
examples:
331330
- ["users"]
332331
- ["users", "contacts"]
332+
dynamic_streams_check_configs:
333+
type: array
334+
items:
335+
"$ref": "#/definitions/DynamicStreamCheckConfig"
336+
DynamicStreamCheckConfig:
337+
type: object
338+
required:
339+
- type
340+
- dynamic_stream_name
341+
properties:
342+
type:
343+
type: string
344+
enum: [ DynamicStreamCheckConfig ]
345+
dynamic_stream_name:
346+
title: Dynamic Stream Name
347+
description: The dynamic stream name.
348+
type: string
349+
stream_count:
350+
title: Stream Count
351+
description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.
352+
type: integer
353+
default: 0
333354
CheckDynamicStream:
334355
title: Dynamic Streams to Check
335356
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,12 @@ def _dynamic_stream_configs(
397397
for dynamic_stream in components_resolver.resolve_components(
398398
stream_template_config=stream_template_config
399399
):
400+
dynamic_stream = {
401+
**ManifestComponentTransformer().propagate_types_and_parameters(
402+
"", dynamic_stream, {}, use_parent_parameters=True
403+
)
404+
}
405+
400406
if "type" not in dynamic_stream:
401407
dynamic_stream["type"] = "DeclarativeStream"
402408

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ class BearerAuthenticator(BaseModel):
4444
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
4545

4646

47-
class CheckStream(BaseModel):
48-
type: Literal["CheckStream"]
49-
stream_names: List[str] = Field(
50-
...,
51-
description="Names of the streams to try reading from when running a check operation.",
52-
examples=[["users"], ["users", "contacts"]],
53-
title="Stream Names",
47+
class DynamicStreamCheckConfig(BaseModel):
48+
type: Literal["DynamicStreamCheckConfig"]
49+
dynamic_stream_name: str = Field(
50+
..., description="The dynamic stream name.", title="Dynamic Stream Name"
51+
)
52+
stream_count: Optional[int] = Field(
53+
0,
54+
description="Numbers of the streams to try reading from when running a check operation.",
55+
title="Stream Count",
5456
)
5557

5658

@@ -1568,6 +1570,17 @@ class AuthFlow(BaseModel):
15681570
oauth_config_specification: Optional[OAuthConfigSpecification] = None
15691571

15701572

1573+
class CheckStream(BaseModel):
1574+
type: Literal["CheckStream"]
1575+
stream_names: Optional[List[str]] = Field(
1576+
None,
1577+
description="Names of the streams to try reading from when running a check operation.",
1578+
examples=[["users"], ["users", "contacts"]],
1579+
title="Stream Names",
1580+
)
1581+
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
1582+
1583+
15711584
class IncrementingCountCursor(BaseModel):
15721585
type: Literal["IncrementingCountCursor"]
15731586
cursor_field: str = Field(

0 commit comments

Comments
 (0)