2 changes: 2 additions & 0 deletions NEWS.md
@@ -36,6 +36,8 @@ Compiler/Runtime improvements
`libjulia-codegen`. It is loaded by default, so normal usage should see no changes.
In deployments that do not need the compiler (e.g. system images where all needed code
is precompiled), this library (and its LLVM dependency) can simply be excluded ([#41936]).
* The environment variable `JULIA_THREAD_SCHEDULER` can be set to `workstealing` to use the
alternative work-stealing scheduler for threaded tasks ([#43366]).

Command-line option changes
---------------------------
10 changes: 10 additions & 0 deletions doc/src/manual/environment-variables.md
@@ -235,6 +235,16 @@ If set to anything besides `0`, then Julia's thread policy is consistent with
running on a dedicated machine: the master thread is on proc 0, and threads are
affinitized. Otherwise, Julia lets the operating system handle thread policy.

### `JULIA_THREAD_SCHEDULER`

If set to `workstealing` (case insensitive), Julia uses the work-stealing
scheduler for scheduling thread-enabled tasks (e.g., tasks created by
`Threads.@spawn`). This is only an advisory mechanism and the interface may be
changed or removed in the future.

!!! compat "Julia 1.8"
`JULIA_THREAD_SCHEDULER` requires at least Julia 1.8.

## REPL formatting

Environment variables that determine how REPL output should be formatted at the
2 changes: 1 addition & 1 deletion src/gc.c
@@ -2787,7 +2787,7 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp
gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception);
}

void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp);
extern void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *);
extern jl_value_t *cmpswap_names JL_GLOBALLY_ROOTED;

// mark the initial root set
203 changes: 181 additions & 22 deletions src/partr.c
@@ -14,7 +14,6 @@
extern "C" {
#endif


// thread sleep state

// default to DEFAULT_THREAD_SLEEP_THRESHOLD; set via $JULIA_THREAD_SLEEP_THRESHOLD
@@ -57,6 +56,12 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT
extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache,
jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT;

// partr dynamic dispatch
void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *);
static int (*partr_enqueue_task)(jl_task_t *, int16_t);
static jl_task_t *(*partr_dequeue_task)(void);
static int (*partr_check_empty)(void);
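
The three pointers above, together with the jl_gc_mark_enqueued_tasks pointer, are the whole scheduler interface: each backend fills them in once during initialization, and the rest of partr.c only calls through them. A minimal standalone sketch of that pattern (the stub functions and names here are hypothetical, not Julia's real queues):

    /* Illustrative only: pick a queue implementation once at startup via an
     * environment variable, then dispatch through function pointers. */
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <strings.h>   /* strncasecmp (POSIX) */

    typedef struct task { int id; } task_t;

    static int stub_multiq_push(task_t *t, int16_t prio)  { (void)t; (void)prio; puts("multiq push");  return 0; }
    static task_t *stub_multiq_pop(void)                  { puts("multiq pop");  return NULL; }
    static int stub_wsdeque_push(task_t *t, int16_t prio) { (void)t; (void)prio; puts("wsdeque push"); return 0; }
    static task_t *stub_wsdeque_pop(void)                 { puts("wsdeque pop"); return NULL; }

    /* The dispatch pointers: set once, before any worker thread runs. */
    static int (*enqueue_task)(task_t *, int16_t);
    static task_t *(*dequeue_task)(void);

    static void choose_scheduler(void)
    {
        const char *sch = getenv("JULIA_THREAD_SCHEDULER");
        if (sch && !strncasecmp(sch, "workstealing", 12)) {
            enqueue_task = &stub_wsdeque_push;
            dequeue_task = &stub_wsdeque_pop;
        }
        else {
            enqueue_task = &stub_multiq_push;
            dequeue_task = &stub_multiq_pop;
        }
    }

    int main(void)
    {
        choose_scheduler();   /* done once by the main thread */
        task_t t = { 1 };
        enqueue_task(&t, 0);  /* later call sites are backend-agnostic */
        dequeue_task();
        return 0;
    }

Because the selection happens once before worker threads start, the pointers can stay plain (non-atomic) globals, which appears to be the assumption made by jl_init_threadinginfra below (it is documented as running only on the main thread).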

// multiq
// ---

@@ -83,20 +88,6 @@ static int32_t heap_p;
static uint64_t cong_unbias;


static inline void multiq_init(void)
{
heap_p = heap_c * jl_n_threads;
heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t));
for (int32_t i = 0; i < heap_p; ++i) {
uv_mutex_init(&heaps[i].lock);
heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*));
jl_atomic_store_relaxed(&heaps[i].ntasks, 0);
jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX);
}
unbias_cong(heap_p, &cong_unbias);
}


static inline void sift_up(taskheap_t *heap, int32_t idx)
{
if (idx > 0) {
@@ -208,7 +199,7 @@ static inline jl_task_t *multiq_deletemin(void)
}


void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
void multiq_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
{
int32_t i, j;
for (i = 0; i < heap_p; ++i)
@@ -228,6 +219,170 @@ static int multiq_check_empty(void)
}


static inline void multiq_init(void)
{
heap_p = heap_c * jl_n_threads;
heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t));
for (int32_t i = 0; i < heap_p; ++i) {
uv_mutex_init(&heaps[i].lock);
heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*));
jl_atomic_store_relaxed(&heaps[i].ntasks, 0);
jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX);
}
unbias_cong(heap_p, &cong_unbias);
jl_gc_mark_enqueued_tasks = &multiq_gc_mark_enqueued_tasks;
partr_enqueue_task = &multiq_insert;
partr_dequeue_task = &multiq_deletemin;
partr_check_empty = &multiq_check_empty;
}



// work-stealing deque

// The work-stealing deque by Chase and Lev (2005). Le et al. (2013) provides
// C11-compliant memory ordering.
//
// Ref:
// * Chase and Lev (2005) https://doi.org/10.1145/1073970.1073974
// * Le et al. (2013) https://doi.org/10.1145/2442516.2442524
//
// TODO: Dynamic buffer resizing.
typedef struct _wsdeque_t {
union {
struct {
jl_task_t **tasks;
_Atomic(int64_t) top;
_Atomic(int64_t) bottom;
};
uint8_t padding[JL_CACHE_BYTE_ALIGNMENT];
Review comment (Member): this has no alignment whatsoever

};
} wsdeque_t;
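
The padding member only forces sizeof(wsdeque_t) up to JL_CACHE_BYTE_ALIGNMENT; it says nothing about where the array itself starts, which is what the review note above points out. A sketch of one way to get both the per-element stride and the base-address alignment, assuming C11 and a 64-byte cache line (not what the PR does, only an illustration):

    #include <stdalign.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define CACHE_LINE 64

    /* alignas on a member raises the alignment (and hence the padded size)
     * of the whole struct, so consecutive array elements never share a line. */
    typedef struct deque_slot {
        alignas(CACHE_LINE) int64_t top;
        int64_t bottom;
    } deque_slot_t;

    int main(void)
    {
        /* The base address must be aligned too; aligned_alloc (C11) does that,
         * provided the total size is a multiple of the alignment. */
        size_t n = 8;
        deque_slot_t *slots = aligned_alloc(CACHE_LINE, n * sizeof(deque_slot_t));
        if (!slots)
            return 1;
        printf("sizeof(deque_slot_t) = %zu, base %% 64 = %zu\n",
               sizeof(deque_slot_t), (size_t)((uintptr_t)slots % CACHE_LINE));
        free(slots);
        return 0;
    }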

static wsdeque_t *wsdeques;


static int wsdeque_push(jl_task_t *task, int16_t priority_ignored)
{
int16_t tid = jl_threadid();
int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom);
int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
Review comment (Member): What is being load-acquired? There are no stores in wsdeque_steal.

Suggested change: replace
    int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
with
    int64_t t = jl_atomic_load(&wsdeques[tid].top); // ensures that `tasks` is successfully stolen before we try to reuse the slot below

Reply (Member Author): I'm trying to digest this and I realized that I still don't get it. As we discussed, there is a CAS on .top in wsdeque_steal:

    julia/src/partr.c, lines 316 to 318 in 0e50d01:
        jl_task_t *task = jl_atomic_load_relaxed(
            (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
        if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))

The loaded task only matters in the success path, where we have a seq_cst write, which is a superset of a release write. So we have:

  • a sequenced-before edge from the load of .tasks[t % tasks_per_heap] to the store on .top via the CAS on the next line (as they both happen in the same thread), and
  • a synchronizes-with edge from the store on .top to the load of .top in wsdeque_push quoted above.

So they establish happens-before, and it looks like we know that the task is loaded by the time we load .top? IIRC, your concern was that loading .tasks[t % tasks_per_heap] and storing .top can be reordered, but doesn't release forbid that?

    "A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store."
    https://en.cppreference.com/w/cpp/atomic/memory_order

Reply (Member): I am not sure. But edges are not transitive, unless all of them are seq-cst, IIUC.

Reply (Member): But yes, looks like we have a proper release/acquire pair here on top to ensure the ops on tasks are okay.

Reply (Member Author, @tkf, Jan 27, 2022): Hmm... Lahav et al.'s definition of happens-before hb is the transitive closure of the union of sequenced-before sb and synchronizes-with sw:

    [image: hb ≜ (sb ∪ sw)⁺]

R⁺ is the transitive closure of R:

    [image: R⁺ = ⋃_{i≥1} Rⁱ]

But I don't know if that's a mismatch between their definition and the C11 semantics (though they use the same notions for discussing C11, so probably not) and/or some difference from the actual definition in the standard.

int64_t size = b - t;
if (size >= tasks_per_heap - 1) // full
return -1;
jl_atomic_store_relaxed(
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap], task);
if (jl_atomic_load_acquire(&task->tid) != -1)
// If the `task` still hasn't finished the context switch at this point, abort push
// and put it in the sticky queue.
return -1;
Review comment on lines +275 to +278 (Member), suggesting that these four lines be removed: The task->tid is fairly likely to be our own tid here. I think we can handle this when we pop the value (putting it into the sticky queue when we pop the value, rather than when we pushed it).

jl_fence_release();
Review comment (Member): The only store was the relaxed store to tasks, so maybe just make that a release store?

jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
return 0;
}
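
The release/acquire reasoning debated in the thread above is the standard message-passing pattern, shown here reduced to two threads with plain C11 atomics (a generic example, not Julia's code): the plain write to the payload is sequenced before the release store to the flag, the acquire load that observes the flag synchronizes with that store, and happens-before chains through those edges, so the payload read cannot see a stale value.

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    static int payload;            /* plain data, playing the role of a tasks[] slot */
    static _Atomic int flag;       /* playing the role of `top` */

    static void *producer(void *arg)
    {
        (void)arg;
        payload = 42;                                            /* A: plain write   */
        atomic_store_explicit(&flag, 1, memory_order_release);   /* B: release store */
        return NULL;
    }

    static void *consumer(void *arg)
    {
        (void)arg;
        while (atomic_load_explicit(&flag, memory_order_acquire) == 0)  /* C: acquire load */
            ;
        /* A is sequenced-before B, B synchronizes-with C, and C is sequenced-before
         * this read, so the read is guaranteed to observe 42. */
        printf("payload = %d\n", payload);
        return NULL;
    }

    int main(void)
    {
        pthread_t p, c;
        pthread_create(&c, NULL, consumer, NULL);
        pthread_create(&p, NULL, producer, NULL);
        pthread_join(p, NULL);
        pthread_join(c, NULL);
        return 0;
    }

Whether the same chain still holds once some of the accesses are relaxed and the synchronization goes through a seq_cst CAS, as in the deque here, is exactly the question the thread above leaves open.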


static jl_task_t *wsdeque_pop(void)
{
int16_t tid = jl_threadid();
int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom) - 1;
jl_atomic_store_relaxed(&wsdeques[tid].bottom, b);
jl_fence();
int64_t t = jl_atomic_load_relaxed(&wsdeques[tid].top);
int64_t size = b - t;
if (size < 0) {
jl_atomic_store_relaxed(&wsdeques[tid].bottom, t);
return NULL;
}
jl_task_t *task = jl_atomic_load_relaxed(
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap]);
if (size > 0)
return task;
if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
Review comment (Member): this might need a fence, since it is trying to order the relaxed load above with the relaxed store below it?

task = NULL;
jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
return task;
}


static jl_task_t *wsdeque_steal(int16_t tid)
{
int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
jl_fence();
int64_t b = jl_atomic_load_acquire(&wsdeques[tid].bottom);
int64_t size = b - t;
if (size <= 0)
return NULL;
jl_task_t *task = jl_atomic_load_relaxed(
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
Review comment (Member): possibly also need an explicit fence? I am not certain if a seq-cst cmpswap is sufficient to enforce an order on relaxed ops nearby.

Review comment (Member): I am surprised this doesn't try to steal size*fraction items, instead of a constant (1), but that does not need to be changed for this PR.

Reply (Member Author): There's the Stealing Multi-Queue, which does something similar to what you said: https://arxiv.org/abs/2109.00657
But stealing like this works quite well for fork-join use cases, in which you typically only enqueue/"materialize" O(log n) tasks in the queue while the task itself can "reveal" O(n) child tasks upon execution (n = e.g., the length of the array for a parallel map).

return NULL;
return task;
Review comment (Member), suggested change: replace `return task;` with
    if (jl_set_task_tid(task))
        return task;
    wsdequeue_push(task, 1); // FIXME: the sticky queue would be a better place for this, but we need to handle that inside Julia instead of handling these conditions here
return NULL;

}


static jl_task_t *wsdeque_pop_or_steal(void)
{
jl_ptls_t ptls = jl_current_task->ptls;
jl_task_t *task = wsdeque_pop();
if (task || jl_n_threads < 2)
return task;

int ntries = jl_n_threads;
for (int i = 0; i < ntries; ++i) {
uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed);
if (tid >= ptls->tid)
tid++;
task = wsdeque_steal(tid);
if (task)
return task;
}
return NULL;
}
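
The `if (tid >= ptls->tid) tid++;` adjustment above is how the loop draws a victim uniformly from the other threads: sample an index in 0..n-2 and shift every value at or above the caller's own id up by one, so the caller itself is never picked. A small standalone check of that mapping (hypothetical thread count and id; the runtime's cong generator is replaced by rand here):

    #include <stdio.h>
    #include <stdlib.h>

    int main(void)
    {
        int nthreads = 4, self = 2;
        int counts[4] = {0, 0, 0, 0};

        for (int i = 0; i < 100000; ++i) {
            int victim = rand() % (nthreads - 1);  /* uniform over 0..n-2 */
            if (victim >= self)
                victim++;                          /* skip our own id */
            counts[victim]++;
        }
        for (int t = 0; t < nthreads; ++t)
            printf("tid %d chosen %d times\n", t, counts[t]);  /* tid 2: 0 times */
        return 0;
    }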


void wsdeque_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
{
for (int i = 0; i < jl_n_threads; ++i) {
int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top);
int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom);
for (int j = t; j < b; ++j)
jl_gc_mark_queue_obj_explicit(
gc_cache, sp, (jl_value_t *)wsdeques[i].tasks[j % tasks_per_heap]);
}
}


static int wsdeque_check_empty(void)
{
for (int32_t i = 0; i < jl_n_threads; ++i) {
int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top);
int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom);
int64_t size = b - t;
if (size > 0)
return 0;
}
return 1;
}


static void wsdeque_init(void)
{
// Manually align the pointer since `jl_malloc_aligned` is not available here.
wsdeques = (wsdeque_t *)(((uintptr_t)calloc(1, sizeof(wsdeque_t) * jl_n_threads +
JL_CACHE_BYTE_ALIGNMENT - 1) +
JL_CACHE_BYTE_ALIGNMENT - 1) &
(-JL_CACHE_BYTE_ALIGNMENT));
Review comment on lines +372 to +375 (Member), suggested change: replace the manual alignment with
    wsdeques = (wsdeque_t *)malloc_cache_align(sizeof(wsdeque_t) * jl_n_threads);

for (int32_t i = 0; i < jl_n_threads; ++i) {
wsdeques[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t *));
}
unbias_cong(jl_n_threads, &cong_unbias);
jl_gc_mark_enqueued_tasks = &wsdeque_gc_mark_enqueued_tasks;
partr_enqueue_task = &wsdeque_push;
partr_dequeue_task = &wsdeque_pop_or_steal;
partr_check_empty = &wsdeque_check_empty;
}
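
The manual alignment at the top of wsdeque_init is the usual over-allocate-and-round-up trick: allocate JL_CACHE_BYTE_ALIGNMENT - 1 extra bytes, then round the pointer up to the next multiple of the alignment by adding ALIGN - 1 and masking off the low bits. This only works when the alignment is a power of two, and the unaligned base pointer is not kept, so the allocation cannot be freed later. A small generic check of the arithmetic (plain C, not the malloc_cache_align helper suggested above):

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define ALIGN ((uintptr_t)64)  /* must be a power of two for the mask trick */

    int main(void)
    {
        size_t payload = 300;                      /* e.g. a few small structs */
        void *raw = calloc(1, payload + ALIGN - 1);
        if (!raw)
            return 1;

        uintptr_t p = (uintptr_t)raw;
        uintptr_t aligned = (p + ALIGN - 1) & ~(ALIGN - 1);  /* same mask as & -ALIGN above */

        printf("raw %% 64 = %llu, aligned %% 64 = %llu\n",
               (unsigned long long)(p % ALIGN),
               (unsigned long long)(aligned % ALIGN));

        /* free() must still be given the original pointer, not the aligned one. */
        free(raw);
        return 0;
    }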


// parallel task runtime
// ---
Expand All @@ -236,8 +391,12 @@ static int multiq_check_empty(void)
// (used only by the main thread)
void jl_init_threadinginfra(void)
{
/* initialize the synchronization trees pool and the multiqueue */
multiq_init();
/* choose and initialize the scheduler */
char *sch = getenv("JULIA_THREAD_SCHEDULER");
if (sch && !strncasecmp(sch, "workstealing", 12))
wsdeque_init();
else
multiq_init();

sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD;
char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME);
@@ -292,7 +451,7 @@ void jl_threadfun(void *arg)
// enqueue the specified task for execution
JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task)
{
if (multiq_insert(task, task->prio) == -1)
if (partr_enqueue_task(task, task->prio) == -1)
return 1;
return 0;
}
@@ -419,7 +578,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
jl_set_task_tid(task, self);
return task;
}
return multiq_deletemin();
return partr_dequeue_task();
}

static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
@@ -444,7 +603,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)

// quick, race-y check to see if there seems to be any stuff in there
jl_cpu_pause();
if (!multiq_check_empty()) {
if (!partr_check_empty()) {
start_cycles = 0;
continue;
}
@@ -453,7 +612,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
jl_ptls_t ptls = ct->ptls;
if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
if (!multiq_check_empty()) {
if (!partr_check_empty()) {
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
continue;
5 changes: 4 additions & 1 deletion test/threads.jl
@@ -24,9 +24,12 @@ let lk = ReentrantLock()
end

let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threads_exec.jl`
for test_nthreads in (1, 2, 4, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races
for test_nthreads in (1, 2, 4, 4), # run once to try single-threaded mode, then try a couple times to trigger bad races
sch in ("depthfirst", "workstealing")

new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = string(test_nthreads)
new_env["JULIA_THREAD_SCHEDULER"] = sch
run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
end
end