Skip to content

bpo-26762: test_multiprocessing detects dangling per test case class #2841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 24, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 67 additions & 14 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4303,7 +4303,32 @@ def test_empty(self):
# Mixins
#

class ProcessesMixin(object):
class BaseMixin(object):
@classmethod
def setUpClass(cls):
cls.dangling = (multiprocessing.process._dangling.copy(),
threading._dangling.copy())

@classmethod
def tearDownClass(cls):
# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()

processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
if processes:
print('Warning -- Dangling processes: %s' % processes,
file=sys.stderr)
processes = None

threads = set(threading._dangling) - set(cls.dangling[1])
if threads:
print('Warning -- Dangling threads: %s' % threads,
file=sys.stderr)
threads = None


class ProcessesMixin(BaseMixin):
TYPE = 'processes'
Process = multiprocessing.Process
connection = multiprocessing.connection
Expand All @@ -4326,7 +4351,7 @@ class ProcessesMixin(object):
RawArray = staticmethod(multiprocessing.RawArray)


class ManagerMixin(object):
class ManagerMixin(BaseMixin):
TYPE = 'manager'
Process = multiprocessing.Process
Queue = property(operator.attrgetter('manager.Queue'))
Expand All @@ -4350,30 +4375,43 @@ def Pool(cls, *args, **kwds):

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.manager = multiprocessing.Manager()

@classmethod
def tearDownClass(cls):
# only the manager process should be returned by active_children()
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395)
start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1 and t < 5:
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
print("Warning -- multiprocessing.Manager still has %s active "
"children after %s seconds"
% (multiprocessing.active_children(), dt),
file=sys.stderr)
break

gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
# This is not really an error since some tests do not
# ensure that all processes which hold a reference to a
# managed object have been joined.
print('Shared objects which still exist at manager shutdown:')
print('Warning -- Shared objects which still exist at manager '
'shutdown:')
print(cls.manager._debug_info())
cls.manager.shutdown()
cls.manager.join()
cls.manager = None

super().tearDownClass()


class ThreadsMixin(object):
class ThreadsMixin(BaseMixin):
TYPE = 'threads'
Process = multiprocessing.dummy.Process
connection = multiprocessing.dummy.connection
Expand Down Expand Up @@ -4450,18 +4488,33 @@ def setUpModule():
multiprocessing.get_logger().setLevel(LOG_LEVEL)

def tearDownModule():
need_sleep = False

# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()

multiprocessing.set_start_method(old_start_method[0], force=True)
# pause a bit so we don't get warning about dangling threads/processes
time.sleep(0.5)
processes = set(multiprocessing.process._dangling) - set(dangling[0])
if processes:
need_sleep = True
print('Warning -- Dangling processes: %s' % processes,
file=sys.stderr)
processes = None

threads = set(threading._dangling) - set(dangling[1])
if threads:
need_sleep = True
print('Warning -- Dangling threads: %s' % threads,
file=sys.stderr)
threads = None

# Sleep 500 ms to give time to child processes to complete.
if need_sleep:
time.sleep(0.5)
multiprocessing.process._cleanup()
gc.collect()
tmp = set(multiprocessing.process._dangling) - set(dangling[0])
if tmp:
print('Dangling processes:', tmp, file=sys.stderr)
del tmp
tmp = set(threading._dangling) - set(dangling[1])
if tmp:
print('Dangling threads:', tmp, file=sys.stderr)
test.support.gc_collect()

remote_globs['setUpModule'] = setUpModule
remote_globs['tearDownModule'] = tearDownModule