Skip to content

Commit a039151

Browse files
committed
Add support for elastic-transport sniffing
1 parent 620032d commit a039151

File tree

10 files changed

+782
-354
lines changed

10 files changed

+782
-354
lines changed

elasticsearch/_async/client/__init__.py

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,19 @@
1818

1919
import logging
2020
import warnings
21-
from typing import Optional
21+
from typing import Any, Callable, Dict, Optional, Union
2222

23-
from elastic_transport import AsyncTransport, TransportError
23+
from elastic_transport import AsyncTransport, NodeConfig, TransportError
2424
from elastic_transport.client_utils import DEFAULT
2525

2626
from ...exceptions import NotFoundError
2727
from ...serializer import DEFAULT_SERIALIZERS
28-
from ._base import BaseClient, resolve_auth_headers
28+
from ._base import (
29+
BaseClient,
30+
create_sniff_callback,
31+
default_sniff_callback,
32+
resolve_auth_headers,
33+
)
2934
from .async_search import AsyncSearchClient
3035
from .autoscaling import AutoscalingClient
3136
from .cat import CatClient
@@ -148,9 +153,21 @@ def __init__(
148153
sniff_on_node_failure=DEFAULT,
149154
sniff_timeout=DEFAULT,
150155
min_delay_between_sniffing=DEFAULT,
156+
sniffed_node_callback: Optional[
157+
Callable[[Dict[str, Any], NodeConfig], Optional[NodeConfig]]
158+
] = None,
151159
meta_header=DEFAULT,
152160
# Deprecated
153161
timeout=DEFAULT,
162+
randomize_hosts=DEFAULT,
163+
host_info_callback: Optional[
164+
Callable[
165+
[Dict[str, Any], Dict[str, Union[str, int]]],
166+
Optional[Dict[str, Union[str, int]]],
167+
]
168+
] = None,
169+
sniffer_timeout=DEFAULT,
170+
sniff_on_connection_fail=DEFAULT,
154171
# Internal use only
155172
_transport: Optional[AsyncTransport] = None,
156173
) -> None:
@@ -170,6 +187,86 @@ def __init__(
170187
)
171188
request_timeout = timeout
172189

