diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 0d3e2052b..e63c200c1 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -62,10 +62,10 @@ def should_normalize_manifest(config: Mapping[str, Any]) -> bool: def create_source( config: Mapping[str, Any], - limits: TestLimits, - catalog: Optional[ConfiguredAirbyteCatalog], - state: Optional[List[AirbyteStateMessage]], -) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]: + limits: TestLimits | None = None, + catalog: ConfiguredAirbyteCatalog | None = None, + state: List[AirbyteStateMessage] | None = None, +) -> ConcurrentDeclarativeSource: manifest = config["__injected_declarative_manifest"] # We enforce a concurrency level of 1 so that the stream is processed on a single thread @@ -88,7 +88,7 @@ def create_source( def read_stream( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, state: List[AirbyteStateMessage], @@ -127,7 +127,7 @@ def read_stream( def resolve_manifest( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, ) -> AirbyteMessage: try: return AirbyteMessage( @@ -146,7 +146,7 @@ def resolve_manifest( def full_resolve_manifest( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], limits: TestLimits + source: ConcurrentDeclarativeSource, limits: TestLimits ) -> AirbyteMessage: try: manifest = {**source.resolved_manifest} diff --git a/airbyte_cdk/connector_builder/main.py b/airbyte_cdk/connector_builder/main.py index 207831c3c..06e0e3d09 100644 --- a/airbyte_cdk/connector_builder/main.py +++ b/airbyte_cdk/connector_builder/main.py @@ -34,7 +34,7 @@ def get_config_and_catalog_from_args( args: List[str], -) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]: +) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]]: # TODO: Add functionality for the `debug` logger. # Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`. parsed_args = AirbyteEntrypoint.parse_args(args) @@ -70,7 +70,7 @@ def get_config_and_catalog_from_args( def handle_connector_builder_request( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, command: str, config: Mapping[str, Any], catalog: Optional[ConfiguredAirbyteCatalog], diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index aab700951..db7d2d14a 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -85,7 +85,7 @@ def __init__( def run_test_read( self, - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, stream_name: str, @@ -383,7 +383,7 @@ def _get_latest_config_update( def _read_stream( self, - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, state: List[AirbyteStateMessage], diff --git a/airbyte_cdk/manifest_server/command_processor/processor.py b/airbyte_cdk/manifest_server/command_processor/processor.py index 16d14a799..34816ec55 100644 --- a/airbyte_cdk/manifest_server/command_processor/processor.py +++ b/airbyte_cdk/manifest_server/command_processor/processor.py @@ -21,12 +21,10 @@ class ManifestCommandProcessor: - _source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]] + _source: ConcurrentDeclarativeSource _logger = logging.getLogger("airbyte.manifest-server") - def __init__( - self, source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]] - ) -> None: + def __init__(self, source: ConcurrentDeclarativeSource) -> None: self._source = source def test_read( diff --git a/airbyte_cdk/manifest_server/command_processor/utils.py b/airbyte_cdk/manifest_server/command_processor/utils.py index 125a977c3..3aef171e0 100644 --- a/airbyte_cdk/manifest_server/command_processor/utils.py +++ b/airbyte_cdk/manifest_server/command_processor/utils.py @@ -63,7 +63,7 @@ def build_source( record_limit: Optional[int] = None, page_limit: Optional[int] = None, slice_limit: Optional[int] = None, -) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]: +) -> ConcurrentDeclarativeSource: # We enforce a concurrency level of 1 so that the stream is processed on a single thread # to retain ordering for the grouping of the builder message responses. definition = copy.deepcopy(manifest) diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index 48799ddc1..035058ec1 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -40,7 +40,7 @@ def safe_build_source( page_limit: Optional[int] = None, slice_limit: Optional[int] = None, record_limit: Optional[int] = None, -) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]: +) -> ConcurrentDeclarativeSource: """Wrapper around build_source that converts ValidationError to HTTPException.""" try: return build_source( diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 16ff94abf..2ebd8fec4 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -162,16 +162,17 @@ def _get_declarative_component_schema() -> Dict[str, Any]: # is no longer inherited from since the only external dependency is from that class. # # todo: It is worth investigating removal of the Generic[TState] since it will always be Optional[List[AirbyteStateMessage]] -class ConcurrentDeclarativeSource(AbstractSource, Generic[TState]): +class ConcurrentDeclarativeSource(AbstractSource): # By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock # because it has hit the limit of futures but not partition reader is consuming them. _LOWEST_SAFE_CONCURRENCY_LEVEL = 2 def __init__( self, - catalog: Optional[ConfiguredAirbyteCatalog], - config: Optional[Mapping[str, Any]], - state: TState, + catalog: Optional[ConfiguredAirbyteCatalog] = None, + config: Optional[Mapping[str, Any]] = None, + state: Optional[List[AirbyteStateMessage]] = None, + *, source_config: ConnectionDefinition, debug: bool = False, emit_connector_builder_messages: bool = False, diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 003578738..7af07a5c8 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -14,7 +14,7 @@ from airbyte_cdk.sources.types import ConnectionDefinition -class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]): +class YamlDeclarativeSource(ConcurrentDeclarativeSource): """Declarative source defined by a yaml file""" def __init__( diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index ad6735201..0a03c2e53 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -2219,7 +2219,12 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess ) config = {} state = {} - source = ConcurrentDeclarativeSource(catalog, config, state, manifest) + source = ConcurrentDeclarativeSource( + catalog=catalog, + config=config, + source_config=manifest, + state=state, + ) return list(source.read(logger, {}, catalog, state))