Skip to content

Integrate the Elastic Transport package #1753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 128 additions & 56 deletions elasticsearch/_async/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@


import logging
import warnings
from typing import Optional

from ..transport import AsyncTransport, TransportError
from elastic_transport import AsyncTransport, NotFoundError, TransportError
from elastic_transport.client_utils import DEFAULT

from ._base import BaseClient
from .async_search import AsyncSearchClient
from .autoscaling import AutoscalingClient
from .cat import CatClient
Expand Down Expand Up @@ -50,14 +55,22 @@
from .tasks import TasksClient
from .text_structure import TextStructureClient
from .transform import TransformClient
from .utils import SKIP_IN_PATH, _bulk_body, _make_path, _normalize_hosts, query_params
from .utils import (
_TYPE_HOSTS,
CLIENT_META_SERVICE,
SKIP_IN_PATH,
_deprecated_options,
_make_path,
client_node_configs,
query_params,
)
from .watcher import WatcherClient
from .xpack import XPackClient

logger = logging.getLogger("elasticsearch")


class AsyncElasticsearch:
class AsyncElasticsearch(BaseClient):
"""
Elasticsearch low-level client. Provides a straightforward mapping from
Python to ES REST endpoints.
Expand All @@ -74,12 +87,6 @@ class AsyncElasticsearch:
preferred (and only supported) way to get access to those classes and their
methods.

You can specify your own connection class which should be used by providing
the ``connection_class`` parameter::

# create connection to localhost using the ThriftConnection
es = Elasticsearch(connection_class=ThriftConnection)

If you want to turn on :ref:`sniffing` you have several options (described
in :class:`~elasticsearch.Transport`)::

Expand Down Expand Up @@ -111,7 +118,7 @@ class AsyncElasticsearch:
detailed description of the options)::

es = Elasticsearch(
['localhost:443', 'other_host:443'],
['https://localhost:443', 'https://other_host:443'],
# turn on SSL
use_ssl=True,
# make sure we verify SSL certificates
Expand All @@ -125,7 +132,7 @@ class AsyncElasticsearch:
detailed description of the options)::

es = Elasticsearch(
['localhost:443', 'other_host:443'],
['https://localhost:443', 'https://other_host:443'],
# turn on SSL
use_ssl=True,
# no verify SSL certificates
Expand All @@ -139,7 +146,7 @@ class AsyncElasticsearch:
detailed description of the options)::

es = Elasticsearch(
['localhost:443', 'other_host:443'],
['https://localhost:443', 'https://other_host:443'],
# turn on SSL
use_ssl=True,
# make sure we verify SSL certificates
Expand All @@ -163,41 +170,113 @@ class AsyncElasticsearch:
verify_certs=True
)

By default, `JSONSerializer
<https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/serializer.py>`_
is used to encode all outgoing requests.
By default, ``JsonSerializer`` is used to encode all outgoing requests.
However, you can implement your own custom serializer::

from elasticsearch.serializer import JSONSerializer
from elasticsearch.serializer import JsonSerializer

class SetEncoder(JSONSerializer):
class SetEncoder(JsonSerializer):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
if isinstance(obj, Something):
return 'CustomSomethingRepresentation'
return JSONSerializer.default(self, obj)
return JsonSerializer.default(self, obj)

es = Elasticsearch(serializer=SetEncoder())

"""