190+
if randomize_hosts is not DEFAULT:
191+
if randomize_nodes_in_pool is not DEFAULT:
192+
raise ValueError(
193+
"Can't specify both 'randomize_hosts' and 'randomize_nodes_in_pool', "
194+
"instead only specify 'randomize_nodes_in_pool'"
195+
)
196+
warnings.warn(
197+
"The 'randomize_hosts' parameter is deprecated in favor of 'randomize_nodes_in_pool'",
198+
category=DeprecationWarning,
199+
stacklevel=2,
200+
)
201+
randomize_nodes_in_pool = randomize_hosts
202+
203+
if sniffer_timeout is not DEFAULT:
204+
if min_delay_between_sniffing is not DEFAULT:
205+
raise ValueError(
206+
"Can't specify both 'sniffer_timeout' and 'min_delay_between_sniffing', "
207+
"instead only specify 'min_delay_between_sniffing'"
208+
)
209+
warnings.warn(
210+
"The 'sniffer_timeout' parameter is deprecated in favor of 'min_delay_between_sniffing'",
211+
category=DeprecationWarning,
212+
stacklevel=2,
213+
)
214+
min_delay_between_sniffing = sniffer_timeout
215+
216+
if sniff_on_connection_fail is not DEFAULT:
217+
if sniff_on_node_failure is not DEFAULT:
218+
raise ValueError(
219+
"Can't specify both 'sniff_on_connection_fail' and 'sniff_on_node_failure', "
220+
"instead only specify 'sniff_on_node_failure'"
221+
)
222+
warnings.warn(
223+
"The 'sniff_on_connection_fail' parameter is deprecated in favor of 'sniff_on_node_failure'",
224+
category=DeprecationWarning,
225+
stacklevel=2,
226+
)
227+
sniff_on_node_failure = sniff_on_connection_fail
228+
229+
# Setting min_delay_between_sniffing=True implies sniff_before_requests=True
230+
if min_delay_between_sniffing is not DEFAULT:
231+
sniff_before_requests = True
232+
233+
sniffing_options = (
234+
sniff_timeout,
235+
sniff_on_start,
236+
sniff_before_requests,
237+
sniff_on_node_failure,
238+
sniffed_node_callback,
239+
min_delay_between_sniffing,
240+
sniffed_node_callback,
241+
)
242+
if cloud_id is not None and any(
243+
x is not DEFAULT and x is not None for x in sniffing_options
244+
):
245+
raise ValueError(
246+
"Sniffing should not be enabled when connecting to Elastic Cloud"
247+
)
248+
249+
sniff_callback = None
250+
if host_info_callback is not None:
251+
if sniffed_node_callback is not None:
252+
raise ValueError(
253+
"Can't specify both 'host_info_callback' and 'sniffed_node_callback', "
254+
"instead only specify 'sniffed_node_callback'"
255+
)
256+
sniff_callback = create_sniff_callback(
257+
host_info_callback=host_info_callback
258+
)
259+
elif sniffed_node_callback is not None:
260+
sniff_callback = create_sniff_callback(
261+
sniffed_node_callback=sniffed_node_callback
262+
)
263+
elif (
264+
sniff_on_start is True
265+
or sniff_before_requests is True
266+
or sniff_on_node_failure is True
267+
):
268+
sniff_callback = default_sniff_callback
269+
173270
if _transport is None:
174271
node_configs = client_node_configs(
175272
hosts,
@@ -222,6 +319,7 @@ def __init__(
222319
_transport = transport_class(
223320
node_configs,
224321
client_meta_service=CLIENT_META_SERVICE,
322+
sniff_callback=sniff_callback,
225323
**transport_kwargs,
226324
)
227325

elasticsearch/_async/client/_base.py

Lines changed: 118 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,31 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
from typing import Any, Collection, Mapping, Optional, Tuple, TypeVar, Union
19-
20-
from elastic_transport import AsyncTransport, HttpHeaders
18+
from typing import (
19+
Any,
20+
Callable,
21+
Collection,
22+
Dict,
23+
List,
24+
Mapping,
25+
Optional,
26+
Tuple,
27+
TypeVar,
28+
Union,
29+
)
30+
31+
from elastic_transport import AsyncTransport, HttpHeaders, NodeConfig, SniffOptions
2132
from elastic_transport.client_utils import DEFAULT, DefaultType, resolve_default
2233

2334
from ...compat import urlencode
24-
from ...exceptions import HTTP_EXCEPTIONS, ApiError, UnsupportedProductError
25-
from .utils import _base64_auth_header
35+
from ...exceptions import (
36+
HTTP_EXCEPTIONS,
37+
ApiError,
38+
ConnectionError,
39+
SerializationError,
40+
UnsupportedProductError,
41+
)
42+
from .utils import _TYPE_ASYNC_SNIFF_CALLBACK, _base64_auth_header
2643

2744
SelfType = TypeVar("SelfType", bound="BaseClient")
2845
SelfNamespacedType = TypeVar("SelfNamespacedType", bound="NamespacedClient")
@@ -74,6 +91,102 @@ def resolve_auth_headers(
7491
return headers
7592

7693

94+
def create_sniff_callback(
95+
host_info_callback: Optional[
96+
Callable[[Dict[str, Any], Dict[str, Any]], Optional[Dict[str, Any]]]
97+
] = None,
98+
sniffed_node_callback: Optional[
99+
Callable[[Dict[str, Any], NodeConfig], Optional[NodeConfig]]
100+
] = None,
101+
) -> _TYPE_ASYNC_SNIFF_CALLBACK:
102+
assert (host_info_callback is None) != (sniffed_node_callback is None)
103+
104+
# Wrap the deprecated 'host_info_callback' into 'sniffed_node_callback'
105+
if host_info_callback is not None:
106+
107+
def _sniffed_node_callback(
108+
node_info: Dict[str, Any], node_config: NodeConfig
109+
) -> Optional[NodeConfig]:
110+
assert host_info_callback is not None
111+
if (
112+
host_info_callback( # type ignore[misc]
113+
node_info, {"host": node_config.host, "port": node_config.port}
114+
)
115+
is None
116+
):
117+
return None
118+
return node_config
119+
120+
sniffed_node_callback = _sniffed_node_callback
121+
122+
async def sniff_callback(
123+
transport: AsyncTransport, sniff_options: SniffOptions
124+
) -> List[NodeConfig]:
125+
for _ in transport.node_pool.all():
126+
try:
127+
meta, node_infos = await transport.perform_request(
128+
"GET",
129+
"/_nodes/_all/http",
130+
headers={"accept": "application/json"},
131+
request_timeout=(
132+
sniff_options.sniff_timeout
133+
if not sniff_options.is_initial_sniff
134+
else None
135+
),
136+
)
137+
except (SerializationError, ConnectionError):
138+
continue
139+
140+
if not 200 <= meta.status <= 299:
141+
continue
142+
143+
node_configs = []
144+
for node_info in node_infos.get("nodes", {}).values():
145+
address = node_info.get("http", {}).get("publish_address")
146+
if not address or ":" not in address:
147+
continue
148+
149+
if "/" in address:
150+
# Support 7.x host/ip:port behavior where http.publish_host has been set.
151+
fqdn, ipaddress = address.split("/", 1)
152+
host = fqdn
153+
_, port_str = ipaddress.rsplit(":", 1)
154+
port = int(port_str)
155+
else:
156+
host, port_str = address.rsplit(":", 1)
157+
port = int(port_str)
158+
159+
assert sniffed_node_callback is not None
160+
sniffed_node = sniffed_node_callback(
161+
node_info, meta.node.replace(host=host, port=port)
162+
)
163+
if sniffed_node is None:
164+
continue
165+
166+
# Use the node which was able to make the request as a base.
167+
node_configs.append(sniffed_node)
168+
169+
if node_configs:
170+
return node_configs
171+
172+
return []
173+
174+
return sniff_callback
175+
176+
177+
def _default_sniffed_node_callback(
178+
node_info: Dict[str, Any], node_config: NodeConfig
179+
) -> Optional[NodeConfig]:
180+
if node_info.get("roles", []) == ["master"]:
181+
return None
182+
return node_config
183+
184+
185+
default_sniff_callback = create_sniff_callback(
186+
sniffed_node_callback=_default_sniffed_node_callback
187+
)
188+
189+
77190
class BaseClient:
78191
def __init__(self, _transport: AsyncTransport) -> None:
79192
self._transport = _transport

elasticsearch/_async/client/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818
from ..._sync.client.utils import (
19+
_TYPE_ASYNC_SNIFF_CALLBACK,
1920
_TYPE_HOSTS,
2021
CLIENT_META_SERVICE,
2122
SKIP_IN_PATH,
@@ -30,6 +31,7 @@
3031

3132
__all__ = [
3233
"CLIENT_META_SERVICE",
34+
"_TYPE_ASYNC_SNIFF_CALLBACK",
3335
"_deprecated_options",
3436
"_TYPE_HOSTS",
3537
"SKIP_IN_PATH",

0 commit comments

Comments
 (0)