diff --git a/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte_cdk/sources/declarative/declarative_stream.py index f7b97f3b4..0ae117459 100644 --- a/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte_cdk/sources/declarative/declarative_stream.py @@ -14,6 +14,7 @@ from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever +from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader @@ -76,11 +77,17 @@ def primary_key(self, value: str) -> None: @property def exit_on_rate_limit(self) -> bool: + if isinstance(self.retriever, AsyncRetriever): + return self.retriever.exit_on_rate_limit + return self.retriever.requester.exit_on_rate_limit # type: ignore # abstract Retriever class has not requester attribute @exit_on_rate_limit.setter def exit_on_rate_limit(self, value: bool) -> None: - self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined] + if isinstance(self.retriever, AsyncRetriever): + self.retriever.exit_on_rate_limit = value + else: + self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined] @property # type: ignore def name(self) -> str: diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 7dad06a54..c0728d438 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -36,6 +36,36 @@ class AsyncRetriever(Retriever): def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters + @property + def exit_on_rate_limit(self) -> bool: + """ + Whether to exit on rate limit. This is a property of the job repository + and not the stream slicer. The stream slicer is responsible for creating + the jobs, but the job repository is responsible for managing the rate + limits and other job-related properties. + + Note: + - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits + - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage + to complete the results. + """ + job_orchestrator = self.stream_slicer._job_orchestrator + if job_orchestrator is None: + # Default value when orchestrator is not available + return False + return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore + + @exit_on_rate_limit.setter + def exit_on_rate_limit(self, value: bool) -> None: + """ + Sets the `exit_on_rate_limit` property of the job repository > creation_requester, + meaning that the Job cannot be placed / created if the rate limit is reached. + Thus no further work on managing jobs is expected to be done. + """ + job_orchestrator = self.stream_slicer._job_orchestrator + if job_orchestrator is not None: + job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment] + @property def state(self) -> StreamState: """