From b3da6d36e93ffa88250efd10263acc3b12267d50 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 13 Mar 2025 16:05:33 +0200 Subject: [PATCH 1/2] fix --- .../sources/declarative/declarative_stream.py | 9 ++++++- .../declarative/retrievers/async_retriever.py | 24 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte_cdk/sources/declarative/declarative_stream.py index f7b97f3b4..0ae117459 100644 --- a/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte_cdk/sources/declarative/declarative_stream.py @@ -14,6 +14,7 @@ from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever +from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader @@ -76,11 +77,17 @@ def primary_key(self, value: str) -> None: @property def exit_on_rate_limit(self) -> bool: + if isinstance(self.retriever, AsyncRetriever): + return self.retriever.exit_on_rate_limit + return self.retriever.requester.exit_on_rate_limit # type: ignore # abstract Retriever class has not requester attribute @exit_on_rate_limit.setter def exit_on_rate_limit(self, value: bool) -> None: - self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined] + if isinstance(self.retriever, AsyncRetriever): + self.retriever.exit_on_rate_limit = value + else: + self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined] @property # type: ignore def name(self) -> str: diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 7dad06a54..970b14cc2 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -36,6 +36,30 @@ class AsyncRetriever(Retriever): def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters + @property + def exit_on_rate_limit(self) -> bool: + """ + Whether to exit on rate limit. This is a property of the job repository + and not the stream slicer. The stream slicer is responsible for creating + the jobs, but the job repository is responsible for managing the rate + limits and other job-related properties. + + Note: + - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits + - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage + to complete the results. + """ + return self.stream_slicer._job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore[return-value] + + @exit_on_rate_limit.setter + def exit_on_rate_limit(self, value: bool) -> None: + """ + Sets the `exit_on_rate_limit` property of the job repository > creation_requester, + meaning that the Job cannot be placed / created if the rate limit is reached. + Thus no futher work on managing jobs is expected to be done. + """ + self.stream_slicer._job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[assignment] + @property def state(self) -> StreamState: """ From 8e0881fa5e0ba025fe72da9c425c362af5bd3435 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 13 Mar 2025 16:24:35 +0200 Subject: [PATCH 2/2] updated / formatted --- .../declarative/retrievers/async_retriever.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 970b14cc2..c0728d438 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -49,16 +49,22 @@ def exit_on_rate_limit(self) -> bool: - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage to complete the results. """ - return self.stream_slicer._job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore[return-value] + job_orchestrator = self.stream_slicer._job_orchestrator + if job_orchestrator is None: + # Default value when orchestrator is not available + return False + return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore @exit_on_rate_limit.setter def exit_on_rate_limit(self, value: bool) -> None: """ Sets the `exit_on_rate_limit` property of the job repository > creation_requester, meaning that the Job cannot be placed / created if the rate limit is reached. - Thus no futher work on managing jobs is expected to be done. + Thus no further work on managing jobs is expected to be done. """ - self.stream_slicer._job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[assignment] + job_orchestrator = self.stream_slicer._job_orchestrator + if job_orchestrator is not None: + job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment] @property def state(self) -> StreamState: