Skip to content

Commit 4835041

Browse files
authored
bpo-29293: multiprocessing.Condition.notify() lacks parameter n (#2480)
* bpo-29293: multiprocessing.Condition.notify() lacks parameter `n` * Add NEWS blurb
1 parent d3ed287 commit 4835041

File tree

4 files changed

+71
-28
lines changed

4 files changed

+71
-28
lines changed

Lib/multiprocessing/managers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -999,8 +999,8 @@ class ConditionProxy(AcquirerProxy):
999999
_exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
10001000
def wait(self, timeout=None):
10011001
return self._callmethod('wait', (timeout,))
1002-
def notify(self):
1003-
return self._callmethod('notify')
1002+
def notify(self, n=1):
1003+
return self._callmethod('notify', (n,))
10041004
def notify_all(self):
10051005
return self._callmethod('notify_all')
10061006
def wait_for(self, predicate, timeout=None):

Lib/multiprocessing/synchronize.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -268,24 +268,7 @@ def wait(self, timeout=None):
268268
for i in range(count):
269269
self._lock.acquire()
270270

271-
def notify(self):
272-
assert self._lock._semlock._is_mine(), 'lock is not owned'
273-
assert not self._wait_semaphore.acquire(False)
274-
275-
# to take account of timeouts since last notify() we subtract
276-
# woken_count from sleeping_count and rezero woken_count
277-
while self._woken_count.acquire(False):
278-
res = self._sleeping_count.acquire(False)
279-
assert res
280-
281-
if self._sleeping_count.acquire(False): # try grabbing a sleeper
282-
self._wait_semaphore.release() # wake up one sleeper
283-
self._woken_count.acquire() # wait for the sleeper to wake
284-
285-
# rezero _wait_semaphore in case a timeout just happened
286-
self._wait_semaphore.acquire(False)
287-
288-
def notify_all(self):
271+
def notify(self, n=1):
289272
assert self._lock._semlock._is_mine(), 'lock is not owned'
290273
assert not self._wait_semaphore.acquire(False)
291274

@@ -296,7 +279,7 @@ def notify_all(self):
296279
assert res
297280

298281
sleepers = 0
299-
while self._sleeping_count.acquire(False):
282+
while sleepers < n and self._sleeping_count.acquire(False):
300283
self._wait_semaphore.release() # wake up one sleeper
301284
sleepers += 1
302285

@@ -308,6 +291,9 @@ def notify_all(self):
308291
while self._wait_semaphore.acquire(False):
309292
pass
310293

294+
def notify_all(self):
295+
self.notify(n=sys.maxsize)
296+
311297
def wait_for(self, predicate, timeout=None):
312298
result = predicate()
313299
if result:

Lib/test/_test_multiprocessing.py

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,17 @@ def f(cls, cond, sleeping, woken, timeout=None):
948948
woken.release()
949949
cond.release()
950950

951+
def assertReachesEventually(self, func, value):
952+
for i in range(10):
953+
try:
954+
if func() == value:
955+
break
956+
except NotImplementedError:
957+
break
958+
time.sleep(DELTA)
959+
time.sleep(DELTA)
960+
self.assertReturnsIfImplemented(value, func)
961+
951962
def check_invariant(self, cond):
952963
# this is only supposed to succeed when there are no sleepers
953964
if self.TYPE == 'processes':
@@ -1055,13 +1066,54 @@ def test_notify_all(self):
10551066
cond.release()
10561067

10571068
# check they have all woken
1058-
for i in range(10):
1059-
try:
1060-
if get_value(woken) == 6:
1061-
break
1062-
except NotImplementedError:
1063-
break
1064-
time.sleep(DELTA)
1069+
self.assertReachesEventually(lambda: get_value(woken), 6)
1070+
1071+
# check state is not mucked up
1072+
self.check_invariant(cond)
1073+
1074+
def test_notify_n(self):
1075+
cond = self.Condition()
1076+
sleeping = self.Semaphore(0)
1077+
woken = self.Semaphore(0)
1078+
1079+
# start some threads/processes
1080+
for i in range(3):
1081+
p = self.Process(target=self.f, args=(cond, sleeping, woken))
1082+
p.daemon = True
1083+
p.start()
1084+
1085+
t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1086+
t.daemon = True
1087+
t.start()
1088+
1089+
# wait for them to all sleep
1090+
for i in range(6):
1091+
sleeping.acquire()
1092+
1093+
# check no process/thread has woken up
1094+
time.sleep(DELTA)
1095+
self.assertReturnsIfImplemented(0, get_value, woken)
1096+
1097+
# wake some of them up
1098+
cond.acquire()
1099+
cond.notify(n=2)
1100+
cond.release()
1101+
1102+
# check 2 have woken
1103+
self.assertReachesEventually(lambda: get_value(woken), 2)
1104+
1105+
# wake the rest of them
1106+
cond.acquire()
1107+
cond.notify(n=4)
1108+
cond.release()
1109+
1110+
self.assertReachesEventually(lambda: get_value(woken), 6)
1111+
1112+
# doesn't do anything more
1113+
cond.acquire()
1114+
cond.notify(n=3)
1115+
cond.release()
1116+
10651117
self.assertReturnsIfImplemented(6, get_value, woken)
10661118

10671119
# check state is not mucked up
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Add missing parameter "n" on multiprocessing.Condition.notify().
2+
3+
The doc claims multiprocessing.Condition behaves like threading.Condition,
4+
but its notify() method lacked the optional "n" argument (to specify the
5+
number of sleepers to wake up) that threading.Condition.notify() accepts.

0 commit comments

Comments
 (0)