diff --git a/awswrangler/opensearch/_write.py b/awswrangler/opensearch/_write.py index 07e2dc3b4..25dfe738c 100644 --- a/awswrangler/opensearch/_write.py +++ b/awswrangler/opensearch/_write.py @@ -61,6 +61,7 @@ def _actions_generator( doc_type: str | None, keys_to_write: list[str] | None, id_keys: list[str] | None, + routing_keys: list[str] | None = None, bulk_size: int = 10000, ) -> Generator[list[dict[str, Any]], None, None]: bulk_chunk_documents = [] @@ -77,6 +78,11 @@ def _actions_generator( "_source": _selected_keys(document, keys_to_write), } ) + + if routing_keys: + _routing = "-".join([str(document[routing_key]) for routing_key in routing_keys]) + bulk_chunk_documents[-1]["_routing"] = _routing + if (i + 1) % bulk_size == 0: yield bulk_chunk_documents bulk_chunk_documents = [] @@ -496,6 +502,7 @@ def index_documents( doc_type: str | None = None, keys_to_write: list[str] | None = None, id_keys: list[str] | None = None, + routing_keys: list[str] | None = None, ignore_status: list[Any] | tuple[Any] | None = None, bulk_size: int = 1000, chunk_size: int | None = 500, @@ -540,6 +547,8 @@ def index_documents( id_keys list of keys that compound document unique id. If not provided will use `_id` key if exists, otherwise will generate unique identifier for each document. + routing_keys + list of keys that compound document routing key. Optional. ignore_status list of HTTP status codes that you want to ignore (not raising an exception) bulk_size @@ -599,7 +608,7 @@ def index_documents( _logger.debug("indexing %s documents into %s", total_documents, index) actions = _actions_generator( - documents, index, doc_type, keys_to_write=keys_to_write, id_keys=id_keys, bulk_size=bulk_size + documents, index, doc_type, keys_to_write=keys_to_write, id_keys=id_keys, routing_keys=routing_keys, bulk_size=bulk_size ) success = 0