Skip to content

feat: Add support for routing_keys in index_documents #3148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion awswrangler/opensearch/_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def _actions_generator(
doc_type: str | None,
keys_to_write: list[str] | None,
id_keys: list[str] | None,
routing_keys: list[str] | None = None,
bulk_size: int = 10000,
) -> Generator[list[dict[str, Any]], None, None]:
bulk_chunk_documents = []
Expand All @@ -77,6 +78,11 @@ def _actions_generator(
"_source": _selected_keys(document, keys_to_write),
}
)

if routing_keys:
_routing = "-".join([str(document[routing_key]) for routing_key in routing_keys])
bulk_chunk_documents[-1]["_routing"] = _routing

if (i + 1) % bulk_size == 0:
yield bulk_chunk_documents
bulk_chunk_documents = []
Expand Down Expand Up @@ -496,6 +502,7 @@ def index_documents(
doc_type: str | None = None,
keys_to_write: list[str] | None = None,
id_keys: list[str] | None = None,
routing_keys: list[str] | None = None,
ignore_status: list[Any] | tuple[Any] | None = None,
bulk_size: int = 1000,
chunk_size: int | None = 500,
Expand Down Expand Up @@ -540,6 +547,8 @@ def index_documents(
id_keys
list of keys whose values together form the document's unique id. If not provided, the `_id` key is used if it exists;
otherwise a unique identifier is generated for each document.
routing_keys
list of keys whose values are converted to strings and joined with "-" to form the document's routing value. Optional.
ignore_status
list of HTTP status codes that you want to ignore (not raising an exception)
bulk_size
Expand Down Expand Up @@ -599,7 +608,7 @@ def index_documents(
_logger.debug("indexing %s documents into %s", total_documents, index)

actions = _actions_generator(
documents, index, doc_type, keys_to_write=keys_to_write, id_keys=id_keys, bulk_size=bulk_size
documents, index, doc_type, keys_to_write=keys_to_write, id_keys=id_keys, routing_keys=routing_keys, bulk_size=bulk_size
)

success = 0
Expand Down
Loading