Skip to content

Commit a1e1830

Browse files
authored
Add support for sniffing via elastic-transport
1 parent d5c076f commit a1e1830

File tree

14 files changed

+1111
-384
lines changed

14 files changed

+1111
-384
lines changed

elasticsearch/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@
3232

3333
from ._async.client import AsyncElasticsearch
3434
from ._sync.client import Elasticsearch
35+
from .exceptions import ElasticsearchDeprecationWarning # noqa: F401
3536
from .exceptions import (
3637
ApiError,
3738
AuthenticationException,
3839
AuthorizationException,
3940
ConflictError,
4041
ConnectionError,
4142
ConnectionTimeout,
42-
ElasticsearchDeprecationWarning,
4343
ElasticsearchException,
4444
ElasticsearchWarning,
4545
NotFoundError,
@@ -73,5 +73,4 @@
7373
"AuthorizationException",
7474
"UnsupportedProductError",
7575
"ElasticsearchWarning",
76-
"ElasticsearchDeprecationWarning",
7776
]

elasticsearch/_async/client/__init__.py

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,19 @@
1818

1919
import logging
2020
import warnings
21-
from typing import Optional
21+
from typing import Any, Callable, Dict, Optional, Union
2222

23-
from elastic_transport import AsyncTransport, TransportError
23+
from elastic_transport import AsyncTransport, NodeConfig, TransportError
2424
from elastic_transport.client_utils import DEFAULT
2525

2626
from ...exceptions import NotFoundError
2727
from ...serializer import DEFAULT_SERIALIZERS
28-
from ._base import BaseClient, resolve_auth_headers
28+
from ._base import (
29+
BaseClient,
30+
create_sniff_callback,
31+
default_sniff_callback,
32+
resolve_auth_headers,
33+
)
2934
from .async_search import AsyncSearchClient
3035
from .autoscaling import AutoscalingClient
3136
from .cat import CatClient
@@ -148,9 +153,21 @@ def __init__(
148153
sniff_on_node_failure=DEFAULT,
149154
sniff_timeout=DEFAULT,
150155
min_delay_between_sniffing=DEFAULT,
156+
sniffed_node_callback: Optional[
157+
Callable[[Dict[str, Any], NodeConfig], Optional[NodeConfig]]
158+
] = None,
151159
meta_header=DEFAULT,
152160
# Deprecated
153161
timeout=DEFAULT,
162+
randomize_hosts=DEFAULT,
163+
host_info_callback: Optional[
164+
Callable[
165+
[Dict[str, Any], Dict[str, Union[str, int]]],
166+
Optional[Dict[str, Union[str, int]]],
167+
]
168+
] = None,
169+
sniffer_timeout=DEFAULT,
170+
sniff_on_connection_fail=DEFAULT,
154171
# Internal use only
155172
_transport: Optional[AsyncTransport] = None,
156173
) -> None:
@@ -170,6 +187,92 @@ def __init__(
170187
)
171188
request_timeout = timeout
172189

