Skip to content

Commit 964441e

Browse files
committed
Automatically retry on 429 Too Many Requests
refs #441, #764
1 parent c6bc1c9 commit 964441e

File tree

7 files changed

+379
-6
lines changed

7 files changed

+379
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- Support `collection_property` based property filtering in `load_stac` ([#246](https://github.com/Open-EO/openeo-python-client/issues/246))
1313
- Add `validate()` method to `SaveResult`, `VectorCube`, `MlModel` and `StacResource` classes ([#766](https://github.com/Open-EO/openeo-python-client/issues/766))
14+
- Automatically retry on `429 Too Many Requests` HTTP errors ([#441](https://github.com/Open-EO/openeo-python-client/issues/441))
1415

1516
### Changed
1617

openeo/extra/job_management/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ def _make_resilient(connection):
284284
503 Service Unavailable
285285
504 Gateway Timeout
286286
"""
287-
# TODO: refactor this helper out of this class and unify with `openeo_driver.util.http.requests_with_retry`
287+
# TODO: migrate this to the now built-in retry configuration of `Connection` or `openeo.utils.http.retry_adapter`?
288288
status_forcelist = [500, 502, 503, 504]
289289
retries = Retry(
290290
total=MAX_RETRIES,

openeo/rest/_connection.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
from typing import Iterable, Optional, Union
66

77
import requests
8+
import requests.adapters
89
from requests import Response
910
from requests.auth import AuthBase
1011

1112
import openeo
1213
from openeo.rest import OpenEoApiError, OpenEoApiPlainError, OpenEoRestError
1314
from openeo.rest.auth.auth import NullAuth
1415
from openeo.util import ContextTimer, ensure_list, str_truncate, url_join
16+
from openeo.utils.http import session_with_retries
1517

1618
_log = logging.getLogger(__name__)
1719

@@ -26,15 +28,22 @@ class RestApiConnection:
2628
def __init__(
2729
self,
2830
root_url: str,
31+
*,
2932
auth: Optional[AuthBase] = None,
3033
session: Optional[requests.Session] = None,
3134
default_timeout: Optional[int] = None,
3235
slow_response_threshold: Optional[float] = None,
36+
retry: Union[requests.adapters.Retry, dict, bool, None] = None,
3337
):
3438
self._root_url = root_url
3539
self._auth = None
3640
self.auth = auth or NullAuth()
37-
self.session = session or requests.Session()
41+
if session:
42+
self.session = session
43+
elif retry is not False:
44+
self.session = session_with_retries(retry=retry)
45+
else:
46+
self.session = requests.Session()
3847
self.default_timeout = default_timeout or DEFAULT_TIMEOUT
3948
self.default_headers = {
4049
"User-Agent": "openeo-python-client/{cv} {py}/{pv} {pl}".format(

openeo/rest/connection.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
)
2929

3030
import requests
31+
import requests.adapters
3132
import shapely.geometry.base
3233
from requests.auth import AuthBase, HTTPBasicAuth
3334

@@ -114,6 +115,16 @@ class Connection(RestApiConnection):
114115
optional :class:`OidcAuthenticator` object to use for renewing OIDC tokens.
115116
:param auth: Optional ``requests.auth.AuthBase`` object to use for requests.
116117
Usage of this parameter is deprecated, use the specific authentication methods instead.
118+
:param retry: general request retry settings, can be specified as:
119+
120+
- :py:class:`requests.adapters.Retry` object
121+
or a dictionary with corresponding keyword arguments
122+
(e.g. ``total``, ``backoff_factor``, ``status_forcelist``, ...)
123+
- ``None`` (default) to use default openEO-oriented retry settings
124+
- ``False`` to disable retrying requests
125+
126+
.. versionchanged:: 0.41.0
127+
Added ``retry`` argument.
117128
"""
118129

119130
_MINIMUM_API_VERSION = ComparableVersion("1.0.0")
@@ -130,6 +141,7 @@ def __init__(
130141
refresh_token_store: Optional[RefreshTokenStore] = None,
131142
oidc_auth_renewer: Optional[OidcAuthenticator] = None,
132143
auth: Optional[AuthBase] = None,
144+
retry: Union[requests.adapters.Retry, dict, bool, None] = None,
133145
):
134146
if "://" not in url:
135147
url = "https://" + url
@@ -139,6 +151,7 @@ def __init__(
139151
root_url=self.version_discovery(url, session=session, timeout=default_timeout),
140152
auth=auth, session=session, default_timeout=default_timeout,
141153
slow_response_threshold=slow_response_threshold,
154+
retry=retry,
142155
)
143156

144157
# Initial API version check.
@@ -1885,6 +1898,7 @@ def connect(
18851898
session: Optional[requests.Session] = None,
18861899
default_timeout: Optional[int] = None,
18871900
auto_validate: bool = True,
1901+
retry: Union[requests.adapters.Retry, dict, bool, None] = None,
18881902
) -> Connection:
18891903
"""
18901904
This method is the entry point to OpenEO.
@@ -1904,9 +1918,19 @@ def connect(
19041918
:param auth_options: Options/arguments specific to the authentication type
19051919
:param default_timeout: default timeout (in seconds) for requests
19061920
:param auto_validate: toggle to automatically validate process graphs before execution
1921+
:param retry: general request retry settings, can be specified as:
1922+
1923+
- :py:class:`requests.adapters.Retry` object
1924+
or a dictionary with corresponding keyword arguments
1925+
(e.g. ``total``, ``backoff_factor``, ``status_forcelist``, ...)
1926+
- ``None`` (default) to use default openEO-oriented retry settings
1927+
- ``False`` to disable retrying requests
19071928
1908-
.. versionadded:: 0.24.0
1909-
added ``auto_validate`` argument
1929+
.. versionchanged:: 0.24.0
1930+
Added ``auto_validate`` argument
1931+
1932+
.. versionchanged:: 0.41.0
1933+
Added ``retry`` argument.
19101934
"""
19111935

19121936
def _config_log(message):
@@ -1931,7 +1955,9 @@ def _config_log(message):
19311955

19321956
if not url:
19331957
raise OpenEoClientException("No openEO back-end URL given or known to connect to.")
1934-
connection = Connection(url, session=session, default_timeout=default_timeout, auto_validate=auto_validate)
1958+
connection = Connection(
1959+
url, session=session, default_timeout=default_timeout, auto_validate=auto_validate, retry=retry
1960+
)
19351961

19361962
auth_type = auth_type.lower() if isinstance(auth_type, str) else auth_type
19371963
if auth_type in {None, False, 'null', 'none'}:

openeo/utils/http.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""
2+
openEO-oriented HTTP utilities
3+
"""
4+
5+
from typing import Collection, Union
6+
7+
import requests
8+
import requests.adapters
9+
10+
DEFAULT_RETRIES_TOTAL = 5
11+
12+
# On `backoff_factor`: it influences how much to sleep according to the formula:
13+
# sleep = {backoff factor} * (2 ** ({consecutive errors} - 1))
14+
# The sleep before the first retry will be skipped however.
15+
# For example with backoff_factor=2.5, the sleeps between consecutive attempts would be:
16+
# 0, 5, 10, 20, 40, ...
17+
DEFAULT_BACKOFF_FACTOR = 2.5
18+
19+
20+
DEFAULT_RETRY_FORCELIST = frozenset(
21+
[
22+
429, # Too Many Requests
23+
500, # Internal Server Error
24+
502, # Bad Gateway
25+
503, # Service Unavailable
26+
504, # Gateway Timeout
27+
]
28+
)
29+
30+
31+
def retry_adapter(
    *,
    total: int = DEFAULT_RETRIES_TOTAL,
    backoff_factor: float = DEFAULT_BACKOFF_FACTOR,
    status_forcelist: Collection[int] = DEFAULT_RETRY_FORCELIST,
    **kwargs,
) -> requests.adapters.Retry:
    """
    Build a `requests.adapters.Retry` configuration object
    with openEO-oriented default retry settings.

    Note that, despite its name, this factory returns a ``Retry`` object
    (to be used as ``max_retries`` of an HTTP adapter), not an adapter itself.

    :param total: Total number of retries to allow
    :param backoff_factor: scaling factor for sleeps between retries
    :param status_forcelist: A set of integer HTTP status codes that we should force a retry on.
    :param kwargs: additional kwargs to pass to `requests.adapters.Retry`
    :return: `requests.adapters.Retry` object constructed from the given settings

    Inspiration and references:
    - https://requests.readthedocs.io/en/latest/api/#requests.adapters.HTTPAdapter
    - https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry
    - https://findwork.dev/blog/advanced-usage-python-requests-timeouts-retries-hooks/#retry-on-failure
    """
    return requests.adapters.Retry(
        total=total,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        **kwargs,
    )
60+
61+
62+
def _to_retry(
    retry: Union[requests.adapters.Retry, dict, bool, None],
) -> requests.adapters.Retry:
    """
    Convert a retry specification to a `requests.adapters.Retry` object.

    :param retry: retry specification, one of:

        - a `requests.adapters.Retry` object, which is returned as-is
        - a dictionary of keyword arguments for ``retry_adapter``
          (e.g. ``total``, ``backoff_factor``, ``status_forcelist``, ...)
        - ``None`` or ``True`` to get the default openEO-oriented retry settings

    :return: `requests.adapters.Retry` object
    :raises ValueError: on any other value (including ``False``)
    """
    if isinstance(retry, requests.adapters.Retry):
        return retry
    if isinstance(retry, dict):
        return retry_adapter(**retry)
    # Use identity checks instead of `retry in {None, True}`:
    # set membership hashes its operand, so an unhashable value (e.g. a list)
    # would fail with `TypeError` instead of the intended `ValueError`,
    # and values like `1` or `1.0` would silently pass as `True`.
    if retry is None or retry is True:
        return retry_adapter()
    raise ValueError(f"Invalid retry setting: {retry!r}")
77+
78+
79+
def session_with_retries(
    retry: Union[requests.adapters.Retry, dict, bool, None] = None,
) -> requests.Session:
    """
    Factory for a requests session with openEO-oriented retry settings.

    :param retry: The retry configuration, can be specified as:

        - :py:class:`requests.adapters.Retry`
        - a dictionary with :py:class:`requests.adapters.Retry` arguments,
          e.g. ``total``, ``backoff_factor``, ``status_forcelist``, ...
        - ``None`` (or ``True``) for default openEO-oriented retry settings

    :return: new :py:class:`requests.Session` with a retrying HTTP adapter
        mounted for both ``http://`` and ``https://`` URLs
    :raises ValueError: if ``retry`` is not a supported retry specification
        (e.g. ``False``: build a plain :py:class:`requests.Session` instead to disable retrying)
    """
    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(max_retries=_to_retry(retry))
    # Mount the same adapter on both schemes so the retry policy applies to plain and TLS URLs alike.
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

tests/rest/test_job.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import contextlib
12
import itertools
23
import json
34
import logging
@@ -6,12 +7,19 @@
67
from typing import Optional
78
from unittest import mock
89

10+
import dirty_equals
11+
import httpretty
912
import pytest
1013
import requests
1114

1215
import openeo
1316
import openeo.rest.job
14-
from openeo.rest import JobFailedException, OpenEoApiPlainError, OpenEoClientException
17+
from openeo.rest import (
18+
DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL,
19+
JobFailedException,
20+
OpenEoApiPlainError,
21+
OpenEoClientException,
22+
)
1523
from openeo.rest.job import BatchJob, ResultAsset
1624
from openeo.rest.models.general import Link
1725
from openeo.rest.models.logs import LogEntry
@@ -310,6 +318,90 @@ def test_execute_batch_with_excessive_soft_errors(con100, requests_mock, tmpdir,
310318
]
311319

312320

321+
@httpretty.activate(allow_net_connect=False)
@pytest.mark.parametrize(
    ["retry", "expectation_context", "expected_sleeps"],
    [
        (  # Default retry settings
            None,
            contextlib.nullcontext(),
            [0.1, 23, 34],
        ),
        (
            # Only retry on 429 (and fail on 500)
            {"status_forcelist": [429]},
            pytest.raises(OpenEoApiPlainError, match=re.escape("[500] Internal Server Error")),
            [0.1, 23, DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL],
        ),
        (
            # No retry setup
            False,
            pytest.raises(OpenEoApiPlainError, match=re.escape("[429] Too Many Requests")),
            [0.1],
        ),
    ],
)
def test_execute_batch_retry_after_429_too_many_requests(retry, expectation_context, expected_sleeps):
    """
    Poll a batch job whose status endpoint intermittently responds with
    `429 Too Many Requests` (with a `Retry-After` header), `502` and `500`,
    and check, for various `retry` settings of `openeo.connect`,
    the resulting outcome (success or raised exception) and the observed `time.sleep` calls.

    NOTE(review): presumably `httpretty` is used here (instead of the `requests_mock` fixture)
    so that requests pass through the real HTTP adapter and its retry handling -- to be confirmed.
    """
    httpretty.register_uri(
        httpretty.GET,
        uri=API_URL + "/",
        body=json.dumps({"api_version": "1.0.0", "endpoints": [{"path": "/credentials/basic", "methods": ["GET"]}]}),
    )
    httpretty.register_uri(
        httpretty.GET,
        uri=API_URL + "/file_formats",
        body=json.dumps({"output": {"GTiff": {"gis_data_types": ["raster"]}}}),
    )
    httpretty.register_uri(
        httpretty.GET,
        uri=API_URL + "/collections/SENTINEL2",
        body=json.dumps({"foo": "bar"}),
    )
    httpretty.register_uri(
        httpretty.POST, uri=API_URL + "/jobs", status=201, adding_headers={"OpenEO-Identifier": "f00ba5"}, body=""
    )
    httpretty.register_uri(httpretty.POST, uri=API_URL + "/jobs/f00ba5/results", status=202)
    # Sequence of responses for consecutive job status requests,
    # mixing regular status updates with 429/502/500 failures.
    httpretty.register_uri(
        httpretty.GET,
        uri=API_URL + "/jobs/f00ba5",
        responses=[
            httpretty.Response(body=json.dumps({"status": "queued"})),
            httpretty.Response(status=429, body="Too Many Requests", adding_headers={"Retry-After": "23"}),
            httpretty.Response(body=json.dumps({"status": "running", "progress": 80})),
            httpretty.Response(status=502, body="Bad Gateway"),
            httpretty.Response(status=500, body="Internal Server Error"),
            httpretty.Response(body=json.dumps({"status": "running", "progress": 80})),
            httpretty.Response(status=429, body="Too Many Requests", adding_headers={"Retry-After": "34"}),
            httpretty.Response(body=json.dumps({"status": "finished", "progress": 100})),
        ],
    )
    httpretty.register_uri(
        httpretty.GET,
        uri=API_URL + "/jobs/f00ba5/results",
        body=json.dumps(
            {
                "assets": {
                    "output.tiff": {
                        "href": API_URL + "/jobs/f00ba5/files/output.tiff",
                        "type": "image/tiff; application=geotiff",
                    },
                }
            }
        ),
    )
    httpretty.register_uri(httpretty.GET, uri=API_URL + "/jobs/f00ba5/files/output.tiff", body="tiffdata")
    httpretty.register_uri(httpretty.GET, uri=API_URL + "/jobs/f00ba5/logs", body=json.dumps({"logs": []}))

    con = openeo.connect(API_URL, retry=retry)

    # Patch `time.sleep` to keep the test fast and to capture the requested sleep durations.
    with mock.patch("time.sleep") as sleep_mock:
        job = con.load_collection("SENTINEL2").create_job()
        with expectation_context:
            job.start_and_wait(max_poll_interval=0.1)

        # Only require the expected sleeps to be present among the observed calls (other sleeps are allowed).
        assert sleep_mock.call_args_list == dirty_equals.Contains(*(mock.call(s) for s in expected_sleeps))
403+
404+
313405
class LogGenerator:
314406
"""Helper to generate log entry (dicts) with auto-generated ids, messages, etc."""
315407

0 commit comments

Comments
 (0)