
Backports JuliaLang#55643 for internal testing. #178

Closed · wants to merge 1 commit
53 changes: 35 additions & 18 deletions src/gc-stacks.c
@@ -190,7 +190,7 @@ JL_DLLEXPORT void *jl_malloc_stack(size_t *bufsz, jl_task_t *owner) JL_NOTSAFEPO
return stk;
}

void sweep_stack_pools(void)
void sweep_stack_pool_loop(void) JL_NOTSAFEPOINT
{
// Stack sweeping algorithm:
// // deallocate stacks if we have too many sitting around unused
@@ -203,27 +203,43 @@ void sweep_stack_pools(void)
// bufsz = t->bufsz
// if (stkbuf)
// push(free_stacks[sz], stkbuf)
assert(gc_n_threads);
for (int i = 0; i < gc_n_threads; i++) {
jl_atomic_fetch_add(&gc_n_threads_sweeping_stacks, 1);
while (1) {
int i = jl_atomic_fetch_add_relaxed(&gc_ptls_sweep_idx, -1);
if (i < 0)
break;
jl_ptls_t ptls2 = gc_all_tls_states[i];

if (ptls2 == NULL)
continue;
assert(gc_n_threads);
// free half of stacks that remain unused since last sweep
for (int p = 0; p < JL_N_STACK_POOLS; p++) {
small_arraylist_t *al = &ptls2->gc_tls.heap.free_stacks[p];
size_t n_to_free;
if (al->len > MIN_STACK_MAPPINGS_PER_POOL) {
n_to_free = al->len / 2;
if (n_to_free > (al->len - MIN_STACK_MAPPINGS_PER_POOL))
n_to_free = al->len - MIN_STACK_MAPPINGS_PER_POOL;
}
else {
n_to_free = 0;
}
for (int n = 0; n < n_to_free; n++) {
void *stk = small_arraylist_pop(al);
free_stack(stk, pool_sizes[p]);
if (i == jl_atomic_load_relaxed(&gc_stack_free_idx)) {
for (int p = 0; p < JL_N_STACK_POOLS; p++) {
small_arraylist_t *al = &ptls2->gc_tls.heap.free_stacks[p];
size_t n_to_free;
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
n_to_free = al->len; // not alive yet or dead, so it does not need these anymore
}
else if (al->len > MIN_STACK_MAPPINGS_PER_POOL) {
n_to_free = al->len / 2;
if (n_to_free > (al->len - MIN_STACK_MAPPINGS_PER_POOL))
n_to_free = al->len - MIN_STACK_MAPPINGS_PER_POOL;
}
else {
n_to_free = 0;
}
for (int n = 0; n < n_to_free; n++) {
void *stk = small_arraylist_pop(al);
free_stack(stk, pool_sizes[p]);
}
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
small_arraylist_free(al);
}
}
}
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
small_arraylist_free(ptls2->gc_tls.heap.free_stacks);
}

small_arraylist_t *live_tasks = &ptls2->gc_tls.heap.live_tasks;
size_t n = 0;
@@ -264,6 +280,7 @@ }
}
live_tasks->len -= ndel;
}
jl_atomic_fetch_add(&gc_n_threads_sweeping_stacks, -1);
}

JL_DLLEXPORT jl_array_t *jl_live_tasks(void)
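The heart of the renamed `sweep_stack_pool_loop` above is a decrement-and-claim pattern: every participating thread atomically takes the next thread-state index from the shared `gc_ptls_sweep_idx` counter, sweeps that thread's stack pools, and stops once the counter goes negative, while `gc_n_threads_sweeping_stacks` tracks how many sweepers are still in flight. The standalone sketch below is not part of the diff; the names `claim_loop`, `next_idx`, and `n_sweeping` are illustrative stand-ins for the runtime's globals, and the body of the sweep is replaced by a print.

```c
#include <stdatomic.h>
#include <stdio.h>

#define N_THREAD_STATES 8   /* stands in for gc_n_threads */

static atomic_int next_idx   = N_THREAD_STATES - 1; /* plays the role of gc_ptls_sweep_idx */
static atomic_int n_sweeping = 0;                    /* plays the role of gc_n_threads_sweeping_stacks */

/* Run by the coordinating thread and by every parallel GC thread that was woken.
 * fetch_sub hands out each index exactly once, so each thread's stack pools are
 * swept by exactly one sweeper, in no particular order. */
static void claim_loop(void)
{
    atomic_fetch_add(&n_sweeping, 1);
    while (1) {
        int i = atomic_fetch_sub_explicit(&next_idx, 1, memory_order_relaxed);
        if (i < 0)
            break;                            /* counter exhausted: nothing left to claim */
        printf("sweeping stack pools of thread state %d\n", i);
    }
    atomic_fetch_add(&n_sweeping, -1);        /* lets the coordinator's wait loop terminate */
}

int main(void)
{
    claim_loop();   /* single-threaded demo; with several callers each index is still claimed once */
    return 0;
}
```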
1 change: 1 addition & 0 deletions src/gc-tls.h
@@ -82,6 +82,7 @@ typedef struct {
jl_gc_markqueue_t mark_queue;
jl_gc_mark_cache_t gc_cache;
_Atomic(size_t) gc_sweeps_requested;
_Atomic(size_t) gc_stack_sweep_requested;
arraylist_t sweep_objs;
} jl_gc_tls_states_t;

54 changes: 49 additions & 5 deletions src/gc.c
@@ -21,11 +21,17 @@ int jl_n_sweepthreads;
// Number of threads currently running the GC mark-loop
_Atomic(int) gc_n_threads_marking;
// Number of threads sweeping
_Atomic(int) gc_n_threads_sweeping;
_Atomic(int) gc_n_threads_sweeping_pools;
// Number of threads sweeping stacks
_Atomic(int) gc_n_threads_sweeping_stacks;
// Temporary for the `ptls->gc_tls.page_metadata_allocd` used during parallel sweeping (padded to avoid false sharing)
_Atomic(jl_gc_padded_page_stack_t *) gc_allocd_scratch;
// `tid` of mutator thread that triggered GC
_Atomic(int) gc_master_tid;
// counter for sharing work when sweeping stacks
_Atomic(int) gc_ptls_sweep_idx;
// counter for round robin of giving back stack pages to the OS
_Atomic(int) gc_stack_free_idx;
// `tid` of first GC thread
int gc_first_tid;
// Mutex/cond used to synchronize wakeup of GC threads on parallel marking
@@ -1525,6 +1531,44 @@ static void gc_sweep_other(jl_ptls_t ptls, int sweep_full) JL_NOTSAFEPOINT
gc_num.total_sweep_free_mallocd_memory_time += t_free_mallocd_memory_end - t_free_mallocd_memory_start;
}

// wake up all threads to sweep the stacks
void gc_sweep_wake_all_stacks(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
uv_mutex_lock(&gc_threads_lock);
int first = gc_first_parallel_collector_thread_id();
int last = gc_last_parallel_collector_thread_id();
for (int i = first; i <= last; i++) {
jl_ptls_t ptls2 = gc_all_tls_states[i];
gc_check_ptls_of_parallel_collector_thread(ptls2);
jl_atomic_fetch_add(&ptls2->gc_tls.gc_stack_sweep_requested, 1);
}
uv_cond_broadcast(&gc_threads_cond);
uv_mutex_unlock(&gc_threads_lock);
return;
}

void gc_sweep_wait_for_all_stacks(void) JL_NOTSAFEPOINT
{
while ((jl_atomic_load_acquire(&gc_ptls_sweep_idx) >= 0 ) || jl_atomic_load_acquire(&gc_n_threads_sweeping_stacks) != 0) {
jl_cpu_pause();
}
}

void sweep_stack_pools(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
// initialize ptls index for parallel sweeping of stack pools
assert(gc_n_threads);
int stack_free_idx = jl_atomic_load_relaxed(&gc_stack_free_idx);
if (stack_free_idx + 1 == gc_n_threads)
jl_atomic_store_relaxed(&gc_stack_free_idx, 0);
else
jl_atomic_store_relaxed(&gc_stack_free_idx, stack_free_idx + 1);
jl_atomic_store_release(&gc_ptls_sweep_idx, gc_n_threads - 1); // idx == gc_n_threads = release stacks to the OS so it's serial
gc_sweep_wake_all_stacks(ptls);
sweep_stack_pool_loop();
gc_sweep_wait_for_all_stacks();
}

static void gc_pool_sync_nfree(jl_gc_pagemeta_t *pg, jl_taggedvalue_t *last) JL_NOTSAFEPOINT
{
assert(pg->fl_begin_offset != UINT16_MAX);
@@ -1639,15 +1683,15 @@ void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_
void gc_sweep_wait_for_all(void)
{
jl_atomic_store(&gc_allocd_scratch, NULL);
while (jl_atomic_load_relaxed(&gc_n_threads_sweeping) != 0) {
while (jl_atomic_load_acquire(&gc_n_threads_sweeping_pools) != 0) {
jl_cpu_pause();
}
}

// sweep all pools
void gc_sweep_pool_parallel(jl_ptls_t ptls)
{
jl_atomic_fetch_add(&gc_n_threads_sweeping, 1);
jl_atomic_fetch_add(&gc_n_threads_sweeping_pools, 1);
jl_gc_padded_page_stack_t *allocd_scratch = jl_atomic_load(&gc_allocd_scratch);
if (allocd_scratch != NULL) {
gc_page_profiler_serializer_t serializer = gc_page_serializer_create();
Expand Down Expand Up @@ -1692,7 +1736,7 @@ void gc_sweep_pool_parallel(jl_ptls_t ptls)
}
gc_page_serializer_destroy(&serializer);
}
jl_atomic_fetch_add(&gc_n_threads_sweeping, -1);
jl_atomic_fetch_add(&gc_n_threads_sweeping_pools, -1);
}

// free all pages (i.e. through `madvise` on Linux) that were lazily freed
@@ -3604,7 +3648,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
#endif
current_sweep_full = sweep_full;
sweep_weak_refs();
sweep_stack_pools();
sweep_stack_pools(ptls);
gc_sweep_foreign_objs();
gc_sweep_other(ptls, sweep_full);
gc_scrub();
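The new `sweep_stack_pools` entry point in gc.c is the coordinator side of that loop: it advances `gc_stack_free_idx` round-robin (so on each collection only one thread's cached free stacks are trimmed and returned to the OS), seeds `gc_ptls_sweep_idx`, wakes the parallel GC threads, helps sweep itself, and then spins until the index is exhausted and `gc_n_threads_sweeping_stacks` drops back to zero. A condensed sketch of that wake/help/wait handshake, with placeholder names and pthread primitives instead of the runtime's `uv_*` wrappers, might look like this:

```c
#include <pthread.h>
#include <stdatomic.h>
#include <sched.h>

static pthread_mutex_t workers_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  workers_cond = PTHREAD_COND_INITIALIZER;
static atomic_int sweep_requests;    /* per worker in the real code: gc_stack_sweep_requested */
static atomic_int sweep_idx = -1;    /* plays the role of gc_ptls_sweep_idx */
static atomic_int free_idx;          /* plays the role of gc_stack_free_idx */
static atomic_int n_sweeping;        /* plays the role of gc_n_threads_sweeping_stacks */

static void claim_loop(void)         /* same decrement-and-claim loop as the previous sketch */
{
    atomic_fetch_add(&n_sweeping, 1);
    while (atomic_fetch_sub_explicit(&sweep_idx, 1, memory_order_relaxed) >= 0)
        ;                            /* sweep one thread's stack pools here */
    atomic_fetch_add(&n_sweeping, -1);
}

static void coordinate_stack_sweep(int n_threads)
{
    /* round-robin: a different thread's free stacks go back to the OS on each collection */
    int f = atomic_load_explicit(&free_idx, memory_order_relaxed);
    atomic_store_explicit(&free_idx, f + 1 == n_threads ? 0 : f + 1, memory_order_relaxed);

    /* publish the work, then wake every parallel GC worker */
    atomic_store_explicit(&sweep_idx, n_threads - 1, memory_order_release);
    pthread_mutex_lock(&workers_lock);
    atomic_fetch_add(&sweep_requests, 1);
    pthread_cond_broadcast(&workers_cond);
    pthread_mutex_unlock(&workers_lock);

    claim_loop();                    /* the coordinator helps instead of only waiting */

    /* wait until every index is claimed and every helper has left the loop */
    while (atomic_load_explicit(&sweep_idx, memory_order_acquire) >= 0 ||
           atomic_load_explicit(&n_sweeping, memory_order_acquire) != 0)
        sched_yield();               /* the runtime uses jl_cpu_pause() here */
}

int main(void)
{
    coordinate_stack_sweep(4);       /* no workers in this demo, so the coordinator sweeps everything */
    return 0;
}
```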
7 changes: 5 additions & 2 deletions src/gc.h
@@ -564,7 +564,10 @@ extern uv_mutex_t gc_threads_lock;
extern uv_cond_t gc_threads_cond;
extern uv_sem_t gc_sweep_assists_needed;
extern _Atomic(int) gc_n_threads_marking;
extern _Atomic(int) gc_n_threads_sweeping;
extern _Atomic(int) gc_n_threads_sweeping_pools;
extern _Atomic(int) gc_n_threads_sweeping_stacks;
extern _Atomic(int) gc_ptls_sweep_idx;
extern _Atomic(int) gc_stack_free_idx;
extern uv_barrier_t thread_init_done;
void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq);
void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t *fl_parent, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT;
@@ -574,7 +577,7 @@ void gc_mark_loop_serial(jl_ptls_t ptls);
void gc_mark_loop_parallel(jl_ptls_t ptls, int master);
void gc_sweep_pool_parallel(jl_ptls_t ptls);
void gc_free_pages(void);
void sweep_stack_pools(void);
void sweep_stack_pool_loop(void) JL_NOTSAFEPOINT;
void jl_gc_debug_init(void);

// GC pages
12 changes: 11 additions & 1 deletion src/partr.c
@@ -118,6 +118,11 @@ static inline int may_sweep(jl_ptls_t ptls) JL_NOTSAFEPOINT
return (jl_atomic_load(&ptls->gc_tls.gc_sweeps_requested) > 0);
}

static inline int may_sweep_stack(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
return (jl_atomic_load(&ptls->gc_tls.gc_stack_sweep_requested) > 0);
}

// parallel gc thread function
void jl_parallel_gc_threadfun(void *arg)
{
@@ -139,12 +144,17 @@ void jl_parallel_gc_threadfun(void *arg)

while (1) {
uv_mutex_lock(&gc_threads_lock);
while (!may_mark() && !may_sweep(ptls)) {
while (!may_mark() && !may_sweep(ptls) && !may_sweep_stack(ptls)) {
uv_cond_wait(&gc_threads_cond, &gc_threads_lock);
}
uv_mutex_unlock(&gc_threads_lock);
assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
gc_mark_loop_parallel(ptls, 0);
if (may_sweep_stack(ptls)) {
assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
sweep_stack_pool_loop();
jl_atomic_fetch_add(&ptls->gc_tls.gc_stack_sweep_requested, -1);
}
if (may_sweep(ptls)) {
assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
gc_sweep_pool_parallel(ptls);
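On the worker side, the partr.c change adds stack sweeping as a third thing a parallel GC thread can be woken up for: the thread sleeps on the condition variable until marking, pool sweeping, or stack sweeping is requested, runs the stack-sweep claim loop if its per-thread request counter is non-zero, and decrements the counter when done. The fragment below is a minimal sketch of that gating only, with placeholder names and pthread primitives; the sweep body is left as a stub.

```c
#include <pthread.h>
#include <stdatomic.h>

static pthread_mutex_t gc_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  gc_cond = PTHREAD_COND_INITIALIZER;
static atomic_int stack_sweep_requested;    /* gc_stack_sweep_requested in gc-tls.h */

static int may_sweep_stack_sketch(void)
{
    return atomic_load(&stack_sweep_requested) > 0;
}

static void sweep_stack_pool_loop_sketch(void)
{
    /* the decrement-and-claim loop from the first sketch would go here */
}

/* Parallel GC worker: sleeps until some GC work is requested, then handles
 * stack sweeping (new in this backport) before the existing pool sweeping. */
static void *parallel_gc_worker(void *arg)
{
    (void)arg;
    while (1) {
        pthread_mutex_lock(&gc_lock);
        while (!may_sweep_stack_sketch())    /* the real loop also checks may_mark()/may_sweep() */
            pthread_cond_wait(&gc_cond, &gc_lock);
        pthread_mutex_unlock(&gc_lock);

        if (may_sweep_stack_sketch()) {
            sweep_stack_pool_loop_sketch();
            atomic_fetch_add(&stack_sweep_requested, -1);   /* acknowledge the request */
        }
    }
    return NULL;
}
```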