Skip to content

Commit e5a1fc2

Browse files
fix(concurrent_declarative_source): set None defaults for optional constructor args, remove unnecessary Generic implementation (#738)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent c4ff128 commit e5a1fc2

File tree

9 files changed

+27
-23
lines changed

9 files changed

+27
-23
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
6262

6363
def create_source(
6464
config: Mapping[str, Any],
65-
limits: TestLimits,
66-
catalog: Optional[ConfiguredAirbyteCatalog],
67-
state: Optional[List[AirbyteStateMessage]],
68-
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
65+
limits: TestLimits | None = None,
66+
catalog: ConfiguredAirbyteCatalog | None = None,
67+
state: List[AirbyteStateMessage] | None = None,
68+
) -> ConcurrentDeclarativeSource:
6969
manifest = config["__injected_declarative_manifest"]
7070

7171
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
@@ -88,7 +88,7 @@ def create_source(
8888

8989

9090
def read_stream(
91-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
91+
source: ConcurrentDeclarativeSource,
9292
config: Mapping[str, Any],
9393
configured_catalog: ConfiguredAirbyteCatalog,
9494
state: List[AirbyteStateMessage],
@@ -127,7 +127,7 @@ def read_stream(
127127

128128

129129
def resolve_manifest(
130-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
130+
source: ConcurrentDeclarativeSource,
131131
) -> AirbyteMessage:
132132
try:
133133
return AirbyteMessage(
@@ -146,7 +146,7 @@ def resolve_manifest(
146146

147147

148148
def full_resolve_manifest(
149-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], limits: TestLimits
149+
source: ConcurrentDeclarativeSource, limits: TestLimits
150150
) -> AirbyteMessage:
151151
try:
152152
manifest = {**source.resolved_manifest}

airbyte_cdk/connector_builder/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
def get_config_and_catalog_from_args(
3636
args: List[str],
37-
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
37+
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]]:
3838
# TODO: Add functionality for the `debug` logger.
3939
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
4040
parsed_args = AirbyteEntrypoint.parse_args(args)
@@ -70,7 +70,7 @@ def get_config_and_catalog_from_args(
7070

7171

7272
def handle_connector_builder_request(
73-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
73+
source: ConcurrentDeclarativeSource,
7474
command: str,
7575
config: Mapping[str, Any],
7676
catalog: Optional[ConfiguredAirbyteCatalog],

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def __init__(
8585

8686
def run_test_read(
8787
self,
88-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
88+
source: ConcurrentDeclarativeSource,
8989
config: Mapping[str, Any],
9090
configured_catalog: ConfiguredAirbyteCatalog,
9191
stream_name: str,
@@ -383,7 +383,7 @@ def _get_latest_config_update(
383383

384384
def _read_stream(
385385
self,
386-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
386+
source: ConcurrentDeclarativeSource,
387387
config: Mapping[str, Any],
388388
configured_catalog: ConfiguredAirbyteCatalog,
389389
state: List[AirbyteStateMessage],

airbyte_cdk/manifest_server/command_processor/processor.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@
2121

2222

2323
class ManifestCommandProcessor:
24-
_source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
24+
_source: ConcurrentDeclarativeSource
2525
_logger = logging.getLogger("airbyte.manifest-server")
2626

27-
def __init__(
28-
self, source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
29-
) -> None:
27+
def __init__(self, source: ConcurrentDeclarativeSource) -> None:
3028
self._source = source
3129

3230
def test_read(

airbyte_cdk/manifest_server/command_processor/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def build_source(
6363
record_limit: Optional[int] = None,
6464
page_limit: Optional[int] = None,
6565
slice_limit: Optional[int] = None,
66-
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
66+
) -> ConcurrentDeclarativeSource:
6767
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
6868
# to retain ordering for the grouping of the builder message responses.
6969
definition = copy.deepcopy(manifest)

airbyte_cdk/manifest_server/routers/manifest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def safe_build_source(
4040
page_limit: Optional[int] = None,
4141
slice_limit: Optional[int] = None,
4242
record_limit: Optional[int] = None,
43-
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
43+
) -> ConcurrentDeclarativeSource:
4444
"""Wrapper around build_source that converts ValidationError to HTTPException."""
4545
try:
4646
return build_source(

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,16 +162,17 @@ def _get_declarative_component_schema() -> Dict[str, Any]:
162162
# is no longer inherited from since the only external dependency is from that class.
163163
#
164164
# todo: It is worth investigating removal of the Generic[TState] since it will always be Optional[List[AirbyteStateMessage]]
165-
class ConcurrentDeclarativeSource(AbstractSource, Generic[TState]):
165+
class ConcurrentDeclarativeSource(AbstractSource):
166166
# By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock
167167
# because it has hit the limit of futures but not partition reader is consuming them.
168168
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2
169169

170170
def __init__(
171171
self,
172-
catalog: Optional[ConfiguredAirbyteCatalog],
173-
config: Optional[Mapping[str, Any]],
174-
state: TState,
172+
catalog: Optional[ConfiguredAirbyteCatalog] = None,
173+
config: Optional[Mapping[str, Any]] = None,
174+
state: Optional[List[AirbyteStateMessage]] = None,
175+
*,
175176
source_config: ConnectionDefinition,
176177
debug: bool = False,
177178
emit_connector_builder_messages: bool = False,

airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from airbyte_cdk.sources.types import ConnectionDefinition
1515

1616

17-
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
17+
class YamlDeclarativeSource(ConcurrentDeclarativeSource):
1818
"""Declarative source defined by a yaml file"""
1919

2020
def __init__(

unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2219,7 +2219,12 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess
22192219
)
22202220
config = {}
22212221
state = {}
2222-
source = ConcurrentDeclarativeSource(catalog, config, state, manifest)
2222+
source = ConcurrentDeclarativeSource(
2223+
catalog=catalog,
2224+
config=config,
2225+
source_config=manifest,
2226+
state=state,
2227+
)
22232228
return list(source.read(logger, {}, catalog, state))
22242229

22252230

0 commit comments

Comments
 (0)