Skip to content

add try/catch around scheduler to reset sleep state #54721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -2376,8 +2376,8 @@ extern int had_exception;
size_t __excstack_state = jl_excstack_state(__eh_ct); \
jl_enter_handler(__eh_ct, &__eh); \
__eh_ct->eh = &__eh; \
if (1)
/* TRY BLOCK; */
for (i__try=1; i__try; i__try=0)

#define JL_CATCH \
if (!had_exception) \
jl_eh_restore_state_noexcept(__eh_ct, &__eh); \
Expand Down
261 changes: 136 additions & 125 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -456,145 +456,156 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
continue;
}
task = get_next_task(trypoptask, q); // note: this should not yield
if (ptls != ct->ptls) {
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
ptls = ct->ptls;
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
volatile int isrunning = 1;
JL_TRY {
task = get_next_task(trypoptask, q); // note: this should not yield
if (ptls != ct->ptls) {
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
ptls = ct->ptls;
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
continue; // jump to JL_CATCH
}
if (task)
return task;
continue;
}
if (task) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (task) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
continue; // jump to JL_CATCH
}
return task;
}

// IO is always permitted, but outside a threaded region, only
// thread 0 will process messages.
// Inside a threaded region, any thread can listen for IO messages,
// and one thread should win this race and watch the event loop,
// but we bias away from idle threads getting parked here.
//
// The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
// - After decrementing _threadedregion, the thread is required to
// call jl_wakeup_thread(0), which will kick out any thread that is
// already there, and then eventually thread 0 will get here.
// - Inside a _threadedregion, there must exist at least one
// thread that has a happens-before relationship on the libuv lock
// before reaching this decision point in the code who will see
// the lock as unlocked and thus must win this race here.
int uvlock = 0;
if (jl_atomic_load_relaxed(&_threadedregion)) {
uvlock = jl_mutex_trylock(&jl_uv_mutex);
}
else if (ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
uvlock = 1;
JL_UV_LOCK();
}
else {
// Since we might have started some IO work, we might need
// to ensure tid = 0 will go watch that new event source.
// If trylock would have succeeded, that may have been our
// responsibility, so need to make sure thread 0 will take care
// of us.
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
jl_wakeup_thread(0);
}
if (uvlock) {
int enter_eventloop = may_sleep(ptls);
int active = 0;
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
// if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
enter_eventloop = 0;
if (enter_eventloop) {
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
jl_gc_safepoint();
// IO is always permitted, but outside a threaded region, only
// thread 0 will process messages.
// Inside a threaded region, any thread can listen for IO messages,
// and one thread should win this race and watch the event loop,
// but we bias away from idle threads getting parked here.
//
// The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
// - After decrementing _threadedregion, the thread is required to
// call jl_wakeup_thread(0), which will kick out any thread that is
// already there, and then eventually thread 0 will get here.
// - Inside a _threadedregion, there must exist at least one
// thread that has a happens-before relationship on the libuv lock
// before reaching this decision point in the code who will see
// the lock as unlocked and thus must win this race here.
int uvlock = 0;
if (jl_atomic_load_relaxed(&_threadedregion)) {
uvlock = jl_mutex_trylock(&jl_uv_mutex);
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do.
// Otherwise we got a spurious wakeup from some other thread
// that just wanted to steal libuv from us. We will just go
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (active || !may_sleep(ptls)) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue;
else if (ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
uvlock = 1;
JL_UV_LOCK();
}
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive, just spin-looping if necessary
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
else {
// Since we might have started some IO work, we might need
// to ensure tid = 0 will go watch that new event source.
// If trylock would have succeeded, that may have been our
// responsibility, so need to make sure thread 0 will take care
// of us.
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
jl_wakeup_thread(0);
}
if (uvlock) {
int enter_eventloop = may_sleep(ptls);
int active = 0;
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
// if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
enter_eventloop = 0;
if (enter_eventloop) {
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
jl_gc_safepoint();
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do.
// Otherwise we got a spurious wakeup from some other thread
// that just wanted to steal libuv from us. We will just go
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (active || !may_sleep(ptls)) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue; // jump to JL_CATCH
}
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive, just spin-looping if necessary
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue; // jump to JL_CATCH
}
start_cycles = 0;
continue;
}
}

// any thread which wants us running again will have to observe
// sleep_check_state==sleeping and increment nrunning for us
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
assert(wasrunning);
if (wasrunning == 1) {
// This was the last running thread, and there is no thread with !may_sleep
// so make sure io_loop_tid is notified to check wait_empty
// TODO: this also might be a good time to check again that
// libuv's queue is truly empty, instead of during delete_thread
int16_t tid2 = 0;
if (ptls->tid != tid2) {
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid2];
uv_mutex_lock(&ptls2->sleep_lock);
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
// any thread which wants us running again will have to observe
// sleep_check_state==sleeping and increment nrunning for us
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
assert(wasrunning);
isrunning = 0;
if (wasrunning == 1) {
// This was the last running thread, and there is no thread with !may_sleep
// so make sure io_loop_tid is notified to check wait_empty
// TODO: this also might be a good time to check again that
// libuv's queue is truly empty, instead of during delete_thread
int16_t tid2 = 0;
if (ptls->tid != tid2) {
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid2];
uv_mutex_lock(&ptls2->sleep_lock);
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
}
}
}

// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
if (ptls->tid == 0) {
task = wait_empty;
if (task && jl_atomic_load_relaxed(&nrunning) == 0) {
wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(!wasrunning);
wasrunning = !set_not_sleeping(ptls);
assert(!wasrunning);
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (!ptls->finalizers_inhibited)
ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
break;
// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
if (ptls->tid == 0) {
task = wait_empty;
if (task && jl_atomic_load_relaxed(&nrunning) == 0) {
wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(!wasrunning);
wasrunning = !set_not_sleeping(ptls);
assert(!wasrunning);
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (!ptls->finalizers_inhibited)
ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
break;
}
task = NULL;
}
task = NULL;
// else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&nrunning));
start_cycles = 0;
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
if (task) {
assert(task == wait_empty);
wait_empty = NULL;
continue;
}
// else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&nrunning));
start_cycles = 0;
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
if (task) {
assert(task == wait_empty);
wait_empty = NULL;
return task;
JL_CATCH {
// probably SIGINT, but possibly a user mistake in trypoptask
if (!isrunning)
jl_atomic_fetch_add_relaxed(&nrunning, 1);
set_not_sleeping(ptls);
jl_rethrow();
}
if (task)
return task;
}
else {
// maybe check the kernel for new messages too
Expand Down