Skip to content

Commit 9450c75

Browse files
bpo-45835: Fix race condition in test_queue (GH-29601)
Some of the tests in test_queue had a race condition in which a non-sentinel value could be enqueued after the final sentinel value leading to not all the inputs being processed (and test failures). This changes feed() to enqueue a sentinel once the inputs are exhausted, which guarantees that the final queued object is a sentinel. This requires the number of feeder threads to match the number of consumer threads, but that's already the case in the relevant tests. (cherry picked from commit df3e53d) Co-authored-by: Sam Gross <[email protected]>
1 parent 71d842b commit 9450c75

File tree

2 files changed

+12
-11
lines changed

2 files changed

+12
-11
lines changed

Lib/test/test_queue.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -418,11 +418,12 @@ class BaseSimpleQueueTest:
418418
def setUp(self):
419419
self.q = self.type2test()
420420

421-
def feed(self, q, seq, rnd):
421+
def feed(self, q, seq, rnd, sentinel):
422422
while True:
423423
try:
424424
val = seq.pop()
425425
except IndexError:
426+
q.put(sentinel)
426427
return
427428
q.put(val)
428429
if rnd.random() > 0.5:
@@ -461,11 +462,10 @@ def consume_timeout(self, q, results, sentinel):
461462
return
462463
results.append(val)
463464

464-
def run_threads(self, n_feeders, n_consumers, q, inputs,
465-
feed_func, consume_func):
465+
def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
466466
results = []
467467
sentinel = None
468-
seq = inputs + [sentinel] * n_consumers
468+
seq = inputs.copy()
469469
seq.reverse()
470470
rnd = random.Random(42)
471471

@@ -479,11 +479,11 @@ def wrapper(*args, **kwargs):
479479
return wrapper
480480

481481
feeders = [threading.Thread(target=log_exceptions(feed_func),
482-
args=(q, seq, rnd))
483-
for i in range(n_feeders)]
482+
args=(q, seq, rnd, sentinel))
483+
for i in range(n_threads)]
484484
consumers = [threading.Thread(target=log_exceptions(consume_func),
485485
args=(q, results, sentinel))
486-
for i in range(n_consumers)]
486+
for i in range(n_threads)]
487487

488488
with support.start_threads(feeders + consumers):
489489
pass
@@ -541,7 +541,7 @@ def test_order(self):
541541
# Test a pair of concurrent put() and get()
542542
q = self.q
543543
inputs = list(range(100))
544-
results = self.run_threads(1, 1, q, inputs, self.feed, self.consume)
544+
results = self.run_threads(1, q, inputs, self.feed, self.consume)
545545

546546
# One producer, one consumer => results appended in well-defined order
547547
self.assertEqual(results, inputs)
@@ -551,7 +551,7 @@ def test_many_threads(self):
551551
N = 50
552552
q = self.q
553553
inputs = list(range(10000))
554-
results = self.run_threads(N, N, q, inputs, self.feed, self.consume)
554+
results = self.run_threads(N, q, inputs, self.feed, self.consume)
555555

556556
# Multiple consumers without synchronization append the
557557
# results in random order
@@ -562,7 +562,7 @@ def test_many_threads_nonblock(self):
562562
N = 50
563563
q = self.q
564564
inputs = list(range(10000))
565-
results = self.run_threads(N, N, q, inputs,
565+
results = self.run_threads(N, q, inputs,
566566
self.feed, self.consume_nonblock)
567567

568568
self.assertEqual(sorted(results), inputs)
@@ -572,7 +572,7 @@ def test_many_threads_timeout(self):
572572
N = 50
573573
q = self.q
574574
inputs = list(range(1000))
575-
results = self.run_threads(N, N, q, inputs,
575+
results = self.run_threads(N, q, inputs,
576576
self.feed, self.consume_timeout)
577577

578578
self.assertEqual(sorted(results), inputs)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix race condition in test_queue tests with multiple "feeder" threads.

0 commit comments

Comments
 (0)