Skip to content

bpo-29293: multiprocessing.Condition.notify() lacks parameter n #2480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,8 +999,8 @@ class ConditionProxy(AcquirerProxy):
_exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
def notify(self):
return self._callmethod('notify')
def notify(self, n=1):
return self._callmethod('notify', (n,))
def notify_all(self):
return self._callmethod('notify_all')
def wait_for(self, predicate, timeout=None):
Expand Down
24 changes: 5 additions & 19 deletions Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,24 +268,7 @@ def wait(self, timeout=None):
for i in range(count):
self._lock.acquire()

def notify(self):
assert self._lock._semlock._is_mine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(False)

# to take account of timeouts since last notify() we subtract
# woken_count from sleeping_count and rezero woken_count
while self._woken_count.acquire(False):
res = self._sleeping_count.acquire(False)
assert res

if self._sleeping_count.acquire(False): # try grabbing a sleeper
self._wait_semaphore.release() # wake up one sleeper
self._woken_count.acquire() # wait for the sleeper to wake

# rezero _wait_semaphore in case a timeout just happened
self._wait_semaphore.acquire(False)

def notify_all(self):
def notify(self, n=1):
assert self._lock._semlock._is_mine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(False)

Expand All @@ -296,7 +279,7 @@ def notify_all(self):
assert res

sleepers = 0
while self._sleeping_count.acquire(False):
while sleepers < n and self._sleeping_count.acquire(False):
self._wait_semaphore.release() # wake up one sleeper
sleepers += 1

Expand All @@ -308,6 +291,9 @@ def notify_all(self):
while self._wait_semaphore.acquire(False):
pass

def notify_all(self):
self.notify(n=sys.maxsize)

def wait_for(self, predicate, timeout=None):
result = predicate()
if result:
Expand Down
66 changes: 59 additions & 7 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,17 @@ def f(cls, cond, sleeping, woken, timeout=None):
woken.release()
cond.release()

def assertReachesEventually(self, func, value):
for i in range(10):
try:
if func() == value:
break
except NotImplementedError:
break
time.sleep(DELTA)
time.sleep(DELTA)
self.assertReturnsIfImplemented(value, func)

def check_invariant(self, cond):
# this is only supposed to succeed when there are no sleepers
if self.TYPE == 'processes':
Expand Down Expand Up @@ -1055,13 +1066,54 @@ def test_notify_all(self):
cond.release()

# check they have all woken
for i in range(10):
try:
if get_value(woken) == 6:
break
except NotImplementedError:
break
time.sleep(DELTA)
self.assertReachesEventually(lambda: get_value(woken), 6)

# check state is not mucked up
self.check_invariant(cond)

def test_notify_n(self):
cond = self.Condition()
sleeping = self.Semaphore(0)
woken = self.Semaphore(0)

# start some threads/processes
for i in range(3):
p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()

t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
t.daemon = True
t.start()

# wait for them to all sleep
for i in range(6):
sleeping.acquire()

# check no process/thread has woken up
time.sleep(DELTA)
self.assertReturnsIfImplemented(0, get_value, woken)

# wake some of them up
cond.acquire()
cond.notify(n=2)
cond.release()

# check 2 have woken
self.assertReachesEventually(lambda: get_value(woken), 2)

# wake the rest of them
cond.acquire()
cond.notify(n=4)
cond.release()

self.assertReachesEventually(lambda: get_value(woken), 6)

# doesn't do anything more
cond.acquire()
cond.notify(n=3)
cond.release()

self.assertReturnsIfImplemented(6, get_value, woken)

# check state is not mucked up
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add missing parameter "n" on multiprocessing.Condition.notify().

The doc claims multiprocessing.Condition behaves like threading.Condition,
but its notify() method lacked the optional "n" argument (to specify the
number of sleepers to wake up) that threading.Condition.notify() accepts.