Skip to content

Commit 84ece0b

Browse files
authored
add execute_concurrent_async and expose execute_concurrent_* in Session (#1229)
1 parent ef176a5 commit 84ece0b

File tree

3 files changed

+192
-66
lines changed

3 files changed

+192
-66
lines changed

cassandra/cluster.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2725,6 +2725,98 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
27252725
future.send_request()
27262726
return future
27272727

2728+
def execute_concurrent(self, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
2729+
"""
2730+
Executes a sequence of (statement, parameters) tuples concurrently. Each
2731+
``parameters`` item must be a sequence or :const:`None`.
2732+
2733+
The `concurrency` parameter controls how many statements will be executed
2734+
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
2735+
it is recommended that this be kept below 100 times the number of
2736+
core connections per host times the number of connected hosts (see
2737+
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
2738+
the event loop thread may attempt to block on new connection creation,
2739+
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
2740+
is 3 or higher, you can safely experiment with higher levels of concurrency.
2741+
2742+
If `raise_on_first_error` is left as :const:`True`, execution will stop
2743+
after the first failed statement and the corresponding exception will be
2744+
raised.
2745+
2746+
`results_generator` controls how the results are returned.
2747+
2748+
* If :const:`False`, the results are returned only after all requests have completed.
2749+
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
2750+
memory footprint when the results set will be large -- results are yielded
2751+
as they return instead of materializing the entire list at once. The trade for lower memory
2752+
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
2753+
on-the-fly).
2754+
2755+
`execution_profile` argument is the execution profile to use for this
2756+
request, it is passed directly to :meth:`Session.execute_async`.
2757+
2758+
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
2759+
in the same order that the statements were passed in. If ``success`` is :const:`False`,
2760+
there was an error executing the statement, and ``result_or_exc``
2761+
will be an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
2762+
will be the query result.
2763+
2764+
Example usage::
2765+
2766+
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
2767+
2768+
statements_and_params = []
2769+
for user_id in user_ids:
2770+
params = (user_id, )
2771+
statements_and_params.append((select_statement, params))
2772+
2773+
results = session.execute_concurrent(statements_and_params, raise_on_first_error=False)
2774+
2775+
for (success, result) in results:
2776+
if not success:
2777+
handle_error(result) # result will be an Exception
2778+
else:
2779+
process_user(result[0]) # result will be a list of rows
2780+
2781+
Note: in the case that `generators` are used, it is important to ensure the consumers do not
2782+
block or attempt further synchronous requests, because no further IO will be processed until
2783+
the consumer returns. This may also produce a deadlock in the IO event thread.
2784+
"""
2785+
from cassandra.concurrent import execute_concurrent
2786+
return execute_concurrent(self, statements_and_parameters, concurrency, raise_on_first_error, results_generator, execution_profile)
2787+
2788+
def execute_concurrent_with_args(self, statement, parameters, *args, **kwargs):
2789+
"""
2790+
Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
2791+
statement and a sequence of parameters. Each item in ``parameters``
2792+
should be a sequence or :const:`None`.
2793+
2794+
Example usage::
2795+
2796+
statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
2797+
parameters = [(x,) for x in range(1000)]
2798+
session.execute_concurrent_with_args(statement, parameters, concurrency=50)
2799+
"""
2800+
from cassandra.concurrent import execute_concurrent_with_args
2801+
return execute_concurrent_with_args(self, statement, parameters, *args, **kwargs)
2802+
2803+
def execute_concurrent_async(self, statements_and_parameters, concurrency=100, raise_on_first_error=False, execution_profile=EXEC_PROFILE_DEFAULT):
2804+
"""
2805+
Asynchronously executes a sequence of (statement, parameters) tuples concurrently.
2806+
2807+
Args:
2808+
session: Cassandra session object.
2809+
statement_and_parameters: Iterable of (prepared CQL statement, bind parameters) tuples.
2810+
concurrency (int, optional): Number of concurrent operations. Default is 100.
2811+
raise_on_first_error (bool, optional): If True, execution stops on the first error. Default is True.
2812+
execution_profile (ExecutionProfile, optional): Execution profile to use. Default is EXEC_PROFILE_DEFAULT.
2813+
2814+
Returns:
2815+
A `Future` object that will be completed when all operations are done.
2816+
"""
2817+
from cassandra.concurrent import execute_concurrent_async
2818+
return execute_concurrent_async(self, statements_and_parameters, concurrency, raise_on_first_error, execution_profile)
2819+
27282820
def execute_graph(self, query, parameters=None, trace=False, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT, execute_as=None):
27292821
"""
27302822
Executes a Gremlin query string or GraphStatement synchronously,

cassandra/concurrent.py

Lines changed: 47 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -13,77 +13,23 @@
1313
# limitations under the License.
1414

1515

16+
import logging
1617
from collections import namedtuple
18+
from concurrent.futures import Future
1719
from heapq import heappush, heappop
1820
from itertools import cycle
1921
from threading import Condition
20-
import sys
2122

2223
from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT
2324

24-
import logging
2525
log = logging.getLogger(__name__)
2626

2727

2828
ExecutionResult = namedtuple('ExecutionResult', ['success', 'result_or_exc'])
2929

3030
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
3131
"""
32-
Executes a sequence of (statement, parameters) tuples concurrently. Each
33-
``parameters`` item must be a sequence or :const:`None`.
34-
35-
The `concurrency` parameter controls how many statements will be executed
36-
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
37-
it is recommended that this be kept below 100 times the number of
38-
core connections per host times the number of connected hosts (see
39-
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
40-
the event loop thread may attempt to block on new connection creation,
41-
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
42-
is 3 or higher, you can safely experiment with higher levels of concurrency.
43-
44-
If `raise_on_first_error` is left as :const:`True`, execution will stop
45-
after the first failed statement and the corresponding exception will be
46-
raised.
47-
48-
`results_generator` controls how the results are returned.
49-
50-
* If :const:`False`, the results are returned only after all requests have completed.
51-
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
52-
memory footprint when the results set will be large -- results are yielded
53-
as they return instead of materializing the entire list at once. The trade for lower memory
54-
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
55-
on-the-fly).
56-
57-
`execution_profile` argument is the execution profile to use for this
58-
request, it is passed directly to :meth:`Session.execute_async`.
59-
60-
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
61-
in the same order that the statements were passed in. If ``success`` is :const:`False`,
62-
there was an error executing the statement, and ``result_or_exc`` will be
63-
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
64-
will be the query result.
65-
66-
Example usage::
67-
68-
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
69-
70-
statements_and_params = []
71-
for user_id in user_ids:
72-
params = (user_id, )
73-
statements_and_params.append((select_statement, params))
74-
75-
results = execute_concurrent(
76-
session, statements_and_params, raise_on_first_error=False)
77-
78-
for (success, result) in results:
79-
if not success:
80-
handle_error(result) # result will be an Exception
81-
else:
82-
process_user(result[0]) # result will be a list of rows
83-
84-
Note: in the case that `generators` are used, it is important to ensure the consumers do not
85-
block or attempt further synchronous requests, because no further IO will be processed until
86-
the consumer returns. This may also produce a deadlock in the IO event thread.
32+
See :meth:`.Session.execute_concurrent`.
8733
"""
8834
if concurrency <= 0:
8935
raise ValueError("concurrency must be greater than 0")
@@ -216,14 +162,50 @@ def _results(self):
216162

217163
def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs):
218164
"""
219-
Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
220-
statement and a sequence of parameters. Each item in ``parameters``
221-
should be a sequence or :const:`None`.
165+
See :meth:`.Session.execute_concurrent_with_args`.
166+
"""
167+
return execute_concurrent(session, zip(cycle((statement,)), parameters), *args, **kwargs)
222168

223-
Example usage::
224169

225-
statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
226-
parameters = [(x,) for x in range(1000)]
227-
execute_concurrent_with_args(session, statement, parameters, concurrency=50)
170+
class ConcurrentExecutorFutureResults(ConcurrentExecutorListResults):
171+
def __init__(self, session, statements_and_params, execution_profile, future):
172+
super().__init__(session, statements_and_params, execution_profile)
173+
self.future = future
174+
175+
def _put_result(self, result, idx, success):
176+
super()._put_result(result, idx, success)
177+
with self._condition:
178+
if self._current == self._exec_count:
179+
if self._exception and self._fail_fast:
180+
self.future.set_exception(self._exception)
181+
else:
182+
sorted_results = [r[1] for r in sorted(self._results_queue)]
183+
self.future.set_result(sorted_results)
184+
185+
186+
def execute_concurrent_async(
187+
session,
188+
statements_and_parameters,
189+
concurrency=100,
190+
raise_on_first_error=False,
191+
execution_profile=EXEC_PROFILE_DEFAULT
192+
):
228193
"""
229-
return execute_concurrent(session, zip(cycle((statement,)), parameters), *args, **kwargs)
194+
See :meth:`.Session.execute_concurrent_async`.
195+
"""
196+
# Create a Future object and initialize the custom ConcurrentExecutor with the Future
197+
future = Future()
198+
executor = ConcurrentExecutorFutureResults(
199+
session=session,
200+
statements_and_params=statements_and_parameters,
201+
execution_profile=execution_profile,
202+
future=future
203+
)
204+
205+
# Execute concurrently
206+
try:
207+
executor.execute(concurrency=concurrency, fail_fast=raise_on_first_error)
208+
except Exception as e:
209+
future.set_exception(e)
210+
211+
return future

tests/unit/test_concurrent.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import platform
2525

2626
from cassandra.cluster import Cluster, Session
27-
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
27+
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args, execute_concurrent_async
2828
from cassandra.pool import Host
2929
from cassandra.policies import SimpleConvictionPolicy
3030
from tests.unit.utils import mock_session_pools
@@ -239,6 +239,58 @@ def validate_result_ordering(self, results):
239239
self.assertLess(last_time_added, current_time_added)
240240
last_time_added = current_time_added
241241

242+
def insert_and_validate_list_async(self, reverse, slowdown):
243+
"""
244+
This utility method will execute submit various statements for execution using execute_concurrent_async,
245+
then invoke a separate thread to execute the callback associated with the futures registered
246+
for those statements. The parameters will toggle various timing, and ordering changes.
247+
Finally it will validate that the results were returned in the order they were submitted
248+
:param reverse: Execute the callbacks in the opposite order that they were submitted
249+
:param slowdown: Cause intermittent queries to perform slowly
250+
"""
251+
our_handler = MockResponseResponseFuture(reverse=reverse)
252+
mock_session = Mock()
253+
statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
254+
[(i, ) for i in range(100)])
255+
mock_session.execute_async.return_value = our_handler
256+
257+
t = TimedCallableInvoker(our_handler, slowdown=slowdown)
258+
t.start()
259+
try:
260+
future = execute_concurrent_async(mock_session, statements_and_params)
261+
results = future.result()
262+
self.validate_result_ordering(results)
263+
finally:
264+
t.stop()
265+
266+
def test_results_ordering_async_forward(self):
267+
"""
268+
This tests the ordering of our execute_concurrent_async function
269+
when queries complete in the order they were executed.
270+
"""
271+
self.insert_and_validate_list_async(False, False)
272+
273+
def test_results_ordering_async_reverse(self):
274+
"""
275+
This tests the ordering of our execute_concurrent_async function
276+
when queries complete in the reverse order they were executed.
277+
"""
278+
self.insert_and_validate_list_async(True, False)
279+
280+
def test_results_ordering_async_forward_slowdown(self):
281+
"""
282+
This tests the ordering of our execute_concurrent_async function
283+
when queries complete in the order they were executed, with slow queries mixed in.
284+
"""
285+
self.insert_and_validate_list_async(False, True)
286+
287+
def test_results_ordering_async_reverse_slowdown(self):
288+
"""
289+
This tests the ordering of our execute_concurrent_async function
290+
when queries complete in the reverse order they were executed, with slow queries mixed in.
291+
"""
292+
self.insert_and_validate_list_async(True, True)
293+
242294
@mock_session_pools
243295
def test_recursion_limited(self):
244296
"""

0 commit comments

Comments
 (0)