From a8e65f6ebb2f3b012681c756ab5de8adf2254160 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Mon, 31 Mar 2025 14:24:10 -0600 Subject: [PATCH 1/6] Add thread-local governing delegation of hardware resources. --- include/qt_threadpool.h | 2 +- src/threadpool.c | 8 ++++++++ test/internal/threadpool.c | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/include/qt_threadpool.h b/include/qt_threadpool.h index 514853c19..1352af106 100644 --- a/include/qt_threadpool.h +++ b/include/qt_threadpool.h @@ -18,6 +18,6 @@ typedef enum { hw_pool_init_status hw_pool_init(uint32_t num_threads); void hw_pool_destroy(); -void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg); +void run_on_current_pool(qt_threadpool_func_type func, void *arg); #endif diff --git a/src/threadpool.c b/src/threadpool.c index 0af68d70d..9648ae4a1 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -75,6 +75,8 @@ typedef struct { #endif } pool_header; +_Thread_local pool_header *delegated_pool; + typedef struct { // 16 byte aligned to allow loading it in one atomic instruction // on architectures where that makes sense (most of them). 
@@ -306,6 +308,7 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { #ifdef QPOOL_USE_PTHREADS pthread_attr_destroy(&attr); #endif + delegated_pool = &hw_pool; return POOL_INIT_SUCCESS; cleanup_threads: if (i) { @@ -348,6 +351,7 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { } API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { + delegated_pool = NULL; uint32_t num_threads = atomic_load_explicit(&hw_pool.num_threads, memory_order_relaxed); char *buffer = atomic_load_explicit(&hw_pool.threads, memory_order_relaxed); @@ -400,6 +404,10 @@ pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) { suspend_main_while_working(pool); } +API_FUNC void run_on_current_pool(qt_threadpool_func_type func, void *arg) { + pool_run_on_all(delegated_pool, func, arg); +} + API_FUNC void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg) { pool_run_on_all(&hw_pool, func, arg); } diff --git a/test/internal/threadpool.c b/test/internal/threadpool.c index 4c9ac7e14..16aabec44 100644 --- a/test/internal/threadpool.c +++ b/test/internal/threadpool.c @@ -10,7 +10,7 @@ int main() { hw_pool_init(2); hw_pool_destroy(); hw_pool_init(2); - hw_pool_run_on_all(&on_thread_test, NULL); + run_on_current_pool(&on_thread_test, NULL); hw_pool_destroy(); printf("exited successfully\n"); fflush(stdout); From 2c8e42e5c8830cd493bfd665ef3723d7de727ac5 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Mon, 31 Mar 2025 15:38:24 -0600 Subject: [PATCH 2/6] Add todo items outlining how thread resource delegation can be implemented. 
--- src/threadpool.c | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/threadpool.c b/src/threadpool.c index 9648ae4a1..d8f6f90e0 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -382,6 +382,17 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { atomic_store_explicit(&hw_pool.num_threads, 0, memory_order_release); } +// TODO: If no thread pool is delegated, just run the function directly. + +// TODO: interface for querying about the delgated thread pool. +// At least let the user get the number of threads available. +// If no thread pool is delgated, just report a single thread. + +// TODO: have the main thread fill the role of thread 0. +// Instead of having the main thread wait/resume, swap in its thread-locals +// then have it run the per-thread function. +// This will avoid the suspend/resume OS overheads for at least that thread. + API_FUNC void pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) { uint32_t num_threads = @@ -412,3 +423,20 @@ API_FUNC void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg) { pool_run_on_all(&hw_pool, func, arg); } +API_FUNC void divide_pool(uint32_t num_groups, ...) { + // TODO: for each group: + // make a new threadpool header for the group + // wake the leader thread and have it: + // update its own thread-local thread pool and index + // re-wake and launch a new iteration loop on its delegated worker + // threads, having them: + // update their thread-local indices then launch their own iteration + // loops + // wait for the other threads in the group to finish (busy or futex?) + // restore its own thread-locals + // signal completion to main via the atomic on the outer pool + // have the main thread act as leader for the first group + // wait for the groups to finish (busy or futex?) 
+ ; +} + From bb387bbc29636445404ff05450b803bc6d4e041f Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 1 Apr 2025 09:42:13 -0600 Subject: [PATCH 3/6] Have threads with no delegated pool execute run_on_all calls themselves. --- src/threadpool.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/threadpool.c b/src/threadpool.c index d8f6f90e0..5003fa11f 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -382,8 +382,6 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { atomic_store_explicit(&hw_pool.num_threads, 0, memory_order_release); } -// TODO: If no thread pool is delegated, just run the function directly. - // TODO: interface for querying about the delgated thread pool. // At least let the user get the number of threads available. // If no thread pool is delgated, just report a single thread. @@ -416,7 +414,14 @@ pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) { } API_FUNC void run_on_current_pool(qt_threadpool_func_type func, void *arg) { - pool_run_on_all(delegated_pool, func, arg); + if (delegated_pool) { + pool_run_on_all(delegated_pool, func, arg); + } else { + uint32_t outer_index = context_index; + context_index = 0; + func(arg); + context_index = outer_index; + } } API_FUNC void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg) { From 7c505b80310c5aa732ce1f0552374f7a0fff8161 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 1 Apr 2025 10:31:32 -0600 Subject: [PATCH 4/6] Add interface for querying numbers of delegated threads. 
--- include/qt_threadpool.h | 1 + src/threadpool.c | 8 +++++--- test/internal/threadpool.c | 4 ++++ 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/include/qt_threadpool.h b/include/qt_threadpool.h index 1352af106..183eb8d2d 100644 --- a/include/qt_threadpool.h +++ b/include/qt_threadpool.h @@ -19,5 +19,6 @@ typedef enum { hw_pool_init_status hw_pool_init(uint32_t num_threads); void hw_pool_destroy(); void run_on_current_pool(qt_threadpool_func_type func, void *arg); +uint32_t get_num_delegated_threads(); #endif diff --git a/src/threadpool.c b/src/threadpool.c index 5003fa11f..e8ab97c66 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -382,9 +382,11 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { atomic_store_explicit(&hw_pool.num_threads, 0, memory_order_release); } -// TODO: interface for querying about the delgated thread pool. -// At least let the user get the number of threads available. -// If no thread pool is delgated, just report a single thread. +API_FUNC uint32_t get_num_delegated_threads() { + if (delegated_pool) return delegated_pool->num_threads; + // Every thread at least has itself available for work. + return 1; +} // TODO: have the main thread fill the role of thread 0. 
// Instead of having the main thread wait/resume, swap in its thread-locals diff --git a/test/internal/threadpool.c b/test/internal/threadpool.c index 16aabec44..a27fd7264 100644 --- a/test/internal/threadpool.c +++ b/test/internal/threadpool.c @@ -2,13 +2,17 @@ #include "qt_threadpool.h" static int on_thread_test(void *arg) { + test_check(get_num_delegated_threads() == 1); printf("hello from thread\n"); return 0; } int main() { + test_check(get_num_delegated_threads() == 1); hw_pool_init(2); + test_check(get_num_delegated_threads() == 2); hw_pool_destroy(); + test_check(get_num_delegated_threads() == 1); hw_pool_init(2); run_on_current_pool(&on_thread_test, NULL); hw_pool_destroy(); From 4a293e1f87b23aab58b40a3e35f69a5a81d0e001 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 1 Apr 2025 12:39:16 -0600 Subject: [PATCH 5/6] remove outdated comment. --- src/threadpool.c | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/threadpool.c b/src/threadpool.c index e8ab97c66..f2845e361 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -283,15 +283,6 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { while (i < num_threads) { pooled_thread_control *thread_control = (pooled_thread_control *)(buffer + alignment * (size_t)i); - // Initialize the thread control struct in two 128b atomic writes. - // TODO: It's possible to just do this in a single 256b atomic write on most - // x86 platforms. That may also require increasing the alignment constraints - // for the control_slice. - // TODO: also ifdef in an implementation for platforms that can't do - // lock-free 128b writes or that don't handle mixed-size atomic writes. - // TODO: making some kind of ifunc to handle this initialization is probably - // actually the right way to do it because it's hard to know enough about - // the CPU at compile-time. 
init_thread_control(thread_control, i, &hw_pool); int status; #ifdef QPOOL_USE_PTHREADS From 95da0059565c58d7f228d2ac7c1834c8b719808a Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 1 Apr 2025 14:45:25 -0600 Subject: [PATCH 6/6] Have the main thread take the role of thread zero when running the threadpool. --- src/threadpool.c | 80 ++++++++++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/src/threadpool.c b/src/threadpool.c index f2845e361..c55956bcf 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -254,7 +254,6 @@ static int pooled_thread_func(void *void_arg) { API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { if unlikely (!num_threads) return POOL_INIT_NO_THREADS_SPECIFIED; uint32_t old = 0u; - assert(num_threads < UINT32_MAX); if unlikely (!atomic_compare_exchange_strong_explicit(&hw_pool.num_threads, &old, num_threads, @@ -284,16 +283,21 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { pooled_thread_control *thread_control = (pooled_thread_control *)(buffer + alignment * (size_t)i); init_thread_control(thread_control, i, &hw_pool); - int status; + if (i) { + int status; #ifdef QPOOL_USE_PTHREADS - status = pthread_create( - &thread_control->thread, &attr, pooled_thread_func, thread_control); - if unlikely (status) goto cleanup_threads; + status = pthread_create( + &thread_control->thread, &attr, pooled_thread_func, thread_control); + if unlikely (status) goto cleanup_threads; #else - status = - thrd_create(&thread_control->thread, pooled_thread_func, thread_control); - if unlikely (status != thrd_success) goto cleanup_threads; + status = thrd_create( + &thread_control->thread, pooled_thread_func, thread_control); + if unlikely (status != thrd_success) goto cleanup_threads; #endif + } + // Leave the thread object uninitialized for thread 0. + // It needs to be there for the sake of alignment, + // but other than that it's unused. 
++i; } #ifdef QPOOL_USE_PTHREADS @@ -303,7 +307,13 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { return POOL_INIT_SUCCESS; cleanup_threads: if (i) { + // Last thread failed to launch, so no need to clean it up. + // If an error was raised it would have been at an iteration + // higher than 0 for the thread create loop since no thread is + // created at 0. uint32_t j = --i; + // current thread does the work of worker zero so + // no need to signal or join for that one. while (i) { // TODO: fix deinit to match new layout and interrupt mechanism. pooled_thread_control *thread_control = @@ -347,17 +357,17 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { atomic_load_explicit(&hw_pool.num_threads, memory_order_relaxed); char *buffer = atomic_load_explicit(&hw_pool.threads, memory_order_relaxed); size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); - uint32_t i = num_threads; + uint32_t i = num_threads ? num_threads - 1u : 0u; + // Thread 0 is the caller, so skip it; guard so a zero count cannot wrap. while (i) { - --i; // TODO: fix deinit to match new layout and interrupt mechanism. pooled_thread_control *thread_control = (pooled_thread_control *)(buffer + alignment * (size_t)i); notify_worker_of_termination(thread_control); + --i; } - i = num_threads; + i = num_threads ? num_threads - 1u : 0u; while (i) { - --i; pooled_thread_control *thread_control = (pooled_thread_control *)(buffer + alignment * (size_t)i); // TODO: crash informatively if join fails somehow. @@ -366,6 +376,7 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { #else thrd_join(thread_control->thread, NULL); #endif + --i; } atomic_store_explicit(&hw_pool.threads, NULL, memory_order_relaxed); @@ -379,31 +390,40 @@ API_FUNC uint32_t get_num_delegated_threads() { return 1; } -// TODO: have the main thread fill the role of thread 0. -// Instead of having the main thread wait/resume, swap in its thread-locals -// then have it run the per-thread function. 
-// This will avoid the suspend/resume OS overheads for at least that thread. +// Note: current thread fills the role of thread zero in the pool. API_FUNC void pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) { uint32_t num_threads = atomic_load_explicit(&pool->num_threads, memory_order_relaxed); assert(num_threads); - assert(num_threads < UINT32_MAX); - char *buffer = - (char *)atomic_load_explicit(&pool->threads, memory_order_relaxed); - atomic_store_explicit( - &pool->num_active_threads, num_threads, memory_order_relaxed); - init_main_sync(pool); - size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); - for (uint32_t i = 0u; - i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed); - i++) { - pooled_thread_control *thread_control = - (pooled_thread_control *)(buffer + alignment * (size_t)i); - launch_work_on_thread(thread_control, func, arg); + if (num_threads > 1u) { + char *buffer = + (char *)atomic_load_explicit(&pool->threads, memory_order_relaxed); + atomic_store_explicit( + &pool->num_active_threads, num_threads - 1u, memory_order_relaxed); + init_main_sync(pool); + size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); + for (uint32_t i = 1u; + i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed); + i++) { + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + launch_work_on_thread(thread_control, func, arg); + } + } + uint32_t outer_index = context_index; + context_index = 0u; + pool_header *outer_delegated_pool = delegated_pool; + delegated_pool = NULL; + func(arg); + delegated_pool = outer_delegated_pool; + context_index = outer_index; + if (num_threads > 1u) { + // some loops may have threads that take dramatically longer + // so we still suspend, but it's potentially for much less time. 
+ suspend_main_while_working(pool); } - suspend_main_while_working(pool); } API_FUNC void run_on_current_pool(qt_threadpool_func_type func, void *arg) {