No system lib multiprocessing #13493

Merged: 26 commits on Mar 31, 2021

3 changes: 3 additions & 0 deletions ChangeLog.md
@@ -20,6 +20,9 @@ See docs/process.md for more on how version tagging works.

Current Trunk
-------------
- Removed use of the Python multiprocessing library because of stability issues. Added
a new environment variable EM_PYTHON_MULTIPROCESSING=1 that can be set to revert
to using Python multiprocessing. (#13493)
- Binaryen now always inlines single-use functions. This should reduce code size
and improve performance (#13744).

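The ChangeLog entry above introduces the EM_PYTHON_MULTIPROCESSING=1 escape hatch. The code that actually reads the variable is not in the hunks shown here (it presumably sits alongside the new helpers in tools/shared.py); the following is only a sketch of how such a gate could look, with placeholder helper names:

```python
import os


def run_commands(commands):
    # Hypothetical gate between the two strategies described in the ChangeLog
    # entry; the PR's real check is outside the hunks shown in this section.
    if os.environ.get('EM_PYTHON_MULTIPROCESSING') == '1':
        # Opt-in legacy path: Python multiprocessing pool (placeholder name).
        return run_with_multiprocessing_pool(commands)
    # New default: plain subprocesses, avoiding the stability issues that
    # motivated this change.
    return run_multiple_processes(commands)
```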
13 changes: 12 additions & 1 deletion embuilder.py
@@ -15,6 +15,7 @@
import argparse
import logging
import sys
import time

from tools import shared
from tools import system_libs
@@ -112,6 +113,9 @@ def build_port(port_name):

def main():
global force

all_build_start_time = time.time()

parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=get_help())
@@ -166,6 +170,7 @@ def main():
print('Building targets: %s' % ' '.join(tasks))
for what in tasks:
logger.info('building and verifying ' + what)
start_time = time.time()
if what in SYSTEM_LIBRARIES:
library = SYSTEM_LIBRARIES[what]
if force:
@@ -260,7 +265,13 @@ def main():
logger.error('unfamiliar build target: ' + what)
return 1

logger.info('...success')
time_taken = time.time() - start_time
logger.info('...success. Took %s(%.2fs)' % (('%02d:%02d mins ' % (time_taken // 60, time_taken % 60) if time_taken >= 60 else ''), time_taken))

if len(tasks) > 1:
all_build_time_taken = time.time() - all_build_start_time
logger.info('Built %d targets in %s(%.2fs)' % (len(tasks), ('%02d:%02d mins ' % (all_build_time_taken // 60, all_build_time_taken % 60) if all_build_time_taken >= 60 else ''), all_build_time_taken))

return 0


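The timing added to embuilder's main() above formats elapsed time inline in the two logger.info() calls. Purely as an illustration of what those log lines end up looking like (this helper is not part of the PR), here is the same formatting pulled into a function:

```python
def format_duration(seconds):
    # Mirrors the format used by the logger.info() calls above: an
    # "MM:SS mins " prefix only when the build takes a minute or more,
    # with the raw seconds always appended in parentheses.
    mins = '%02d:%02d mins ' % (seconds // 60, seconds % 60) if seconds >= 60 else ''
    return '%s(%.2fs)' % (mins, seconds)


print(format_duration(12.5))   # -> (12.50s)
print(format_duration(95.0))   # -> 01:35 mins (95.00s)
```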
2 changes: 1 addition & 1 deletion tests/runner.py
@@ -791,7 +791,7 @@ def get_library(self, name, generated_libs, configure=['sh', './configure'],
configure_args=[], make=['make'], make_args=None,
env_init={}, cache_name_extra='', native=False):
if make_args is None:
make_args = ['-j', str(building.get_num_cores())]
make_args = ['-j', str(shared.get_num_cores())]

build_dir = self.get_build_dir()
output_dir = self.get_dir()
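The one-line change above switches tests/runner.py from building.get_num_cores() to shared.get_num_cores(); the matching definition is deleted from tools/building.py further down, so the helper has evidently moved to tools/shared.py. A plausible shape for the relocated function, assuming it keeps the EMCC_CORES override but drops the multiprocessing import (the shared.py hunk itself is not part of this section):

```python
# tools/shared.py (presumed new home of the helper removed from building.py)
import os


def get_num_cores():
    # EMCC_CORES still overrides the detected core count; os.cpu_count() is
    # assumed as the fallback now that multiprocessing is no longer imported.
    return int(os.environ.get('EMCC_CORES', os.cpu_count()))
```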
215 changes: 62 additions & 153 deletions tools/building.py
@@ -3,18 +3,16 @@
# University of Illinois/NCSA Open Source License. Both these licenses can be
# found in the LICENSE file.

import atexit
import json
import logging
import multiprocessing
import os
import re
import shlex
import shutil
import subprocess
import sys
import tempfile
from subprocess import STDOUT, PIPE
from subprocess import PIPE

from . import diagnostics
from . import response_file
@@ -36,7 +34,6 @@
logger = logging.getLogger('building')

# Building
multiprocessing_pool = None
binaryen_checked = False

EXPECTED_BINARYEN_VERSION = 100
@@ -77,55 +74,46 @@ def warn_if_duplicate_entries(archive_contents, archive_filename):
diagnostics.warning('emcc', msg)


# This function creates a temporary directory specified by the 'dir' field in
# the returned dictionary. Caller is responsible for cleaning up those files
# after done.
def extract_archive_contents(archive_file):
lines = run_process([LLVM_AR, 't', archive_file], stdout=PIPE).stdout.splitlines()
# ignore empty lines
contents = [l for l in lines if len(l)]
if len(contents) == 0:
logger.debug('Archive %s appears to be empty (recommendation: link an .so instead of .a)' % archive_file)
return {
'returncode': 0,
'dir': None,
'files': []
}

# `ar` files can only contain filenames. Just to be sure, verify that each
# file has only a filename component and is not absolute
for f in contents:
assert not os.path.dirname(f)
assert not os.path.isabs(f)

warn_if_duplicate_entries(contents, archive_file)

# create temp dir
temp_dir = tempfile.mkdtemp('_archive_contents', 'emscripten_temp_')

# extract file in temp dir
proc = run_process([LLVM_AR, 'xo', archive_file], stdout=PIPE, stderr=STDOUT, cwd=temp_dir)
abs_contents = [os.path.join(temp_dir, c) for c in contents]
# Extracts the given list of archive files and outputs their contents
def extract_archive_contents(archive_files):
archive_results = shared.run_multiple_processes([[LLVM_AR, 't', a] for a in archive_files], pipe_stdout=True)

# check that all files were created
missing_contents = [x for x in abs_contents if not os.path.exists(x)]
if missing_contents:
exit_with_error('llvm-ar failed to extract file(s) ' + str(missing_contents) + ' from archive file ' + f + '! Error:' + str(proc.stdout))

return {
'returncode': proc.returncode,
'dir': temp_dir,
'files': abs_contents
}
unpack_temp_dir = tempfile.mkdtemp('_archive_contents', 'emscripten_temp_')

def clean_at_exit():
try_delete(unpack_temp_dir)
shared.atexit.register(clean_at_exit)

def g_multiprocessing_initializer(*args):
for item in args:
(key, value) = item.split('=', 1)
if key == 'EMCC_POOL_CWD':
os.chdir(value)
else:
os.environ[key] = value
archive_contents = []

for i in range(len(archive_results)):
a = archive_results[i]
contents = [l for l in a.splitlines() if len(l)]
if len(contents) == 0:
logger.debug('Archive %s appears to be empty (recommendation: link an .so instead of .a)' % archive_files[i])

# `ar` files can only contain filenames. Just to be sure, verify that each
# file has only a filename component and is not absolute
for f in contents:
assert not os.path.dirname(f)
assert not os.path.isabs(f)

warn_if_duplicate_entries(contents, archive_files[i])

archive_contents += [{
'archive_name': archive_files[i],
'o_files': [os.path.join(unpack_temp_dir, c) for c in contents]
}]

shared.run_multiple_processes([[LLVM_AR, 'xo', a] for a in archive_files], cwd=unpack_temp_dir)

# check that all files were created
for a in archive_contents:
missing_contents = [x for x in a['o_files'] if not os.path.exists(x)]
if missing_contents:
exit_with_error('llvm-ar failed to extract file(s) ' + str(missing_contents) + ' from archive file ' + a['archive_name'] + '!')

return archive_contents
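The rewritten extract_archive_contents() above relies on shared.run_multiple_processes(), whose definition is not shown in this section. Going only by the call sites in this diff (a list of command lines plus cwd, pipe_stdout and check keyword arguments), a minimal sequential stand-in would be roughly the following; the real helper runs the commands concurrently:

```python
import subprocess


def run_multiple_processes(commands, cwd=None, pipe_stdout=False, check=True):
    # Sequential stand-in inferred from the call sites in this diff: returns
    # one captured-stdout string per command when pipe_stdout is set,
    # otherwise the return codes.
    results = []
    for cmd in commands:
        proc = subprocess.run(cmd, cwd=cwd, universal_newlines=True,
                              stdout=subprocess.PIPE if pipe_stdout else None)
        if check and proc.returncode != 0:
            raise RuntimeError('command failed: ' + ' '.join(cmd))
        results.append(proc.stdout if pipe_stdout else proc.returncode)
    return results
```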


def unique_ordered(values):
@@ -152,74 +140,6 @@ def clear():
_is_ar_cache.clear()


def get_num_cores():
return int(os.environ.get('EMCC_CORES', multiprocessing.cpu_count()))


# Multiprocessing pools are very slow to build up and tear down, and having
# several pools throughout the application has a problem of overallocating
# child processes. Therefore maintain a single centralized pool that is shared
# between all pooled task invocations.
def get_multiprocessing_pool():
global multiprocessing_pool
if not multiprocessing_pool:
cores = get_num_cores()

# If running with one core only, create a mock instance of a pool that does not
# actually spawn any new subprocesses. Very useful for internal debugging.
if cores == 1:
class FakeMultiprocessor(object):
def map(self, func, tasks, *args, **kwargs):
results = []
for t in tasks:
results += [func(t)]
return results

def map_async(self, func, tasks, *args, **kwargs):
class Result:
def __init__(self, func, tasks):
self.func = func
self.tasks = tasks

def get(self, timeout):
results = []
for t in tasks:
results += [func(t)]
return results

return Result(func, tasks)

multiprocessing_pool = FakeMultiprocessor()
else:
child_env = [
# Multiprocessing pool children must have their current working
# directory set to a safe path that is guaranteed not to die in
# between of executing commands, or otherwise the pool children will
# have trouble spawning subprocesses of their own.
'EMCC_POOL_CWD=' + path_from_root(),
# Multiprocessing pool children can't spawn their own linear number of
# children, that could cause a quadratic amount of spawned processes.
'EMCC_CORES=1'
]
multiprocessing_pool = multiprocessing.Pool(processes=cores, initializer=g_multiprocessing_initializer, initargs=child_env)

def close_multiprocessing_pool():
global multiprocessing_pool
try:
# Shut down the pool explicitly, because leaving that for Python to do at process shutdown is buggy and can generate
# noisy "WindowsError: [Error 5] Access is denied" spam which is not fatal.
multiprocessing_pool.terminate()
multiprocessing_pool.join()
multiprocessing_pool = None
except OSError as e:
# Mute the "WindowsError: [Error 5] Access is denied" errors, raise all others through
if not (sys.platform.startswith('win') and isinstance(e, WindowsError) and e.winerror == 5):
raise
atexit.register(close_multiprocessing_pool)

return multiprocessing_pool


# .. but for Popen, we cannot have doublequotes, so provide functionality to
# remove them when needed.
def remove_quotes(arg):
@@ -291,11 +211,19 @@ def llvm_nm_multiple(files):
# We can issue multiple files in a single llvm-nm call, but only if those
# files are all .o or .bc files. Because of llvm-nm output format, we cannot
# llvm-nm multiple .a files in one call, but those must be individually checked.
if len(llvm_nm_files) > 1:
llvm_nm_files = [f for f in files if f.endswith('.o') or f.endswith('.bc')]

if len(llvm_nm_files) > 0:
cmd = [LLVM_NM] + llvm_nm_files
o_files = [f for f in llvm_nm_files if os.path.splitext(f)[1].lower() in ['.o', '.obj', '.bc']]
a_files = [f for f in llvm_nm_files if f not in o_files]

# Issue parallel calls for .a files
if len(a_files) > 0:
results = shared.run_multiple_processes([[LLVM_NM, a] for a in a_files], pipe_stdout=True, check=False)
for i in range(len(results)):
nm_cache[a_files[i]] = parse_symbols(results[i])

# Issue a single batch call for multiple .o files
if len(o_files) > 0:
cmd = [LLVM_NM] + o_files
cmd = get_command_with_possible_response_file(cmd)
results = run_process(cmd, stdout=PIPE, stderr=PIPE, check=False)

Expand All @@ -319,11 +247,11 @@ def llvm_nm_multiple(files):
# so loop over the report to extract the results
# for each individual file.

filename = llvm_nm_files[0]
filename = o_files[0]

# When we dispatched more than one file, we must manually parse
# the file result delimiters (like shown structured above)
if len(llvm_nm_files) > 1:
if len(o_files) > 1:
file_start = 0
i = 0

Expand All @@ -340,18 +268,11 @@ def llvm_nm_multiple(files):

nm_cache[filename] = parse_symbols(results[file_start:])
else:
# We only dispatched a single file, we can just parse that directly
# to the output.
# We only dispatched a single file, so we can parse the entire result
# directly for that file.
nm_cache[filename] = parse_symbols(results)

# Any .a files that have multiple .o files will have hard time parsing. Scan those
# sequentially to confirm. TODO: Move this to use run_multiple_processes()
# when available.
for f in files:
if f not in nm_cache:
nm_cache[f] = llvm_nm(f)

return [nm_cache[f] for f in files]
return [nm_cache[f] if f in nm_cache else ObjectFileInfo(1, '') for f in files]
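The comments above describe the one tricky part of batching: a single llvm-nm invocation over several .o files produces one concatenated report in which each object's symbols are introduced by a "<file>:" header line. A rough, illustrative sketch of just that splitting step, separate from the nm_cache bookkeeping the real code performs:

```python
def split_nm_report(report_lines, filenames):
    # llvm-nm prefixes each object's symbols with a "path/to/file.o:" line
    # when given several inputs; split the combined report on those headers.
    wanted = set(filenames)
    per_file, current, chunk = {}, None, []
    for line in report_lines:
        if line.endswith(':') and line[:-1] in wanted:
            if current is not None:
                per_file[current] = chunk
            current, chunk = line[:-1], []
        elif current is not None:
            chunk.append(line)
    if current is not None:
        per_file[current] = chunk
    return per_file
```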


def llvm_nm(file):
@@ -373,25 +294,13 @@ def read_link_inputs(files):
object_names.append(absolute_path_f)

# Archives contain objects, so process all archives first in parallel to obtain the object files in them.
pool = get_multiprocessing_pool()
object_names_in_archives = pool.map(extract_archive_contents, archive_names)

def clean_temporary_archive_contents_directory(directory):
def clean_at_exit():
try_delete(directory)
if directory:
atexit.register(clean_at_exit)

for n in range(len(archive_names)):
if object_names_in_archives[n]['returncode'] != 0:
raise Exception('llvm-ar failed on archive ' + archive_names[n] + '!')
ar_contents[archive_names[n]] = object_names_in_archives[n]['files']
clean_temporary_archive_contents_directory(object_names_in_archives[n]['dir'])

for o in object_names_in_archives:
for f in o['files']:
if f not in nm_cache:
object_names.append(f)
archive_contents = extract_archive_contents(archive_names)

for a in archive_contents:
ar_contents[os.path.abspath(a['archive_name'])] = a['o_files']
for o in a['o_files']:
if o not in nm_cache:
object_names.append(o)

# Next, extract symbols from all object files (either standalone or inside archives we just extracted)
# The results are not used here directly, but populated to llvm-nm cache structure.
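For reference, the shapes flowing between the two new helpers above: extract_archive_contents() returns a list of dicts, which read_link_inputs() folds into ar_contents keyed by absolute archive path. A tiny illustration with made-up paths:

```python
import os

# Made-up example of the structures the code above passes around.
archive_contents = [
    {'archive_name': 'libfoo.a',
     'o_files': ['/tmp/emscripten_temp_ab12_archive_contents/foo.o',
                 '/tmp/emscripten_temp_ab12_archive_contents/bar.o']},
]

ar_contents = {}
for a in archive_contents:
    ar_contents[os.path.abspath(a['archive_name'])] = a['o_files']
```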