Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
Expand Down Expand Up @@ -76,11 +77,17 @@ def primary_key(self, value: str) -> None:

@property
def exit_on_rate_limit(self) -> bool:
if isinstance(self.retriever, AsyncRetriever):
return self.retriever.exit_on_rate_limit

return self.retriever.requester.exit_on_rate_limit # type: ignore # abstract Retriever class has not requester attribute

@exit_on_rate_limit.setter
def exit_on_rate_limit(self, value: bool) -> None:
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]
if isinstance(self.retriever, AsyncRetriever):
self.retriever.exit_on_rate_limit = value
else:
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]

@property # type: ignore
def name(self) -> str:
Expand Down
30 changes: 30 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/async_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,36 @@ class AsyncRetriever(Retriever):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters

@property
def exit_on_rate_limit(self) -> bool:
"""
Whether to exit on rate limit. This is a property of the job repository
and not the stream slicer. The stream slicer is responsible for creating
the jobs, but the job repository is responsible for managing the rate
limits and other job-related properties.

Note:
- If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
- If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
to complete the results.
"""
job_orchestrator = self.stream_slicer._job_orchestrator
if job_orchestrator is None:
# Default value when orchestrator is not available
return False
return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore

@exit_on_rate_limit.setter
def exit_on_rate_limit(self, value: bool) -> None:
"""
Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
meaning that the Job cannot be placed / created if the rate limit is reached.
Thus no further work on managing jobs is expected to be done.
"""
job_orchestrator = self.stream_slicer._job_orchestrator
if job_orchestrator is not None:
job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment]

@property
def state(self) -> StreamState:
"""
Expand Down
Loading