
[3.6] bpo-26762, bpo-31019: Backport multiprocessing fixes from master to 3.6 #2879


Merged
merged 5 commits into from Jul 26, 2017
10 changes: 8 additions & 2 deletions Lib/multiprocessing/process.py
@@ -132,10 +132,16 @@ def is_alive(self):
if self is _current_process:
return True
assert self._parent_pid == os.getpid(), 'can only test a child process'

if self._popen is None:
return False
self._popen.poll()
return self._popen.returncode is None

returncode = self._popen.poll()
if returncode is None:
return True
else:
_children.discard(self)
return False

@property
def name(self):
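For context, a minimal standalone sketch (not part of the patch) of the behaviour the `is_alive()` change introduces: once the parent observes that a child has exited, the child is also discarded from the parent's internal bookkeeping set, much as `join()` already does. The private name `multiprocessing.process._children` is the same one the patch touches; the script is purely illustrative.

```python
import multiprocessing
import multiprocessing.process
from multiprocessing.connection import wait


def worker():
    pass  # the child exits immediately


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    # Wait for the child to exit via its sentinel, without calling join(),
    # so that only is_alive() gets a chance to reap it.
    wait([p.sentinel], timeout=10)
    assert p.is_alive() is False
    # With the backported fix, polling is_alive() on a finished child also
    # removes it from the parent's internal children set.
    assert p not in multiprocessing.process._children
```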
125 changes: 103 additions & 22 deletions Lib/test/_test_multiprocessing.py
@@ -32,11 +32,12 @@
# without thread support.
import threading

import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.dummy
import multiprocessing.heap
import multiprocessing.managers
import multiprocessing.pool
import multiprocessing.queues

from multiprocessing import util

@@ -64,6 +65,13 @@
def latin(s):
return s.encode('latin')


def close_queue(queue):
if isinstance(queue, multiprocessing.queues.Queue):
queue.close()
queue.join_thread()


#
# Constants
#
@@ -275,6 +283,7 @@ def test_process(self):
self.assertEqual(p.exitcode, 0)
self.assertEqual(p.is_alive(), False)
self.assertNotIn(p, self.active_children())
close_queue(q)

@classmethod
def _test_terminate(cls):
@@ -414,6 +423,7 @@ def test_lose_target_ref(self):
p.join()
self.assertIs(wr(), None)
self.assertEqual(q.get(), 5)
close_queue(q)


#
@@ -600,6 +610,7 @@ def test_put(self):
self.assertEqual(queue_full(queue, MAXSIZE), False)

proc.join()
close_queue(queue)

@classmethod
def _test_get(cls, queue, child_can_start, parent_can_continue):
@@ -662,6 +673,7 @@ def test_get(self):
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

proc.join()
close_queue(queue)

@classmethod
def _test_fork(cls, queue):
@@ -697,6 +709,7 @@ def test_fork(self):
self.assertRaises(pyqueue.Empty, queue.get, False)

p.join()
close_queue(queue)

def test_qsize(self):
q = self.Queue()
@@ -712,6 +725,7 @@ def test_qsize(self):
self.assertEqual(q.qsize(), 1)
q.get()
self.assertEqual(q.qsize(), 0)
close_queue(q)

@classmethod
def _test_task_done(cls, q):
@@ -739,6 +753,7 @@ def test_task_done(self):

for p in workers:
p.join()
close_queue(queue)

def test_no_import_lock_contention(self):
with test.support.temp_cwd():
@@ -769,6 +784,7 @@ def test_timeout(self):
# Tolerate a delta of 30 ms because of the bad clock resolution on
# Windows (usually 15.6 ms)
self.assertGreaterEqual(delta, 0.170)
close_queue(q)

def test_queue_feeder_donot_stop_onexc(self):
# bpo-30414: verify feeder handles exceptions correctly
@@ -782,7 +798,9 @@ def __reduce__(self):
q = self.Queue()
q.put(NotSerializable())
q.put(True)
self.assertTrue(q.get(timeout=0.1))
# bpo-30595: use a timeout of 1 second for slow buildbots
self.assertTrue(q.get(timeout=1.0))
close_queue(q)

#
#
@@ -895,10 +913,12 @@ def test_notify(self):
p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
self.addCleanup(p.join)

p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
self.addCleanup(p.join)

# wait for both children to start sleeping
sleeping.acquire()
@@ -941,11 +961,13 @@ def test_notify_all(self):
args=(cond, sleeping, woken, TIMEOUT1))
p.daemon = True
p.start()
self.addCleanup(p.join)

t = threading.Thread(target=self.f,
args=(cond, sleeping, woken, TIMEOUT1))
t.daemon = True
t.start()
self.addCleanup(t.join)

# wait for them all to sleep
for i in range(6):
@@ -964,10 +986,12 @@ def test_notify_all(self):
p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True
p.start()
self.addCleanup(p.join)

t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
t.daemon = True
t.start()
self.addCleanup(t.join)

# wait for them to all sleep
for i in range(6):
@@ -1143,6 +1167,7 @@ def test_event(self):
p.daemon = True
p.start()
self.assertEqual(wait(), True)
p.join()

#
# Tests for Barrier - adapted from tests in test/lock_tests.py
@@ -1318,6 +1343,7 @@ def test_wait_return(self):
self.run_threads(self._test_wait_return_f, (self.barrier, queue))
results = [queue.get() for i in range(self.N)]
self.assertEqual(results.count(0), 1)
close_queue(queue)

@classmethod
def _test_action_f(cls, barrier, results):
@@ -1488,6 +1514,7 @@ def test_thousand(self):
p = self.Process(target=self._test_thousand_f,
args=(self.barrier, passes, child_conn, lock))
p.start()
self.addCleanup(p.join)

for i in range(passes):
for j in range(self.N):
@@ -2971,6 +2998,8 @@ def test_access(self):
w.close()
self.assertEqual(conn.recv(), 'foobar'*2)

p.join()

#
#
#
@@ -3296,16 +3325,16 @@ def test_level(self):

logger.setLevel(LEVEL1)
p = self.Process(target=self._test_level, args=(writer,))
p.daemon = True
p.start()
self.assertEqual(LEVEL1, reader.recv())
p.join()

logger.setLevel(logging.NOTSET)
root_logger.setLevel(LEVEL2)
p = self.Process(target=self._test_level, args=(writer,))
p.daemon = True
p.start()
self.assertEqual(LEVEL2, reader.recv())
p.join()

root_logger.setLevel(root_level)
logger.setLevel(level=LOG_LEVEL)
@@ -3459,7 +3488,7 @@ def _this_sub_process(q):
except pyqueue.Empty:
pass

def _test_process(q):
def _test_process():
queue = multiprocessing.Queue()
subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
subProc.daemon = True
@@ -3499,8 +3528,7 @@ def flush(self):
class TestStdinBadfiledescriptor(unittest.TestCase):

def test_queue_in_process(self):
queue = multiprocessing.Queue()
proc = multiprocessing.Process(target=_test_process, args=(queue,))
proc = multiprocessing.Process(target=_test_process)
proc.start()
proc.join()

@@ -4108,7 +4136,32 @@ def test_empty(self):
# Mixins
#

class ProcessesMixin(object):
class BaseMixin(object):
@classmethod
def setUpClass(cls):
cls.dangling = (multiprocessing.process._dangling.copy(),
threading._dangling.copy())

@classmethod
def tearDownClass(cls):
# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()

processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
if processes:
print('Warning -- Dangling processes: %s' % processes,
file=sys.stderr)
processes = None

threads = set(threading._dangling) - set(cls.dangling[1])
if threads:
print('Warning -- Dangling threads: %s' % threads,
file=sys.stderr)
threads = None


class ProcessesMixin(BaseMixin):
TYPE = 'processes'
Process = multiprocessing.Process
connection = multiprocessing.connection
@@ -4131,7 +4184,7 @@ class ProcessesMixin(object):
RawArray = staticmethod(multiprocessing.RawArray)


class ManagerMixin(object):
class ManagerMixin(BaseMixin):
TYPE = 'manager'
Process = multiprocessing.Process
Queue = property(operator.attrgetter('manager.Queue'))
@@ -4155,30 +4208,43 @@ def Pool(cls, *args, **kwds):

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.manager = multiprocessing.Manager()

@classmethod
def tearDownClass(cls):
# only the manager process should be returned by active_children()
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395)
start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1 and t < 5:
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
print("Warning -- multiprocessing.Manager still has %s active "
"children after %s seconds"
% (multiprocessing.active_children(), dt),
file=sys.stderr)
break

gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
# This is not really an error since some tests do not
# ensure that all processes which hold a reference to a
# managed object have been joined.
print('Shared objects which still exist at manager shutdown:')
print('Warning -- Shared objects which still exist at manager '
'shutdown:')
print(cls.manager._debug_info())
cls.manager.shutdown()
cls.manager.join()
cls.manager = None

super().tearDownClass()


class ThreadsMixin(object):
class ThreadsMixin(BaseMixin):
TYPE = 'threads'
Process = multiprocessing.dummy.Process
connection = multiprocessing.dummy.connection
@@ -4255,18 +4321,33 @@ def setUpModule():
multiprocessing.get_logger().setLevel(LOG_LEVEL)

def tearDownModule():
need_sleep = False

# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()

multiprocessing.set_start_method(old_start_method[0], force=True)
# pause a bit so we don't get warning about dangling threads/processes
time.sleep(0.5)
processes = set(multiprocessing.process._dangling) - set(dangling[0])
if processes:
need_sleep = True
print('Warning -- Dangling processes: %s' % processes,
file=sys.stderr)
processes = None

threads = set(threading._dangling) - set(dangling[1])
if threads:
need_sleep = True
print('Warning -- Dangling threads: %s' % threads,
file=sys.stderr)
threads = None

# Sleep 500 ms to give time to child processes to complete.
if need_sleep:
time.sleep(0.5)
multiprocessing.process._cleanup()
gc.collect()
tmp = set(multiprocessing.process._dangling) - set(dangling[0])
if tmp:
print('Dangling processes:', tmp, file=sys.stderr)
del tmp
tmp = set(threading._dangling) - set(dangling[1])
if tmp:
print('Dangling threads:', tmp, file=sys.stderr)
test.support.gc_collect()

remote_globs['setUpModule'] = setUpModule
remote_globs['tearDownModule'] = tearDownModule
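
For readers who want to reuse the dangling-object check outside the test suite, here is a rough standalone sketch of the pattern the new BaseMixin and tearDownModule follow: snapshot the private `_dangling` WeakSets, run some work, force a garbage collection (to break reference cycles such as the ones Pool creates, see bpo-26762), and report any process or thread objects created since the snapshot that are still alive. The helper names `snapshot_dangling` and `report_dangling` are invented for this example; the `_dangling` attributes are the same private ones the patch relies on.

```python
import gc
import sys
import threading
import multiprocessing
import multiprocessing.process


def worker():
    pass  # trivial child that exits immediately


def snapshot_dangling():
    # Copy the WeakSets that track every Process/Thread object created so far.
    return (multiprocessing.process._dangling.copy(),
            threading._dangling.copy())


def report_dangling(before):
    # Objects such as Pool keep themselves alive through reference cycles
    # (bpo-26762); collect first so only genuinely leaked objects remain.
    gc.collect()
    processes = set(multiprocessing.process._dangling) - set(before[0])
    if processes:
        print('Warning -- Dangling processes: %s' % processes, file=sys.stderr)
    threads = set(threading._dangling) - set(before[1])
    if threads:
        print('Warning -- Dangling threads: %s' % threads, file=sys.stderr)


if __name__ == '__main__':
    before = snapshot_dangling()
    p = multiprocessing.Process(target=worker)
    p.start()
    # p is still referenced and not yet collected, so it is reported here.
    report_dangling(before)
    p.join()
```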