diff --git a/src/library_pthread.js b/src/library_pthread.js index 0fe387cc36fd7..f27e5a039094e 100644 --- a/src/library_pthread.js +++ b/src/library_pthread.js @@ -9,7 +9,6 @@ var LibraryPThread = { $PThread__deps: ['$registerPthreadPtr', '$ERRNO_CODES', 'emscripten_futex_wake', '$killThread', '$cancelThread', '$cleanupThread', - '_main_thread_futex_wait_address' #if USE_ASAN || USE_LSAN , '$withBuiltinMalloc' #endif @@ -28,6 +27,9 @@ var LibraryPThread = { runningWorkers: [], // Points to a pthread_t structure in the Emscripten main heap, allocated on demand if/when first needed. // mainThreadBlock: undefined, + // Stores the memory address that the main thread is waiting on, if any. If + // the main thread is waiting, we wake it up before waking up any workers. + // mainThreadFutex: undefined, initMainThreadBlock: function() { #if ASSERTIONS assert(!ENVIRONMENT_IS_PTHREAD); @@ -69,7 +71,7 @@ var LibraryPThread = { Atomics.store(HEAPU32, (PThread.mainThreadBlock + {{{ C_STRUCTS.pthread.tid }}} ) >> 2, PThread.mainThreadBlock); // Main thread ID. Atomics.store(HEAPU32, (PThread.mainThreadBlock + {{{ C_STRUCTS.pthread.pid }}} ) >> 2, {{{ PROCINFO.pid }}}); // Process ID. - __main_thread_futex_wait_address = _malloc(4); + PThread.initShared(); #if PTHREADS_PROFILING PThread.createProfilerBlock(PThread.mainThreadBlock); @@ -87,6 +89,7 @@ var LibraryPThread = { _emscripten_register_main_browser_thread_id(PThread.mainThreadBlock); }, initWorker: function() { + PThread.initShared(); #if EMBIND // Embind must initialize itself on all threads, as it generates support JS. Module['___embind_register_native_and_builtin_types'](); @@ -106,6 +109,12 @@ var LibraryPThread = { PThread['setThreadStatus'] = PThread.setThreadStatus; PThread['threadCancel'] = PThread.threadCancel; PThread['threadExit'] = PThread.threadExit; +#endif + }, + initShared: function() { + PThread.mainThreadFutex = Module['_main_thread_futex']; +#if ASSERTIONS + assert(PThread.mainThreadFutex > 0); #endif }, // Maps pthread_t to pthread info objects @@ -1115,14 +1124,12 @@ var LibraryPThread = { return 0; }, - // Stores the memory address that the main thread is waiting on, if any. - _main_thread_futex_wait_address: '0', - // Returns 0 on success, or one of the values -ETIMEDOUT, -EWOULDBLOCK or -EINVAL on error. - emscripten_futex_wait__deps: ['_main_thread_futex_wait_address', 'emscripten_main_thread_process_queued_calls'], + emscripten_futex_wait__deps: ['emscripten_main_thread_process_queued_calls'], emscripten_futex_wait: function(addr, val, timeout) { if (addr <= 0 || addr > HEAP8.length || addr&3 != 0) return -{{{ cDefine('EINVAL') }}}; - if (ENVIRONMENT_IS_NODE || ENVIRONMENT_IS_WORKER) { + // We can do a normal blocking wait anywhere but on the main browser thread. + if (!ENVIRONMENT_IS_WEB) { #if PTHREADS_PROFILING PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}}); #endif @@ -1135,31 +1142,114 @@ var LibraryPThread = { if (ret === 'ok') return 0; throw 'Atomics.wait returned an unexpected value ' + ret; } else { - // Atomics.wait is not available in the main browser thread, so simulate it via busy spinning. - var loadedVal = Atomics.load(HEAP32, addr >> 2); - if (val != loadedVal) return -{{{ cDefine('EWOULDBLOCK') }}}; + // First, check if the value is correct for us to wait on. + if (Atomics.load(HEAP32, addr >> 2) != val) { + return -{{{ cDefine('EWOULDBLOCK') }}}; + } + // Atomics.wait is not available in the main browser thread, so simulate it via busy spinning. var tNow = performance.now(); var tEnd = tNow + timeout; #if PTHREADS_PROFILING PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}}); #endif + // Register globally which address the main thread is simulating to be + // waiting on. When zero, the main thread is not waiting on anything, and on + // nonzero, the contents of the address pointed by PThread.mainThreadFutex + // tell which address the main thread is simulating its wait on. + // We need to be careful of recursion here: If we wait on a futex, and + // then call _emscripten_main_thread_process_queued_calls() below, that + // will call code that takes the proxying mutex - which can once more + // reach this code in a nested call. To avoid interference between the + // two (there is just a single mainThreadFutex at a time), unmark + // ourselves before calling the potentially-recursive call. See below for + // how we handle the case of our futex being notified during the time in + // between when we are not set as the value of mainThreadFutex. +#if ASSERTIONS + assert(PThread.mainThreadFutex > 0); +#endif + var lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, addr); +#if ASSERTIONS + // We must not have already been waiting. + assert(lastAddr == 0); +#endif - // Register globally which address the main thread is simulating to be waiting on. When zero, main thread is not waiting on anything, - // and on nonzero, the contents of address pointed by __main_thread_futex_wait_address tell which address the main thread is simulating its wait on. - Atomics.store(HEAP32, __main_thread_futex_wait_address >> 2, addr); - var ourWaitAddress = addr; // We may recursively re-enter this function while processing queued calls, in which case we'll do a spurious wakeup of the older wait operation. - while (addr == ourWaitAddress) { + while (1) { + // Check for a timeout. tNow = performance.now(); if (tNow > tEnd) { #if PTHREADS_PROFILING PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}}); +#endif + // We timed out, so stop marking ourselves as waiting. + lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, 0); +#if ASSERTIONS + // The current value must have been our address which we set, or + // in a race it was set to 0 which means another thread just allowed + // us to run, but (tragically) that happened just a bit too late. + assert(lastAddr == addr || lastAddr == 0); #endif return -{{{ cDefine('ETIMEDOUT') }}}; } - _emscripten_main_thread_process_queued_calls(); // We are performing a blocking loop here, so must pump any pthreads if they want to perform operations that are proxied. - addr = Atomics.load(HEAP32, __main_thread_futex_wait_address >> 2); // Look for a worker thread waking us up. + // We are performing a blocking loop here, so we must handle proxied + // events from pthreads, to avoid deadlocks. + // Note that we have to do so carefully, as we may take a lock while + // doing so, which can recurse into this function; stop marking + // ourselves as waiting while we do so. + lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, 0); +#if ASSERTIONS + assert(lastAddr == addr || lastAddr == 0); +#endif + if (lastAddr == 0) { + // We were told to stop waiting, so stop. + break; + } + _emscripten_main_thread_process_queued_calls(); + + // Check the value, as if we were starting the futex all over again. + // This handles the following case: + // + // * wait on futex A + // * recurse into emscripten_main_thread_process_queued_calls(), + // which waits on futex B. that sets the mainThreadFutex address to + // futex B, and there is no longer any mention of futex A. + // * a worker is done with futex A. it checks mainThreadFutex but does + // not see A, so it does nothing special for the main thread. + // * a worker is done with futex B. it flips mainThreadMutex from B + // to 0, ending the wait on futex B. + // * we return to the wait on futex A. mainThreadFutex is 0, but that + // is because of futex B being done - we can't tell from + // mainThreadFutex whether A is done or not. therefore, check the + // memory value of the futex. + // + // That case motivates the design here. Given that, checking the memory + // address is also necessary for other reasons: we unset and re-set our + // address in mainThreadFutex around calls to + // emscripten_main_thread_process_queued_calls(), and a worker could + // attempt to wake us up right before/after such times. + // + // Note that checking the memory value of the futex is valid to do: we + // could easily have been delayed (relative to the worker holding on + // to futex A), which means we could be starting all of our work at the + // later time when there is no need to block. The only "odd" thing is + // that we may have caused side effects in that "delay" time. But the + // only side effects we can have are to call + // emscripten_main_thread_process_queued_calls(). That is always ok to + // do on the main thread (it's why it is ok for us to call it in the + // middle of this function, and elsewhere). So if we check the value + // here and return, it's the same is if what happened on the main thread + // was the same as calling emscripten_main_thread_process_queued_calls() + // a few times times before calling emscripten_futex_wait(). + if (Atomics.load(HEAP32, addr >> 2) != val) { + return -{{{ cDefine('EWOULDBLOCK') }}}; + } + + // Mark us as waiting once more, and continue the loop. + lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, addr); +#if ASSERTIONS + assert(lastAddr == 0); +#endif } #if PTHREADS_PROFILING PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}}); @@ -1170,7 +1260,6 @@ var LibraryPThread = { // Returns the number of threads (>= 0) woken up, or the value -EINVAL on error. // Pass count == INT_MAX to wake up all threads. - emscripten_futex_wake__deps: ['_main_thread_futex_wait_address'], emscripten_futex_wake: function(addr, count) { if (addr <= 0 || addr > HEAP8.length || addr&3 != 0 || count < 0) return -{{{ cDefine('EINVAL') }}}; if (count == 0) return 0; @@ -1181,10 +1270,20 @@ var LibraryPThread = { // See if main thread is waiting on this address? If so, wake it up by resetting its wake location to zero. // Note that this is not a fair procedure, since we always wake main thread first before any workers, so // this scheme does not adhere to real queue-based waiting. - var mainThreadWaitAddress = Atomics.load(HEAP32, __main_thread_futex_wait_address >> 2); +#if ASSERTIONS + assert(PThread.mainThreadFutex > 0); +#endif + var mainThreadWaitAddress = Atomics.load(HEAP32, PThread.mainThreadFutex >> 2); var mainThreadWoken = 0; if (mainThreadWaitAddress == addr) { - var loadedAddr = Atomics.compareExchange(HEAP32, __main_thread_futex_wait_address >> 2, mainThreadWaitAddress, 0); +#if ASSERTIONS + // We only use mainThreadFutex on the main browser thread, where we + // cannot block while we wait. Therefore we should only see it set from + // other threads, and not on the main thread itself. In other words, the + // main thread must never try to wake itself up! + assert(!ENVIRONMENT_IS_WEB); +#endif + var loadedAddr = Atomics.compareExchange(HEAP32, PThread.mainThreadFutex >> 2, mainThreadWaitAddress, 0); if (loadedAddr == mainThreadWaitAddress) { --count; mainThreadWoken = 1; diff --git a/src/worker.js b/src/worker.js index 77507729ec40e..9b82149b1b091 100644 --- a/src/worker.js +++ b/src/worker.js @@ -244,7 +244,7 @@ this.onmessage = function(e) { } } catch(ex) { err('worker.js onmessage() captured an uncaught exception: ' + ex); - if (ex.stack) err(ex.stack); + if (ex && ex.stack) err(ex.stack); throw ex; } }; diff --git a/system/lib/pthread/library_pthread.c b/system/lib/pthread/library_pthread.c index 6af1da45d6379..ea0006d3c141c 100644 --- a/system/lib/pthread/library_pthread.c +++ b/system/lib/pthread/library_pthread.c @@ -913,6 +913,11 @@ int llvm_atomic_load_add_i32_p0i32(int* ptr, int delta) { return emscripten_atomic_add_u32(ptr, delta); } +// Stores the memory address that the main thread is waiting on, if any. If +// the main thread is waiting, we wake it up before waking up any workers. +EMSCRIPTEN_KEEPALIVE +void* main_thread_futex; + typedef struct main_args { int argc; char** argv; diff --git a/tests/pthread/test_pthread_proxy_hammer.cpp b/tests/pthread/test_pthread_proxy_hammer.cpp new file mode 100644 index 0000000000000..996006ba32cde --- /dev/null +++ b/tests/pthread/test_pthread_proxy_hammer.cpp @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include +#include + +class random_device +{ + int __f_; +public: + // constructors + explicit random_device(); + ~random_device(); + + // generating functions + unsigned operator()(); +}; + +random_device::random_device() +{ + __f_ = open("/dev/urandom", O_RDONLY); + if (__f_ < 0) abort(); +} + +random_device::~random_device() +{ + close(__f_); +} + +unsigned +random_device::operator()() +{ + unsigned r; + size_t n = sizeof(r); + char* p = reinterpret_cast(&r); + while (n > 0) + { + ssize_t s = read(__f_, p, 1); + if (s == 0) abort(); + if (s == -1) + { + if (errno != EINTR) abort(); + continue; + } + n -= static_cast(s); + p += static_cast(s); + } + return r; +} + +int main() { + int total = 0; + for (int i = 0; i < ITERATIONS; i++) { + // printf causes proxying + printf("%d %d\n", i, total); + for (int j = 0; j < 1024; j++) { + // allocation uses a mutex + auto* rd = new random_device(); + // reading data causes proxying + for (int k = 0; k < 4; k++) { + total += (*rd)(); + } + // make sure the optimizer doesn't remove the allocation + EM_ASM({ out("iter") }, rd); + delete rd; + } + } + printf("done: %d", total); +#ifdef REPORT_RESULT + REPORT_RESULT(0); +#endif + return 0; +} + diff --git a/tests/pthread/test_pthread_proxying_in_futex_wait.cpp b/tests/pthread/test_pthread_proxying_in_futex_wait.cpp index 4a2f485de84f2..3eb11a274a325 100644 --- a/tests/pthread/test_pthread_proxying_in_futex_wait.cpp +++ b/tests/pthread/test_pthread_proxying_in_futex_wait.cpp @@ -3,6 +3,7 @@ // University of Illinois/NCSA Open Source License. Both these licenses can be // found in the LICENSE file. +#include #include #include #include @@ -47,9 +48,12 @@ int main() int rc = pthread_create(&thread, &attr, ThreadMain, 0); assert(rc == 0); rc = emscripten_futex_wait(&main_thread_wait_val, 1, 15 * 1000); - if (rc != 0) + // An rc of 0 means no error, and of EWOULDBLOCK means that the value is + // not the expected one, which can happen if the pthread manages to set it + // before we reach the futex_wait. + if (rc != 0 && rc != -EWOULDBLOCK) { - printf("ERROR! futex wait timed out!\n"); + printf("ERROR! futex wait errored %d!\n", rc); result = 2; #ifdef REPORT_RESULT REPORT_RESULT(result); diff --git a/tests/runner.py b/tests/runner.py index e0b6b622380e3..0b95220852d7b 100755 --- a/tests/runner.py +++ b/tests/runner.py @@ -1356,12 +1356,12 @@ def assert_out_queue_empty(self, who): self.harness_out_queue.get() raise Exception('excessive responses from %s' % who) - # @param tries_left: how many more times to try this test, if it fails. browser tests have - # many more causes of flakiness (in particular, they do not run - # synchronously, so we have a timeout, which can be hit if the VM - # we run on stalls temporarily), so we let each test try more than - # once by default - def run_browser(self, html_file, message, expectedResult=None, timeout=None, tries_left=1): + # @param extra_tries: how many more times to try this test, if it fails. browser tests have + # many more causes of flakiness (in particular, they do not run + # synchronously, so we have a timeout, which can be hit if the VM + # we run on stalls temporarily), so we let each test try more than + # once by default + def run_browser(self, html_file, message, expectedResult=None, timeout=None, extra_tries=1): if not has_browser(): return if BrowserCore.unresponsive_tests >= BrowserCore.MAX_UNRESPONSIVE_TESTS: @@ -1400,10 +1400,10 @@ def run_browser(self, html_file, message, expectedResult=None, timeout=None, tri try: self.assertIdenticalUrlEncoded(expectedResult, output) except Exception as e: - if tries_left > 0: + if extra_tries > 0: print('[test error (see below), automatically retrying]') print(e) - return self.run_browser(html_file, message, expectedResult, timeout, tries_left - 1) + return self.run_browser(html_file, message, expectedResult, timeout, extra_tries - 1) else: raise e finally: @@ -1541,7 +1541,7 @@ def btest(self, filename, expected=None, reference=None, force_c=False, reference_slack=0, manual_reference=False, post_build=None, args=[], outfile='test.html', message='.', also_proxied=False, url_suffix='', timeout=None, also_asmjs=False, - manually_trigger_reftest=False): + manually_trigger_reftest=False, extra_tries=1): assert expected or reference, 'a btest must either expect an output, or have a reference image' # if we are provided the source and not a path, use that filename_is_src = '\n' in filename @@ -1575,7 +1575,7 @@ def btest(self, filename, expected=None, reference=None, force_c=False, post_build() if not isinstance(expected, list): expected = [expected] - self.run_browser(outfile + url_suffix, message, ['/report_result?' + e for e in expected], timeout=timeout) + self.run_browser(outfile + url_suffix, message, ['/report_result?' + e for e in expected], timeout=timeout, extra_tries=extra_tries) # Tests can opt into being run under asmjs as well if 'WASM=0' not in args and (also_asmjs or self.also_asmjs): diff --git a/tests/test_browser.py b/tests/test_browser.py index 9f475ee81b9a3..03a3aca5f7b92 100644 --- a/tests/test_browser.py +++ b/tests/test_browser.py @@ -4918,3 +4918,22 @@ def test_zzz_zzz_4GB_fail(self): # 4GB. self.emcc_args += ['-O2', '-s', 'ALLOW_MEMORY_GROWTH', '-s', 'MAXIMUM_MEMORY=4GB', '-s', 'ABORTING_MALLOC=0'] self.do_run_in_out_file_test('tests', 'browser', 'test_4GB_fail.cpp', js_engines=[V8_ENGINE]) + + @unittest.skip("only run this manually, to test for race conditions") + @parameterized({ + 'normal': ([],), + 'assertions': (['-s', 'ASSERTIONS'],) + }) + @requires_threads + def test_manual_pthread_proxy_hammer(self, args): + # the specific symptom of the hang that was fixed is that the test hangs + # at some point, using 0% CPU. often that occured in 0-200 iterations, but + # you may want to adjust "ITERATIONS". + self.btest(path_from_root('tests', 'pthread', 'test_pthread_proxy_hammer.cpp'), + expected='0', + args=['-s', 'USE_PTHREADS=1', '-O2', '-s', 'PROXY_TO_PTHREAD', + '-DITERATIONS=1024', '-g1'] + args, + timeout=10000, + # don't run this with the default extra_tries value, as this is + # *meant* to notice something random, a race condition. + extra_tries=0)