diff --git a/NEWS.md b/NEWS.md
index 408dce72dd322..2d3fd96ecd858 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -36,6 +36,8 @@ Compiler/Runtime improvements
   `libjulia-codegen`. It is loaded by default, so normal usage should see no changes.
   In deployments that do not need the compiler (e.g. system images where all needed code
   is precompiled), this library (and its LLVM dependency) can simply be excluded ([#41936]).
+* The environment variable `JULIA_THREAD_SCHEDULER` can be set to `workstealing` to use the
+  alternative work-stealing scheduler for threaded tasks ([#43366]).
 
 Command-line option changes
 ---------------------------
diff --git a/doc/src/manual/environment-variables.md b/doc/src/manual/environment-variables.md
index 08d00f3a8cae4..e413308f10ce2 100644
--- a/doc/src/manual/environment-variables.md
+++ b/doc/src/manual/environment-variables.md
@@ -235,6 +235,16 @@ If set to anything besides `0`, then Julia's thread policy is consistent with
 running on a dedicated machine: the master thread is on proc 0, and threads are
 affinitized. Otherwise, Julia lets the operating system handle thread policy.
 
+### `JULIA_THREAD_SCHEDULER`
+
+If set to `workstealing` (case insensitive), Julia uses the work-stealing
+scheduler for scheduling thread-enabled tasks (e.g., tasks created by
+`Threads.@spawn`). This setting is only advisory, and the interface may be
+changed or removed in the future.
+
+!!! compat "Julia 1.8"
+    `JULIA_THREAD_SCHEDULER` requires at least Julia 1.8.
+
 ## REPL formatting
 
 Environment variables that determine how REPL output should be formatted at the
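A brief usage sketch for the variable documented above (not part of the patch itself). The thread count and the `-e` workload are arbitrary choices; `Base.julia_cmd` and `Base.addenv` are existing Base APIs used only to launch a child process. The variable has to be set before the child process starts, because the runtime reads it once during initialization (see the `jl_init_threadinginfra` change in `src/partr.c` below).

```julia
# Usage sketch (not part of this patch): run an arbitrary task-spawning workload
# in a child Julia process with the work-stealing scheduler selected.
workload = "foreach(wait, [(Threads.@spawn sum(rand(1000))) for _ in 1:100])"
cmd = `$(Base.julia_cmd()) --threads=4 -e $workload`
run(Base.addenv(cmd, "JULIA_THREAD_SCHEDULER" => "workstealing"))
```

Any other value, or leaving the variable unset, keeps the default scheduler; the selection only matches the string `workstealing`, case-insensitively.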
diff --git a/src/gc.c b/src/gc.c
index 577ac5839eb87..b13a4138b6262 100644
--- a/src/gc.c
+++ b/src/gc.c
@@ -2787,7 +2787,7 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp
         gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception);
 }
 
-void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp);
+extern void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *);
 extern jl_value_t *cmpswap_names JL_GLOBALLY_ROOTED;
 
 // mark the initial root set
diff --git a/src/partr.c b/src/partr.c
index 048a841158153..b687ca7c8e0ea 100644
--- a/src/partr.c
+++ b/src/partr.c
@@ -14,7 +14,6 @@ extern "C" {
 #endif
 
-
 // thread sleep state
 
 // default to DEFAULT_THREAD_SLEEP_THRESHOLD; set via $JULIA_THREAD_SLEEP_THRESHOLD
@@ -57,6 +56,12 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT
 extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache,
                                          jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT;
 
+// partr dynamic dispatch
+void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *);
+static int (*partr_enqueue_task)(jl_task_t *, int16_t);
+static jl_task_t *(*partr_dequeue_task)(void);
+static int (*partr_check_empty)(void);
+
 // multiq
 // ---
 
@@ -83,20 +88,6 @@ static int32_t heap_p;
 static uint64_t cong_unbias;
 
 
-static inline void multiq_init(void)
-{
-    heap_p = heap_c * jl_n_threads;
-    heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t));
-    for (int32_t i = 0; i < heap_p; ++i) {
-        uv_mutex_init(&heaps[i].lock);
-        heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*));
-        jl_atomic_store_relaxed(&heaps[i].ntasks, 0);
-        jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX);
-    }
-    unbias_cong(heap_p, &cong_unbias);
-}
-
-
 static inline void sift_up(taskheap_t *heap, int32_t idx)
 {
     if (idx > 0) {
@@ -208,7 +199,7 @@ static inline jl_task_t *multiq_deletemin(void)
 }
 
 
-void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
+void multiq_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
 {
     int32_t i, j;
     for (i = 0; i < heap_p; ++i)
@@ -228,6 +219,170 @@ static int multiq_check_empty(void)
 }
 
 
+static inline void multiq_init(void)
+{
+    heap_p = heap_c * jl_n_threads;
+    heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t));
+    for (int32_t i = 0; i < heap_p; ++i) {
+        uv_mutex_init(&heaps[i].lock);
+        heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*));
+        jl_atomic_store_relaxed(&heaps[i].ntasks, 0);
+        jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX);
+    }
+    unbias_cong(heap_p, &cong_unbias);
+    jl_gc_mark_enqueued_tasks = &multiq_gc_mark_enqueued_tasks;
+    partr_enqueue_task = &multiq_insert;
+    partr_dequeue_task = &multiq_deletemin;
+    partr_check_empty = &multiq_check_empty;
+}
+
+
+
+// work-stealing deque
+
+// The work-stealing deque by Chase and Lev (2005). Le et al. (2013) provide a
+// C11-compliant version with explicit memory orderings.
+//
+// Ref:
+// * Chase and Lev (2005) https://doi.org/10.1145/1073970.1073974
+// * Le et al. (2013) https://doi.org/10.1145/2442516.2442524
+//
+// TODO: Dynamic buffer resizing.
+typedef struct _wsdeque_t {
+    union {
+        struct {
+            jl_task_t **tasks;
+            _Atomic(int64_t) top;
+            _Atomic(int64_t) bottom;
+        };
+        uint8_t padding[JL_CACHE_BYTE_ALIGNMENT];
+    };
+} wsdeque_t;
+
+static wsdeque_t *wsdeques;
+
+
+static int wsdeque_push(jl_task_t *task, int16_t priority_ignored)
+{
+    int16_t tid = jl_threadid();
+    int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom);
+    int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
+    int64_t size = b - t;
+    if (size >= tasks_per_heap - 1) // full
+        return -1;
+    jl_atomic_store_relaxed(
+        (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap], task);
+    if (jl_atomic_load_acquire(&task->tid) != -1)
+        // If the `task` still hasn't finished the context switch at this point, abort push
+        // and put it in the sticky queue.
+        return -1;
+    jl_fence_release();
+    jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
+    return 0;
+}
+
+
+static jl_task_t *wsdeque_pop(void)
+{
+    int16_t tid = jl_threadid();
+    int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom) - 1;
+    jl_atomic_store_relaxed(&wsdeques[tid].bottom, b);
+    jl_fence();
+    int64_t t = jl_atomic_load_relaxed(&wsdeques[tid].top);
+    int64_t size = b - t;
+    if (size < 0) {
+        jl_atomic_store_relaxed(&wsdeques[tid].bottom, t);
+        return NULL;
+    }
+    jl_task_t *task = jl_atomic_load_relaxed(
+        (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap]);
+    if (size > 0)
+        return task;
+    if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
+        task = NULL;
+    jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
+    return task;
+}
+
+
+static jl_task_t *wsdeque_steal(int16_t tid)
+{
+    int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
+    jl_fence();
+    int64_t b = jl_atomic_load_acquire(&wsdeques[tid].bottom);
+    int64_t size = b - t;
+    if (size <= 0)
+        return NULL;
+    jl_task_t *task = jl_atomic_load_relaxed(
+        (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
+    if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
+        return NULL;
+    return task;
+}
+
+
+static jl_task_t *wsdeque_pop_or_steal(void)
+{
+    jl_ptls_t ptls = jl_current_task->ptls;
+    jl_task_t *task = wsdeque_pop();
+    if (task || jl_n_threads < 2)
+        return task;
+
+    int ntries = jl_n_threads;
+    for (int i = 0; i < ntries; ++i) {
+        uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed);
+        if (tid >= ptls->tid)
+            tid++;
+        task = wsdeque_steal(tid);
+        if (task)
+            return task;
+    }
+    return NULL;
+}
+
+
+void wsdeque_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
+{
+    for (int i = 0; i < jl_n_threads; ++i) {
+        int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top);
+        int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom);
+        for (int j = t; j < b; ++j)
+            jl_gc_mark_queue_obj_explicit(
+                gc_cache, sp, (jl_value_t *)wsdeques[i].tasks[j % tasks_per_heap]);
+    }
+}
+
+
+static int wsdeque_check_empty(void)
+{
+    for (int32_t i = 0; i < jl_n_threads; ++i) {
+        int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top);
+        int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom);
+        int64_t size = b - t;
+        if (size > 0)
+            return 0;
+    }
+    return 1;
+}
+
+
+static void wsdeque_init(void)
+{
+    // Manually align the pointer since `jl_malloc_aligned` is not available here.
+    wsdeques = (wsdeque_t *)(((uintptr_t)calloc(1, sizeof(wsdeque_t) * jl_n_threads +
+                                                JL_CACHE_BYTE_ALIGNMENT - 1) +
+                              JL_CACHE_BYTE_ALIGNMENT - 1) &
+                             (-JL_CACHE_BYTE_ALIGNMENT));
+    for (int32_t i = 0; i < jl_n_threads; ++i) {
+        wsdeques[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t *));
+    }
+    unbias_cong(jl_n_threads, &cong_unbias);
+    jl_gc_mark_enqueued_tasks = &wsdeque_gc_mark_enqueued_tasks;
+    partr_enqueue_task = &wsdeque_push;
+    partr_dequeue_task = &wsdeque_pop_or_steal;
+    partr_check_empty = &wsdeque_check_empty;
+}
+
 
 // parallel task runtime
 // ---
 
@@ -236,8 +391,12 @@ static int multiq_check_empty(void)
 // (used only by the main thread)
 void jl_init_threadinginfra(void)
 {
-    /* initialize the synchronization trees pool and the multiqueue */
-    multiq_init();
+    /* choose and initialize the scheduler */
+    char *sch = getenv("JULIA_THREAD_SCHEDULER");
+    if (sch && !strncasecmp(sch, "workstealing", 12))
+        wsdeque_init();
+    else
+        multiq_init();
 
     sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD;
     char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME);
@@ -292,7 +451,7 @@ void jl_threadfun(void *arg)
 // enqueue the specified task for execution
 JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task)
 {
-    if (multiq_insert(task, task->prio) == -1)
+    if (partr_enqueue_task(task, task->prio) == -1)
         return 1;
     return 0;
 }
@@ -419,7 +578,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
         jl_set_task_tid(task, self);
         return task;
     }
-    return multiq_deletemin();
+    return partr_dequeue_task();
 }
 
 static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
@@ -444,7 +603,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
 
         // quick, race-y check to see if there seems to be any stuff in there
         jl_cpu_pause();
-        if (!multiq_check_empty()) {
+        if (!partr_check_empty()) {
             start_cycles = 0;
             continue;
         }
@@ -453,7 +612,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
         jl_ptls_t ptls = ct->ptls;
         if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
             jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
-            if (!multiq_check_empty()) {
+            if (!partr_check_empty()) {
                 if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
                     jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                 continue;
diff --git a/test/threads.jl b/test/threads.jl
index 4464c2a2c8859..66410f270c60d 100644
--- a/test/threads.jl
+++ b/test/threads.jl
@@ -24,9 +24,12 @@ let lk = ReentrantLock()
 end
 
 let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threads_exec.jl`
-    for test_nthreads in (1, 2, 4, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races
+    for test_nthreads in (1, 2, 4, 4), # run once to try single-threaded mode, then try a couple times to trigger bad races
+        sch in ("depthfirst", "workstealing")
+
         new_env = copy(ENV)
         new_env["JULIA_NUM_THREADS"] = string(test_nthreads)
+        new_env["JULIA_THREAD_SCHEDULER"] = sch
         run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
     end
 end
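To round off the test change above, here is a rough comparison harness in the same spirit; it is a sketch only and not part of the patch. The `depthfirst` label for the default scheduler mirrors the test file (any value other than `workstealing` falls back to the multiqueue), and the workload, thread count, and resulting timings are purely illustrative.

```julia
# Rough A/B sketch (not part of this patch): time the same task-spawning workload
# under each scheduler setting, each in a fresh child process.
workload = "@time foreach(wait, [(Threads.@spawn sum(rand(10_000))) for _ in 1:10_000])"
for sch in ("depthfirst", "workstealing")
    println("JULIA_THREAD_SCHEDULER = ", sch)
    cmd = `$(Base.julia_cmd()) --threads=4 -e $workload`
    run(Base.addenv(cmd, "JULIA_THREAD_SCHEDULER" => sch))
end
```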