190+
if randomize_hosts is not DEFAULT:
191+
if randomize_nodes_in_pool is not DEFAULT:
192+
raise ValueError(
193+
"Can't specify both 'randomize_hosts' and 'randomize_nodes_in_pool', "
194+
"instead only specify 'randomize_nodes_in_pool'"
195+
)
196+
warnings.warn(
197+
"The 'randomize_hosts' parameter is deprecated in favor of 'randomize_nodes_in_pool'",
198+
category=DeprecationWarning,
199+
stacklevel=2,
200+
)
201+
randomize_nodes_in_pool = randomize_hosts
202+
203+
if sniffer_timeout is not DEFAULT:
204+
if min_delay_between_sniffing is not DEFAULT:
205+
raise ValueError(
206+
"Can't specify both 'sniffer_timeout' and 'min_delay_between_sniffing', "
207+
"instead only specify 'min_delay_between_sniffing'"
208+
)
209+
warnings.warn(
210+
"The 'sniffer_timeout' parameter is deprecated in favor of 'min_delay_between_sniffing'",
211+
category=DeprecationWarning,
212+
stacklevel=2,
213+
)
214+
min_delay_between_sniffing = sniffer_timeout
215+
216+
if sniff_on_connection_fail is not DEFAULT:
217+
if sniff_on_node_failure is not DEFAULT:
218+
raise ValueError(
219+
"Can't specify both 'sniff_on_connection_fail' and 'sniff_on_node_failure', "
220+
"instead only specify 'sniff_on_node_failure'"
221+
)
222+
warnings.warn(
223+
"The 'sniff_on_connection_fail' parameter is deprecated in favor of 'sniff_on_node_failure'",
224+
category=DeprecationWarning,
225+
stacklevel=2,
226+
)
227+
sniff_on_node_failure = sniff_on_connection_fail
228+
229+
# Setting min_delay_between_sniffing=True implies sniff_before_requests=True
230+
if min_delay_between_sniffing is not DEFAULT:
231+
sniff_before_requests = True
232+
233+
sniffing_options = (
234+
sniff_timeout,
235+
sniff_on_start,
236+
sniff_before_requests,
237+
sniff_on_node_failure,
238+
sniffed_node_callback,
239+
min_delay_between_sniffing,
240+
sniffed_node_callback,
241+
)
242+
if cloud_id is not None and any(
243+
x is not DEFAULT and x is not None for x in sniffing_options
244+
):
245+
raise ValueError(
246+
"Sniffing should not be enabled when connecting to Elastic Cloud"
247+
)
248+
249+
sniff_callback = None
250+
if host_info_callback is not None:
251+
if sniffed_node_callback is not None:
252+
raise ValueError(
253+
"Can't specify both 'host_info_callback' and 'sniffed_node_callback', "
254+
"instead only specify 'sniffed_node_callback'"
255+
)
256+
warnings.warn(
257+
"The 'host_info_callback' parameter is deprecated in favor of 'sniffed_node_callback'",
258+
category=DeprecationWarning,
259+
stacklevel=2,
260+
)
261+
262+
sniff_callback = create_sniff_callback(
263+
host_info_callback=host_info_callback
264+
)
265+
elif sniffed_node_callback is not None:
266+
sniff_callback = create_sniff_callback(
267+
sniffed_node_callback=sniffed_node_callback
268+
)
269+
elif (
270+
sniff_on_start is True
271+
or sniff_before_requests is True
272+
or sniff_on_node_failure is True
273+
):
274+
sniff_callback = default_sniff_callback
275+
173276
if _transport is None:
174277
node_configs = client_node_configs(
175278
hosts,
@@ -222,6 +325,7 @@ def __init__(
222325
_transport = transport_class(
223326
node_configs,
224327
client_meta_service=CLIENT_META_SERVICE,
328+
sniff_callback=sniff_callback,
225329
**transport_kwargs,
226330
)
227331

elasticsearch/_async/client/_base.py

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,33 @@
1717

1818
import re
1919
import warnings
20-
from typing import Any, Collection, Iterable, Mapping, Optional, Tuple, TypeVar, Union
20+
from typing import (
21+
Any,
22+
Callable,
23+
Collection,
24+
Dict,
25+
Iterable,
26+
List,
27+
Mapping,
28+
Optional,
29+
Tuple,
30+
TypeVar,
31+
Union,
32+
)
2133

22-
from elastic_transport import AsyncTransport, HttpHeaders
34+
from elastic_transport import AsyncTransport, HttpHeaders, NodeConfig, SniffOptions
2335
from elastic_transport.client_utils import DEFAULT, DefaultType, resolve_default
2436

2537
from ...compat import urlencode, warn_stacklevel
2638
from ...exceptions import (
2739
HTTP_EXCEPTIONS,
2840
ApiError,
41+
ConnectionError,
2942
ElasticsearchWarning,
43+
SerializationError,
3044
UnsupportedProductError,
3145
)
32-
from .utils import _base64_auth_header
46+
from .utils import _TYPE_ASYNC_SNIFF_CALLBACK, _base64_auth_header
3347

3448
SelfType = TypeVar("SelfType", bound="BaseClient")
3549
SelfNamespacedType = TypeVar("SelfNamespacedType", bound="NamespacedClient")
@@ -83,6 +97,102 @@ def resolve_auth_headers(
8397
return headers
8498

8599

100+
def create_sniff_callback(
101+
host_info_callback: Optional[
102+
Callable[[Dict[str, Any], Dict[str, Any]], Optional[Dict[str, Any]]]
103+
] = None,
104+
sniffed_node_callback: Optional[
105+
Callable[[Dict[str, Any], NodeConfig], Optional[NodeConfig]]
106+
] = None,
107+
) -> _TYPE_ASYNC_SNIFF_CALLBACK:
108+
assert (host_info_callback is None) != (sniffed_node_callback is None)
109+
110+
# Wrap the deprecated 'host_info_callback' into 'sniffed_node_callback'
111+
if host_info_callback is not None:
112+
113+
def _sniffed_node_callback(
114+
node_info: Dict[str, Any], node_config: NodeConfig
115+
) -> Optional[NodeConfig]:
116+
assert host_info_callback is not None
117+
if (
118+
host_info_callback( # type ignore[misc]
119+
node_info, {"host": node_config.host, "port": node_config.port}
120+
)
121+
is None
122+
):
123+
return None
124+
return node_config
125+
126+
sniffed_node_callback = _sniffed_node_callback
127+
128+
async def sniff_callback(
129+
transport: AsyncTransport, sniff_options: SniffOptions
130+
) -> List[NodeConfig]:
131+
for _ in transport.node_pool.all():
132+
try:
133+
meta, node_infos = await transport.perform_request(
134+
"GET",
135+
"/_nodes/_all/http",
136+
headers={"accept": "application/json"},
137+
request_timeout=(
138+
sniff_options.sniff_timeout
139+
if not sniff_options.is_initial_sniff
140+
else None
141+
),
142+
)
143+
except (SerializationError, ConnectionError):
144+
continue
145+
146+
if not 200 <= meta.status <= 299:
147+
continue
148+
149+
node_configs = []
150+
for node_info in node_infos.get("nodes", {}).values():
151+
address = node_info.get("http", {}).get("publish_address")
152+
if not address or ":" not in address:
153+
continue
154+
155+
if "/" in address:
156+
# Support 7.x host/ip:port behavior where http.publish_host has been set.
157+
fqdn, ipaddress = address.split("/", 1)
158+
host = fqdn
159+
_, port_str = ipaddress.rsplit(":", 1)
160+
port = int(port_str)
161+
else:
162+
host, port_str = address.rsplit(":", 1)
163+
port = int(port_str)
164+
165+
assert sniffed_node_callback is not None
166+
sniffed_node = sniffed_node_callback(
167+
node_info, meta.node.replace(host=host, port=port)
168+
)
169+
if sniffed_node is None:
170+
continue
171+
172+
# Use the node which was able to make the request as a base.
173+
node_configs.append(sniffed_node)
174+
175+
if node_configs:
176+
return node_configs
177+
178+
return []
179+
180+
return sniff_callback
181+
182+
183+
def _default_sniffed_node_callback(
184+
node_info: Dict[str, Any], node_config: NodeConfig
185+
) -> Optional[NodeConfig]:
186+
if node_info.get("roles", []) == ["master"]:
187+
return None
188+
return node_config
189+
190+
191+
default_sniff_callback = create_sniff_callback(
192+
sniffed_node_callback=_default_sniffed_node_callback
193+
)
194+
195+
86196
class BaseClient:
87197
def __init__(self, _transport: AsyncTransport) -> None:
88198
self._transport = _transport

elasticsearch/_async/client/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818
from ..._sync.client.utils import (
19+
_TYPE_ASYNC_SNIFF_CALLBACK,
1920
_TYPE_HOSTS,
2021
CLIENT_META_SERVICE,
2122
SKIP_IN_PATH,
@@ -30,6 +31,7 @@
3031

3132
__all__ = [
3233
"CLIENT_META_SERVICE",
34+
"_TYPE_ASYNC_SNIFF_CALLBACK",
3335
"_deprecated_options",
3436
"_TYPE_HOSTS",
3537
"SKIP_IN_PATH",

0 commit comments

Comments
 (0)