@@ -61,6 +61,7 @@ def _actions_generator(
61
61
doc_type : str | None ,
62
62
keys_to_write : list [str ] | None ,
63
63
id_keys : list [str ] | None ,
64
+ routing_keys : list [str ] | None = None ,
64
65
bulk_size : int = 10000 ,
65
66
) -> Generator [list [dict [str , Any ]], None , None ]:
66
67
bulk_chunk_documents = []
@@ -77,6 +78,11 @@ def _actions_generator(
77
78
"_source" : _selected_keys (document , keys_to_write ),
78
79
}
79
80
)
81
+
82
+ if routing_keys :
83
+ _routing = "-" .join ([str (document [routing_key ]) for routing_key in routing_keys ])
84
+ bulk_chunk_documents [- 1 ]["_routing" ] = _routing
85
+
80
86
if (i + 1 ) % bulk_size == 0 :
81
87
yield bulk_chunk_documents
82
88
bulk_chunk_documents = []
@@ -496,6 +502,7 @@ def index_documents(
496
502
doc_type : str | None = None ,
497
503
keys_to_write : list [str ] | None = None ,
498
504
id_keys : list [str ] | None = None ,
505
+ routing_keys : list [str ] | None = None ,
499
506
ignore_status : list [Any ] | tuple [Any ] | None = None ,
500
507
bulk_size : int = 1000 ,
501
508
chunk_size : int | None = 500 ,
@@ -540,6 +547,8 @@ def index_documents(
540
547
id_keys
541
548
list of keys that compound document unique id. If not provided will use `_id` key if exists,
542
549
otherwise will generate unique identifier for each document.
550
+ routing_keys
551
+ list of keys that compound document routing key. Optional.
543
552
ignore_status
544
553
list of HTTP status codes that you want to ignore (not raising an exception)
545
554
bulk_size
@@ -599,7 +608,7 @@ def index_documents(
599
608
_logger .debug ("indexing %s documents into %s" , total_documents , index )
600
609
601
610
actions = _actions_generator (
602
- documents , index , doc_type , keys_to_write = keys_to_write , id_keys = id_keys , bulk_size = bulk_size
611
+ documents , index , doc_type , keys_to_write = keys_to_write , id_keys = id_keys , routing_keys = routing_keys , bulk_size = bulk_size
603
612
)
604
613
605
614
success = 0
0 commit comments