def __init__(self, hosts=None, transport_class=AsyncTransport, **kwargs):
"""
:arg hosts: list of nodes, or a single node, we should connect to.
Node should be a dictionary ({"host": "localhost", "port": 9200}),
the entire dictionary will be passed to the :class:`~elasticsearch.Connection`
class as kwargs, or a string in the format of ``host[:port]`` which will be
translated to a dictionary automatically. If no value is given the
:class:`~elasticsearch.Connection` class defaults will be used.

:arg transport_class: :class:`~elasticsearch.Transport` subclass to use.
def __init__(
self,
hosts: Optional[_TYPE_HOSTS] = None,
*,
# API
cloud_id: Optional[str] = None,
api_key=None,
basic_auth=None,
bearer_auth=None,
# Node
headers=DEFAULT,
connections_per_node=DEFAULT,
http_compress=DEFAULT,
verify_certs=DEFAULT,
ca_certs=DEFAULT,
client_cert=DEFAULT,
client_key=DEFAULT,
ssl_assert_hostname=DEFAULT,
ssl_assert_fingerprint=DEFAULT,
ssl_version=DEFAULT,
ssl_context=DEFAULT,
ssl_show_warn=DEFAULT,
# Transport
transport_class=AsyncTransport,
request_timeout=DEFAULT,
node_class=DEFAULT,
node_pool_class=DEFAULT,
randomize_nodes_in_pool=DEFAULT,
node_selector_class=DEFAULT,
dead_backoff_factor=DEFAULT,
max_dead_backoff=DEFAULT,
serializers=DEFAULT,
default_mimetype="application/json",
max_retries=DEFAULT,
retry_on_status=DEFAULT,
retry_on_timeout=DEFAULT,
sniff_on_start=DEFAULT,
sniff_before_requests=DEFAULT,
sniff_on_node_failure=DEFAULT,
sniff_timeout=DEFAULT,
min_delay_between_sniffing=DEFAULT,
# Deprecated
timeout=DEFAULT,
# Internal use only
_transport: Optional[AsyncTransport] = None,
) -> None:
super().__init__()

if hosts is None and cloud_id is None and _transport is None:
raise ValueError("Either 'hosts' or 'cloud_id' must be specified")

if timeout is not DEFAULT:
if request_timeout is not DEFAULT:
raise ValueError(
"Can't specify both 'timeout' and 'request_timeout', "
"instead only specify 'request_timeout'"
)
warnings.warn(
"The 'timeout' parameter is deprecated in favor of 'request_timeout'",
category=DeprecationWarning,
stacklevel=2,
)
request_timeout = timeout

:arg kwargs: any additional arguments will be passed on to the
:class:`~elasticsearch.Transport` class and, subsequently, to the
:class:`~elasticsearch.Connection` instances.
"""
self.transport = transport_class(_normalize_hosts(hosts), **kwargs)
self.transport: AsyncTransport
if _transport is not None:
self.transport = _transport
else:
node_configs = client_node_configs(
hosts,
cloud_id=cloud_id,
headers=headers,
connections_per_node=connections_per_node,
http_compress=http_compress,
verify_certs=verify_certs,
ca_certs=ca_certs,
client_cert=client_cert,
client_key=client_key,
ssl_assert_hostname=ssl_assert_hostname,
ssl_assert_fingerprint=ssl_assert_fingerprint,
ssl_version=ssl_version,
ssl_context=ssl_context,
ssl_show_warn=ssl_show_warn,
)
transport_kwargs = {}
self.transport = transport_class(
node_configs,
client_meta_service=CLIENT_META_SERVICE,
**transport_kwargs,
)

# namespaced clients for compatibility with API names
self.async_search = AsyncSearchClient(self)
Expand Down Expand Up @@ -1193,11 +1272,6 @@ async def rank_eval(self, body, index=None, params=None, headers=None):

`<https://www.elastic.co/guide/en/elasticsearch/reference/master/search-rank-eval.html>`_

.. warning::

This API is **experimental** so may include breaking changes
or be removed in a future version

:arg body: The ranking evaluation search definition, including
search requests, document ratings and ranking metric definition.
:arg index: A comma-separated list of index names to search; use
Expand Down Expand Up @@ -1651,11 +1725,6 @@ async def get_script_context(self, params=None, headers=None):
Returns all script contexts.

`<https://www.elastic.co/guide/en/elasticsearch/painless/master/painless-contexts.html>`_

.. warning::

