From 930228a617dfce5c2d8e66f859fbfb037d3f8032 Mon Sep 17 00:00:00 2001 From: Yevhenii Hyzyla Date: Sat, 11 Dec 2021 16:36:03 +0200 Subject: [PATCH] bpo-45997: Fix asyncio.Semaphore waiters order --- Lib/asyncio/locks.py | 11 ++++++++ Lib/test/test_asyncio/test_locks.py | 39 +++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py index 4fef64e3921e17..c15132e03fff4b 100644 --- a/Lib/asyncio/locks.py +++ b/Lib/asyncio/locks.py @@ -6,6 +6,7 @@ from . import exceptions from . import mixins +from . import tasks class _ContextManagerMixin: @@ -350,6 +351,7 @@ def __init__(self, value=1, *, loop=mixins._marker): raise ValueError("Semaphore initial value must be >= 0") self._value = value self._waiters = collections.deque() + self._skip_loop_iteration = False def __repr__(self): res = super().__repr__() @@ -365,6 +367,9 @@ def _wake_up_next(self): waiter.set_result(None) return + # Next acquire call should give a chance to other waiters acquire lock + self._skip_loop_iteration = True + def locked(self): """Returns True if semaphore can not be acquired immediately.""" return self._value == 0 @@ -378,6 +383,12 @@ async def acquire(self): called release() to make it larger than 0, and then return True. """ + + # Skip one loop iteration and give a change to next waiter to acquire + if self._skip_loop_iteration: + self._skip_loop_iteration = False + await tasks.sleep(0) + while self._value <= 0: fut = self._get_loop().create_future() self._waiters.append(fut) diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py index b2492c1acfecef..84d2a944a4a466 100644 --- a/Lib/test/test_asyncio/test_locks.py +++ b/Lib/test/test_asyncio/test_locks.py @@ -965,6 +965,45 @@ def test_release_no_waiters(self): sem.release() self.assertFalse(sem.locked()) + def test_acquire_fifo_order(self): + sem = asyncio.Semaphore(1) + result = [] + + async def c1(): + await sem.acquire() + result.append('c1_1') + sem.release() + + await sem.acquire() + result.append('c1_2') + sem.release() + + async def c2(): + await sem.acquire() + result.append('c2_1') + sem.release() + + await sem.acquire() + result.append('c2_2') + sem.release() + + async def c3(): + await sem.acquire() + result.append('c3_1') + sem.release() + + await sem.acquire() + result.append('c3_2') + sem.release() + + t1 = self.loop.create_task(c1()) + t2 = self.loop.create_task(c2()) + t3 = self.loop.create_task(c3()) + + race_tasks = [t1, t2, t3] + self.loop.run_until_complete(asyncio.gather(*race_tasks)) + self.assertEqual(['c1_1', 'c2_1', 'c3_1', 'c1_2', 'c2_2', 'c3_2'], result) + if __name__ == '__main__': unittest.main()