Skip to content

Commit df3e53d

Browse files
authored
bpo-45835: Fix race condition in test_queue (#29601)
Some of the tests in test_queue had a race condition in which a non-sentinel value could be enqueued after the final sentinel value leading to not all the inputs being processed (and test failures). This changes feed() to enqueue a sentinel once the inputs are exhausted, which guarantees that the final queued object is a sentinel. This requires the number of feeder threads to match the number of consumer threads, but that's already the case in the relevant tests.
1 parent 25ecc04 commit df3e53d

File tree

2 files changed

+12
-11
lines changed

2 files changed

+12
-11
lines changed

Lib/test/test_queue.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -420,11 +420,12 @@ class BaseSimpleQueueTest:
420420
def setUp(self):
421421
self.q = self.type2test()
422422

423-
def feed(self, q, seq, rnd):
423+
def feed(self, q, seq, rnd, sentinel):
424424
while True:
425425
try:
426426
val = seq.pop()
427427
except IndexError:
428+
q.put(sentinel)
428429
return
429430
q.put(val)
430431
if rnd.random() > 0.5:
@@ -463,11 +464,10 @@ def consume_timeout(self, q, results, sentinel):
463464
return
464465
results.append(val)
465466

466-
def run_threads(self, n_feeders, n_consumers, q, inputs,
467-
feed_func, consume_func):
467+
def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
468468
results = []
469469
sentinel = None
470-
seq = inputs + [sentinel] * n_consumers
470+
seq = inputs.copy()
471471
seq.reverse()
472472
rnd = random.Random(42)
473473

@@ -481,11 +481,11 @@ def wrapper(*args, **kwargs):
481481
return wrapper
482482

483483
feeders = [threading.Thread(target=log_exceptions(feed_func),
484-
args=(q, seq, rnd))
485-
for i in range(n_feeders)]
484+
args=(q, seq, rnd, sentinel))
485+
for i in range(n_threads)]
486486
consumers = [threading.Thread(target=log_exceptions(consume_func),
487487
args=(q, results, sentinel))
488-
for i in range(n_consumers)]
488+
for i in range(n_threads)]
489489

490490
with threading_helper.start_threads(feeders + consumers):
491491
pass
@@ -543,7 +543,7 @@ def test_order(self):
543543
# Test a pair of concurrent put() and get()
544544
q = self.q
545545
inputs = list(range(100))
546-
results = self.run_threads(1, 1, q, inputs, self.feed, self.consume)
546+
results = self.run_threads(1, q, inputs, self.feed, self.consume)
547547

548548
# One producer, one consumer => results appended in well-defined order
549549
self.assertEqual(results, inputs)
@@ -553,7 +553,7 @@ def test_many_threads(self):
553553
N = 50
554554
q = self.q
555555
inputs = list(range(10000))
556-
results = self.run_threads(N, N, q, inputs, self.feed, self.consume)
556+
results = self.run_threads(N, q, inputs, self.feed, self.consume)
557557

558558
# Multiple consumers without synchronization append the
559559
# results in random order
@@ -564,7 +564,7 @@ def test_many_threads_nonblock(self):
564564
N = 50
565565
q = self.q
566566
inputs = list(range(10000))
567-
results = self.run_threads(N, N, q, inputs,
567+
results = self.run_threads(N, q, inputs,
568568
self.feed, self.consume_nonblock)
569569

570570
self.assertEqual(sorted(results), inputs)
@@ -574,7 +574,7 @@ def test_many_threads_timeout(self):
574574
N = 50
575575
q = self.q
576576
inputs = list(range(1000))
577-
results = self.run_threads(N, N, q, inputs,
577+
results = self.run_threads(N, q, inputs,
578578
self.feed, self.consume_timeout)
579579

580580
self.assertEqual(sorted(results), inputs)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix race condition in test_queue tests with multiple "feeder" threads.

0 commit comments

Comments
 (0)