This API is **experimental** so may include breaking changes
or be removed in a future version
"""
return await self.transport.perform_request(
"GET", "/_script_context", params=params, headers=headers
Expand All @@ -1667,11 +1736,6 @@ async def get_script_languages(self, params=None, headers=None):
Returns available script types, languages and contexts

`<https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting.html>`_

.. warning::

This API is **experimental** so may include breaking changes
or be removed in a future version
"""
return await self.transport.perform_request(
"GET", "/_script_language", params=params, headers=headers
Expand Down Expand Up @@ -2050,7 +2114,7 @@ async def close_point_in_time(self, body=None, params=None, headers=None):
@query_params(
"expand_wildcards", "ignore_unavailable", "keep_alive", "preference", "routing"
)
async def open_point_in_time(self, index=None, params=None, headers=None):
async def open_point_in_time(self, index, params=None, headers=None):
"""
Open a point in time that can be used in subsequent searches

Expand All @@ -2069,6 +2133,9 @@ async def open_point_in_time(self, index=None, params=None, headers=None):
be performed on (default: random)
:arg routing: Specific routing value
"""
if index in SKIP_IN_PATH:
raise ValueError("Empty value passed for a required argument 'index'.")

return await self.transport.perform_request(
"POST", _make_path(index, "_pit"), params=params, headers=headers
)
Expand All @@ -2082,11 +2149,6 @@ async def terms_enum(self, index, body=None, params=None, headers=None):

`<https://www.elastic.co/guide/en/elasticsearch/reference/master/search-terms-enum.html>`_

.. warning::

This API is **beta** so may include breaking changes
or be removed in a future version

:arg index: A comma-separated list of index names to search; use
`_all` or empty string to perform the operation on all indices
:arg body: field name, string which is the prefix expected in
Expand All @@ -2103,7 +2165,14 @@ async def terms_enum(self, index, body=None, params=None, headers=None):
body=body,
)

@query_params("exact_bounds", "extent", "grid_precision", "grid_type", "size")
@query_params(
"exact_bounds",
"extent",
"grid_precision",
"grid_type",
"size",
"track_total_hits",
)
async def search_mvt(
self, index, field, zoom, x, y, body=None, params=None, headers=None
):
Expand Down Expand Up @@ -2133,9 +2202,12 @@ async def search_mvt(
:arg grid_precision: Additional zoom levels available through
the aggs layer. Accepts 0-8. Default: 8
:arg grid_type: Determines the geometry type for features in the
aggs layer. Valid choices: grid, point Default: grid
aggs layer. Valid choices: grid, point, centroid Default: grid
:arg size: Maximum number of features to return in the hits
layer. Accepts 0-10000. Default: 10000
:arg track_total_hits: Indicate if the number of documents that
match the query should be tracked. A number can also be specified, to
accurately track the total hit count up to the number.
"""
for param in (index, field, zoom, x, y):
if param in SKIP_IN_PATH:
Expand Down
6 changes: 4 additions & 2 deletions elasticsearch/_async/client/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import logging
from typing import Any, Collection, MutableMapping, Optional, Tuple, Type, Union

from ..transport import AsyncTransport
from elastic_transport import AsyncTransport

from .async_search import AsyncSearchClient
from .autoscaling import AutoscalingClient
from .cat import CatClient
Expand Down Expand Up @@ -1110,8 +1111,8 @@ class AsyncElasticsearch:
) -> Any: ...
async def open_point_in_time(
self,
index: Any,
*,
index: Optional[Any] = ...,
expand_wildcards: Optional[Any] = ...,
ignore_unavailable: Optional[Any] = ...,
keep_alive: Optional[Any] = ...,
Expand Down Expand Up @@ -1162,6 +1163,7 @@ class AsyncElasticsearch:
grid_precision: Optional[Any] = ...,
grid_type: Optional[Any] = ...,
size: Optional[Any] = ...,
track_total_hits: Optional[Any] = ...,
pretty: Optional[bool] = ...,
human: Optional[bool] = ...,
error_trace: Optional[bool] = ...,
Expand Down
Loading