
Commit d0adfb2

[3.6] bpo-26762, bpo-31019: Backport multiprocessing fixes from master to 3.6 (#2879)
* bpo-26762: Avoid daemon process in _test_multiprocessing (#2842)

  test_level() of _test_multiprocessing._TestLogging now uses regular
  processes rather than daemon processes to prevent zombie processes
  (to not "leak" processes).
  (cherry picked from commit 0663495)

* test_multiprocessing: Fix dangling process/thread (#2850)

  bpo-26762: Fix more dangling processes and threads in
  test_multiprocessing:
  * Queue: call close() followed by join_thread()
  * Process: call join() or self.addCleanup(p.join)
  (cherry picked from commit d7e64d9)

* test_multiprocessing detects dangling per test case (#2841)

  bpo-26762: test_multiprocessing now detects dangling processes and
  threads per test case class:
  * setUpClass()/tearDownClass() of mixin classes now check whether
    multiprocessing.process._dangling or threading._dangling was modified,
    to detect "dangling" processes and threads.
  * ManagerMixin.tearDownClass() now also emits a warning if it still has
    more than one active child process after 5 seconds.
  * tearDownModule() now checks for dangling processes and threads before
    sleeping 500 ms, and it now only sleeps if there is at least one
    dangling process or thread.
  (cherry picked from commit ffb4940)

* bpo-26762: test_multiprocessing close more queues (#2855)

  * Close queues explicitly to make sure that we don't leave dangling
    threads.
  * test_queue_in_process(): remove unused queue.
  * test_access() also joins the process to fix a random warning.
  (cherry picked from commit b4c5296)

* bpo-31019: Fix multiprocessing.Process.is_alive() (#2875)

  multiprocessing.Process.is_alive() now removes the process from the
  _children set if the process has completed. The change prevents leaking
  "dangling" processes.
  (cherry picked from commit 2db6482)
1 parent efe9fcb commit d0adfb2

2 files changed, +111 -24 lines

Lib/multiprocessing/process.py

Lines changed: 8 additions & 2 deletions
@@ -132,10 +132,16 @@ def is_alive(self):
         if self is _current_process:
             return True
         assert self._parent_pid == os.getpid(), 'can only test a child process'
+
         if self._popen is None:
             return False
-        self._popen.poll()
-        return self._popen.returncode is None
+
+        returncode = self._popen.poll()
+        if returncode is None:
+            return True
+        else:
+            _children.discard(self)
+            return False
 
     @property
     def name(self):
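Annotation (not part of the commit): a minimal standalone sketch of the behavior this hunk changes. It peeks at the private multiprocessing.process._children set, an implementation detail, purely to show that a finished child is now dropped from the bookkeeping as soon as is_alive() observes its exit; the worker function and the 0.5 s wait are made up for illustration.

import time
import multiprocessing
import multiprocessing.process


def worker():
    pass  # exits immediately


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    time.sleep(0.5)   # crude wait so the child has certainly exited
    print(p.is_alive())                             # False: the child completed
    # With the fix, is_alive() also calls _children.discard(self), so the
    # finished process no longer lingers in the private bookkeeping set.
    print(p in multiprocessing.process._children)   # expected: False
    p.join()          # joining explicitly is still good practice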

Lib/test/_test_multiprocessing.py

Lines changed: 103 additions & 22 deletions
@@ -32,11 +32,12 @@
 # without thread support.
 import threading
 
-import multiprocessing.dummy
 import multiprocessing.connection
-import multiprocessing.managers
+import multiprocessing.dummy
 import multiprocessing.heap
+import multiprocessing.managers
 import multiprocessing.pool
+import multiprocessing.queues
 
 from multiprocessing import util
 
@@ -64,6 +65,13 @@
 def latin(s):
     return s.encode('latin')
 
+
+def close_queue(queue):
+    if isinstance(queue, multiprocessing.queues.Queue):
+        queue.close()
+        queue.join_thread()
+
+
 #
 # Constants
 #
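Annotation (not part of the commit): a rough standalone sketch, with made-up names, of the pattern the close_queue() helper encapsulates. After the last put(), close() lets the queue's feeder thread flush and exit, and join_thread() waits for it, so no background thread outlives the test.

import multiprocessing


def child(q):
    assert q.get() == 'ping'


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    q.put('ping')     # the first put() starts the queue's feeder thread
    p.join()
    q.close()         # no more puts; lets the feeder flush and exit
    q.join_thread()   # block until the feeder thread has terminated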
@@ -275,6 +283,7 @@ def test_process(self):
         self.assertEqual(p.exitcode, 0)
         self.assertEqual(p.is_alive(), False)
         self.assertNotIn(p, self.active_children())
+        close_queue(q)
 
     @classmethod
     def _test_terminate(cls):
@@ -414,6 +423,7 @@ def test_lose_target_ref(self):
         p.join()
         self.assertIs(wr(), None)
         self.assertEqual(q.get(), 5)
+        close_queue(q)
 
 
 #
@@ -600,6 +610,7 @@ def test_put(self):
         self.assertEqual(queue_full(queue, MAXSIZE), False)
 
         proc.join()
+        close_queue(queue)
 
     @classmethod
     def _test_get(cls, queue, child_can_start, parent_can_continue):
@@ -662,6 +673,7 @@ def test_get(self):
         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
 
         proc.join()
+        close_queue(queue)
 
     @classmethod
     def _test_fork(cls, queue):
@@ -697,6 +709,7 @@ def test_fork(self):
         self.assertRaises(pyqueue.Empty, queue.get, False)
 
         p.join()
+        close_queue(queue)
 
     def test_qsize(self):
         q = self.Queue()
@@ -712,6 +725,7 @@ def test_qsize(self):
         self.assertEqual(q.qsize(), 1)
         q.get()
         self.assertEqual(q.qsize(), 0)
+        close_queue(q)
 
     @classmethod
     def _test_task_done(cls, q):
@@ -739,6 +753,7 @@ def test_task_done(self):
 
         for p in workers:
             p.join()
+        close_queue(queue)
 
     def test_no_import_lock_contention(self):
         with test.support.temp_cwd():
@@ -769,6 +784,7 @@ def test_timeout(self):
         # Tolerate a delta of 30 ms because of the bad clock resolution on
         # Windows (usually 15.6 ms)
         self.assertGreaterEqual(delta, 0.170)
+        close_queue(q)
 
     def test_queue_feeder_donot_stop_onexc(self):
         # bpo-30414: verify feeder handles exceptions correctly
@@ -782,7 +798,9 @@ def __reduce__(self):
         q = self.Queue()
         q.put(NotSerializable())
         q.put(True)
-        self.assertTrue(q.get(timeout=0.1))
+        # bpo-30595: use a timeout of 1 second for slow buildbots
+        self.assertTrue(q.get(timeout=1.0))
+        close_queue(q)
 
 #
 #
@@ -895,10 +913,12 @@ def test_notify(self):
         p = self.Process(target=self.f, args=(cond, sleeping, woken))
         p.daemon = True
         p.start()
+        self.addCleanup(p.join)
 
         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
         p.daemon = True
         p.start()
+        self.addCleanup(p.join)
 
         # wait for both children to start sleeping
         sleeping.acquire()
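Annotation (not part of the commit): the addCleanup(p.join) calls added here register the join right after start(), so the child is reaped even if a later assertion fails instead of being reported as dangling. A minimal self-contained sketch of that pattern, with a made-up test name and target:

import multiprocessing
import unittest


def noop():
    pass


class CleanupExample(unittest.TestCase):
    def test_child_is_always_joined(self):
        p = multiprocessing.Process(target=noop)
        p.start()
        # Registered immediately after start(): runs whether the test
        # passes or fails, so the child never lingers unjoined.
        self.addCleanup(p.join)
        self.assertTrue(True)


if __name__ == '__main__':
    unittest.main()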
@@ -941,11 +961,13 @@ def test_notify_all(self):
                              args=(cond, sleeping, woken, TIMEOUT1))
             p.daemon = True
             p.start()
+            self.addCleanup(p.join)
 
             t = threading.Thread(target=self.f,
                                  args=(cond, sleeping, woken, TIMEOUT1))
             t.daemon = True
             t.start()
+            self.addCleanup(t.join)
 
         # wait for them all to sleep
         for i in range(6):
@@ -964,10 +986,12 @@ def test_notify_all(self):
             p = self.Process(target=self.f, args=(cond, sleeping, woken))
             p.daemon = True
             p.start()
+            self.addCleanup(p.join)
 
             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
             t.daemon = True
             t.start()
+            self.addCleanup(t.join)
 
         # wait for them to all sleep
         for i in range(6):
@@ -1143,6 +1167,7 @@ def test_event(self):
         p.daemon = True
         p.start()
         self.assertEqual(wait(), True)
+        p.join()
 
 #
 # Tests for Barrier - adapted from tests in test/lock_tests.py
@@ -1318,6 +1343,7 @@ def test_wait_return(self):
         self.run_threads(self._test_wait_return_f, (self.barrier, queue))
         results = [queue.get() for i in range(self.N)]
         self.assertEqual(results.count(0), 1)
+        close_queue(queue)
 
     @classmethod
     def _test_action_f(cls, barrier, results):
@@ -1488,6 +1514,7 @@ def test_thousand(self):
             p = self.Process(target=self._test_thousand_f,
                              args=(self.barrier, passes, child_conn, lock))
             p.start()
+            self.addCleanup(p.join)
 
         for i in range(passes):
             for j in range(self.N):
@@ -2971,6 +2998,8 @@ def test_access(self):
         w.close()
         self.assertEqual(conn.recv(), 'foobar'*2)
 
+        p.join()
+
 #
 #
 #
@@ -3296,16 +3325,16 @@ def test_level(self):
 
         logger.setLevel(LEVEL1)
         p = self.Process(target=self._test_level, args=(writer,))
-        p.daemon = True
         p.start()
         self.assertEqual(LEVEL1, reader.recv())
+        p.join()
 
         logger.setLevel(logging.NOTSET)
         root_logger.setLevel(LEVEL2)
         p = self.Process(target=self._test_level, args=(writer,))
-        p.daemon = True
         p.start()
         self.assertEqual(LEVEL2, reader.recv())
+        p.join()
 
         root_logger.setLevel(root_level)
         logger.setLevel(level=LOG_LEVEL)
@@ -3459,7 +3488,7 @@ def _this_sub_process(q):
     except pyqueue.Empty:
         pass
 
-def _test_process(q):
+def _test_process():
     queue = multiprocessing.Queue()
     subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
     subProc.daemon = True
@@ -3499,8 +3528,7 @@ def flush(self):
 class TestStdinBadfiledescriptor(unittest.TestCase):
 
     def test_queue_in_process(self):
-        queue = multiprocessing.Queue()
-        proc = multiprocessing.Process(target=_test_process, args=(queue,))
+        proc = multiprocessing.Process(target=_test_process)
         proc.start()
         proc.join()
 
@@ -4108,7 +4136,32 @@ def test_empty(self):
 # Mixins
 #
 
-class ProcessesMixin(object):
+class BaseMixin(object):
+    @classmethod
+    def setUpClass(cls):
+        cls.dangling = (multiprocessing.process._dangling.copy(),
+                        threading._dangling.copy())
+
+    @classmethod
+    def tearDownClass(cls):
+        # bpo-26762: Some multiprocessing objects like Pool create reference
+        # cycles. Trigger a garbage collection to break these cycles.
+        test.support.gc_collect()
+
+        processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
+        if processes:
+            print('Warning -- Dangling processes: %s' % processes,
+                  file=sys.stderr)
+        processes = None
+
+        threads = set(threading._dangling) - set(cls.dangling[1])
+        if threads:
+            print('Warning -- Dangling threads: %s' % threads,
+                  file=sys.stderr)
+        threads = None
+
+
+class ProcessesMixin(BaseMixin):
     TYPE = 'processes'
     Process = multiprocessing.Process
     connection = multiprocessing.connection
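Annotation (not part of the commit): a tiny standalone illustration of the snapshot-and-compare detection BaseMixin performs. threading._dangling is a private WeakSet that CPython's threading module keeps for exactly this kind of test bookkeeping; the sleep duration and thread are arbitrary.

import threading
import time

before = set(threading._dangling)      # snapshot, as in setUpClass()
t = threading.Thread(target=time.sleep, args=(1,))
t.daemon = True
t.start()                              # deliberately never joined
leaked = set(threading._dangling) - before
if leaked:
    print('Warning -- Dangling threads: %s' % leaked)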
@@ -4131,7 +4184,7 @@ class ProcessesMixin(object):
     RawArray = staticmethod(multiprocessing.RawArray)
 
 
-class ManagerMixin(object):
+class ManagerMixin(BaseMixin):
     TYPE = 'manager'
     Process = multiprocessing.Process
     Queue = property(operator.attrgetter('manager.Queue'))
@@ -4155,30 +4208,43 @@ def Pool(cls, *args, **kwds):
 
     @classmethod
     def setUpClass(cls):
+        super().setUpClass()
         cls.manager = multiprocessing.Manager()
 
     @classmethod
     def tearDownClass(cls):
         # only the manager process should be returned by active_children()
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395)
+        start_time = time.monotonic()
         t = 0.01
-        while len(multiprocessing.active_children()) > 1 and t < 5:
+        while len(multiprocessing.active_children()) > 1:
             time.sleep(t)
             t *= 2
+            dt = time.monotonic() - start_time
+            if dt >= 5.0:
+                print("Warning -- multiprocessing.Manager still has %s active "
+                      "children after %s seconds"
+                      % (multiprocessing.active_children(), dt),
+                      file=sys.stderr)
+                break
+
         gc.collect() # do garbage collection
         if cls.manager._number_of_objects() != 0:
             # This is not really an error since some tests do not
             # ensure that all processes which hold a reference to a
             # managed object have been joined.
-            print('Shared objects which still exist at manager shutdown:')
+            print('Warning -- Shared objects which still exist at manager '
+                  'shutdown:')
             print(cls.manager._debug_info())
         cls.manager.shutdown()
         cls.manager.join()
         cls.manager = None
 
+        super().tearDownClass()
+
 
-class ThreadsMixin(object):
+class ThreadsMixin(BaseMixin):
     TYPE = 'threads'
     Process = multiprocessing.dummy.Process
     connection = multiprocessing.dummy.connection
@@ -4255,18 +4321,33 @@ def setUpModule():
         multiprocessing.get_logger().setLevel(LOG_LEVEL)
 
     def tearDownModule():
+        need_sleep = False
+
+        # bpo-26762: Some multiprocessing objects like Pool create reference
+        # cycles. Trigger a garbage collection to break these cycles.
+        test.support.gc_collect()
+
         multiprocessing.set_start_method(old_start_method[0], force=True)
         # pause a bit so we don't get warning about dangling threads/processes
-        time.sleep(0.5)
+        processes = set(multiprocessing.process._dangling) - set(dangling[0])
+        if processes:
+            need_sleep = True
+            print('Warning -- Dangling processes: %s' % processes,
+                  file=sys.stderr)
+        processes = None
+
+        threads = set(threading._dangling) - set(dangling[1])
+        if threads:
+            need_sleep = True
+            print('Warning -- Dangling threads: %s' % threads,
+                  file=sys.stderr)
+        threads = None
+
+        # Sleep 500 ms to give time to child processes to complete.
+        if need_sleep:
+            time.sleep(0.5)
         multiprocessing.process._cleanup()
-        gc.collect()
-        tmp = set(multiprocessing.process._dangling) - set(dangling[0])
-        if tmp:
-            print('Dangling processes:', tmp, file=sys.stderr)
-        del tmp
-        tmp = set(threading._dangling) - set(dangling[1])
-        if tmp:
-            print('Dangling threads:', tmp, file=sys.stderr)
+        test.support.gc_collect()
 
     remote_globs['setUpModule'] = setUpModule
     remote_globs['tearDownModule'] = tearDownModule
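Annotation (not part of the commit): a hedged footnote on the gc_collect() calls added above. multiprocessing.Pool participates in reference cycles, so an explicit collection helps tear down its helper threads and workers promptly; this sketch uses plain gc.collect() rather than the test.support wrapper, and the pool size and inputs are arbitrary.

import gc
import multiprocessing


if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    print(pool.map(abs, [-1, -2, -3]))   # [1, 2, 3]
    pool.close()    # preferred explicit shutdown
    pool.join()
    del pool
    gc.collect()    # break leftover reference cycles promptly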
