From 73216aac2dce11e0dedca9f2fb496d5af939f546 Mon Sep 17 00:00:00 2001 From: idoleat Date: Thu, 6 Jun 2024 19:00:26 +0800 Subject: [PATCH 01/11] Setup the base tone of describing ABA problem First describe what can go wrong, then explain its root cause. Example code and solutions to the ABA problem will be added later. --- concurrency-primer.tex | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index 37aee7f..e02e9d9 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -883,6 +883,29 @@ \subsection{Conclusion about lock-free} Balancing complexity and performance is essential in concurrency, a domain fraught with challenges. +\subsection{ABA problem} +We have introduced CAS as one of the read-modify-write operations. +However, does the target object not changing really mean that no other threads modified it halfway through? +Consider the following scenario, + +\begin{ccode} + /* example code place holder */ + do { /* read and modify */ } + /* something can happen in between */ + while(CAS); +\end{ccode} + +If the target object is changed to something by other thread and changed back, the result of comparism is still equal. +Although the target object has indeed been changed, causing the operation to not remain atomic. +We call this \introduce{ABA problem}. In the example above, the presents of ABA problem leads to + +The ABA problem occurs when changes occur between reading and comparing, but the comparing mechanism is unable to identify that the state is not the latest. +The maximum number we refer to may not be the actual maximum, and the next job may not be the same job. +Failure to recognize this through comparison can result in outdated information. +The general concept of solving this problem involves adding more information to make different state distinguishable, and then making a decision on whether to act on the old state or retry with the new state. 
+If acting on the old state is chosen, then safe memory reclamation should be considered as memory may have already been freed by other threads. +More aggressively, one might consider the programming paradigm where each operation on the target object does not have a side effect on modifying it. + \section{Sequential consistency on weakly-ordered hardware} Different hardware architectures offer distinct memory models or \introduce{memory models}. From b9ddce23e45f12704241f5553dc03ed4bc8061b0 Mon Sep 17 00:00:00 2001 From: idoleat Date: Thu, 13 Jun 2024 16:26:15 +0800 Subject: [PATCH 02/11] Add a simple ABA problem example and more description Because the ABA problem is not very intuitive, the plan is to start with a simple example and then reuse the new example in section 5. Another stack example by "null program" will be mentioned as an additional example. The next commit will provide solutions that fix the ABA problem in both examples. --- concurrency-primer.tex | 62 +++++++++++++++++++++++++++++------ examples/simple_aba_example.c | 39 ++++++++++++++++++++++ 2 files changed, 91 insertions(+), 10 deletions(-) create mode 100644 examples/simple_aba_example.c diff --git a/concurrency-primer.tex b/concurrency-primer.tex index e02e9d9..ad3251f 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -886,25 +886,67 @@ \subsection{Conclusion about lock-free} \subsection{ABA problem} We have introduced CAS as one of the read-modify-write operations. However, does the target object not changing really mean that no other threads modified it halfway through? +If the target object is changed to something by other thread and changed back, the result of comparison is still equal. +Although the target object has indeed been changed, causing the operation not remaining atomic. +We call this \introduce{ABA problem}. Consider the following scenario, +\inputminted{c}{./examples/simple_aba_example.c} + +Compile it with \monobox{gcc -std=c11 -Wall -Wextra -pthread simple\_aba\_example.c}. 
+The execution result would be: + +\begin{ccode} + A: v = 42 + B: v = 47 + B: v = 42 + A: v = 52 +\end{ccode} + +In the example provided, the presence of ABA problem results in thread A being unaware that variable \monobox{v} has been altered. +Since the comparison result indicates \monobox{v} unchanged, \monobox{v + 10} is swapped in. +Here sleeping is only used to ensure the occurance of ABA problem. +In real world scenario, instead of sleeping, thread A could paused by being context switched for other tasks, including being preempted by higher priority tasks. +This example seems harmless, but things can get nasty when atomic \textsc{RMW} operations are used in more complex data structures. + +In a broader context, the ABA problem occurs when changes occur between loading and comparing, but the comparing mechanism is unable to identify that the state of the target object is not the latest, yielding a false positive result. + +Back to thread pool example in \secref{rmw}, it contains ABA problem as well. +In \monobox{worker} function, we have the thread trying to claim the job. + \begin{ccode} - /* example code place holder */ - do { /* read and modify */ } - /* something can happen in between */ - while(CAS); + job_t *job = atomic_load(&thrd_pool->head->prev); + while (!atomic_compare_exchange_strong(&thrd_pool->head->prev, &job, + job->prev)) { + } \end{ccode} -If the target object is changed to something by other thread and changed back, the result of comparism is still equal. -Although the target object has indeed been changed, causing the operation to not remain atomic. -We call this \introduce{ABA problem}. In the example above, the presents of ABA problem leads to +Consider the following scenario: +\begin{enumerate} + \item There is only one job left. + \item Thread A loads the pointer to the job by \monobox{atomic\_load()}. + \item Thread A is preempted. + \item Thread B claims the job and successfully updates \monobox{thrd\_pool->head->prev}. 
+ \item Thread B sets thread pool state to idle. + \item Main thread fininshes waiting and adds more jobs. + \item Memory allocator reuses the recently freed memory as new jobs addresses. + \item Fortunately, the first added job has the same address as the one Thread A holded. + \item Thread A is back in running state. The comparison result is equal so it updates \monobox{thrd\_pool->head->prev} with the old \monobox{job->prev}, which is already a dangling pointer. + \item Another thread loads the dangling pointer from \monobox{thrd\_pool->head->prev}. +\end{enumerate} -The ABA problem occurs when changes occur between reading and comparing, but the comparing mechanism is unable to identify that the state is not the latest. -The maximum number we refer to may not be the actual maximum, and the next job may not be the same job. -Failure to recognize this through comparison can result in outdated information. +Notice that even though \monobox{job->prev} is not loaded explicitly before comparison, compiler could place loading instructions before comparison. +At the end, the dangling pointer could either point to garbage or trigger segmentation fault. +It could be even worse if nested ABA problem occurs in thread B. + +Failure to recognize changed target object through comparison can result in stale information. The general concept of solving this problem involves adding more information to make different state distinguishable, and then making a decision on whether to act on the old state or retry with the new state. If acting on the old state is chosen, then safe memory reclamation should be considered as memory may have already been freed by other threads. More aggressively, one might consider the programming paradigm where each operation on the target object does not have a side effect on modifying it. +In the later section, we will introduce a different way of implementing atomic \textsc{RMW} operations by using LL/SC instructions. 
The exclusiveness provided by LL/SC instructions avoids the pitfall introduced by comparison. + +To make different state distinguishable, a common solution is adding a version number to be compared as well. + \section{Sequential consistency on weakly-ordered hardware} diff --git a/examples/simple_aba_example.c b/examples/simple_aba_example.c new file mode 100644 index 0000000..a22d5ab --- /dev/null +++ b/examples/simple_aba_example.c @@ -0,0 +1,39 @@ +#include +#include +#include +#include +atomic_int v = 42; + +int threadA(void *args) +{ + int va; + do { + va = atomic_load(&v); + printf("A: v = %d\n", va); + /* Ensure thread B does something before comparing */ + thrd_sleep(&(struct timespec){ .tv_sec = 1 }, NULL); + } while (!atomic_compare_exchange_strong(&v, &va, va + 10)); + printf("A: v = %d\n", atomic_load(&v)); + + return 0; +} + +int threadB(void *args) +{ + atomic_fetch_add(&v, 5); + printf("B: v = %d\n", atomic_load(&v)); + atomic_fetch_sub(&v, 5); + printf("B: v = %d\n", atomic_load(&v)); + + return 0; +} + +int main() +{ + thrd_t A, B; + thrd_create(&A, threadA, NULL); + thrd_create(&B, threadB, NULL); + /* Ensure all threads complete */ + thrd_sleep(&(struct timespec){ .tv_sec = 2 }, NULL); + return 0; +} From 9f196f646e3f2b91b84e7233a448cbc46a571f51 Mon Sep 17 00:00:00 2001 From: idoleat Date: Sun, 16 Jun 2024 20:02:16 +0800 Subject: [PATCH 03/11] Add full thread pool example and description Approaches are explained first, then the code and explanations on changes. 
--- concurrency-primer.tex | 37 +++++-- examples/rmw_example_aba.c | 192 +++++++++++++++++++++++++++++++++++++ 2 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 examples/rmw_example_aba.c diff --git a/concurrency-primer.tex b/concurrency-primer.tex index ad3251f..650cce4 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -916,7 +916,7 @@ \subsection{ABA problem} \begin{ccode} job_t *job = atomic_load(&thrd_pool->head->prev); - while (!atomic_compare_exchange_strong(&thrd_pool->head->prev, &job, + while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job, job->prev)) { } \end{ccode} @@ -930,23 +930,46 @@ \subsection{ABA problem} \item Thread B sets thread pool state to idle. \item Main thread fininshes waiting and adds more jobs. \item Memory allocator reuses the recently freed memory as new jobs addresses. - \item Fortunately, the first added job has the same address as the one Thread A holded. + \item Fortunately, the first added job has the same address as the one thread A holded. \item Thread A is back in running state. The comparison result is equal so it updates \monobox{thrd\_pool->head->prev} with the old \monobox{job->prev}, which is already a dangling pointer. \item Another thread loads the dangling pointer from \monobox{thrd\_pool->head->prev}. \end{enumerate} -Notice that even though \monobox{job->prev} is not loaded explicitly before comparison, compiler could place loading instructions before comparison. +Notice that even though \monobox{job->prev} is not loaded explicitly before comparison, compiler could place loading instructions before comparison. At the end, the dangling pointer could either point to garbage or trigger segmentation fault. It could be even worse if nested ABA problem occurs in thread B. +Also, the possibility to allocate a job with same address could be higher when using memory pool, meaning that more chances to have ABA problem occurred. 
+In fact, pre-allocated memory should be used to achive lock-free since \monobox{malloc} could have mutex involved in multithreaded environment. Failure to recognize changed target object through comparison can result in stale information. The general concept of solving this problem involves adding more information to make different state distinguishable, and then making a decision on whether to act on the old state or retry with the new state. If acting on the old state is chosen, then safe memory reclamation should be considered as memory may have already been freed by other threads. More aggressively, one might consider the programming paradigm where each operation on the target object does not have a side effect on modifying it. -In the later section, we will introduce a different way of implementing atomic \textsc{RMW} operations by using LL/SC instructions. The exclusiveness provided by LL/SC instructions avoids the pitfall introduced by comparison. - -To make different state distinguishable, a common solution is adding a version number to be compared as well. - +In the later section, we will introduce a different way of implementing atomic \textsc{RMW} operations by using LL/SC instructions. The exclusive status provided by LL/SC instructions avoids the pitfall introduced by comparison. + +To make different state distinguishable, a common solution is incrementing a version number each time target object is changed. +By bundling the target object and version into a comparison, it ensures that each change marks a distinguishable result. +Given a sufficient large size for version number, there should be no repeated version numbers. +There are multiple methods for storing the version number, depending on the evaluation of the duration before a version number wraps around. +In the thread pool example, the target object is a pointer. The unused bits in a pointer can be utilized to store the version number. 
+In addition to embedding the version number into a pointer, we could consider utilizing an additional 32-bit or 64-bit value next to the target object for the version number. +It requires the compare-and-swap instruction to be capable of comparing a wider size at once. +Sometimes, this is referred to as \introduce{double-width compare-and-swap}. +On x86-64 processors, for atomic instructions that load or store more that a CPU word size, it needs additional hardware support. +You can use \monobox{grep cx16 /proc/crpuinfo} to check if the processor supports 16-byte compare-and-swap. +For hardware that does not support the desired size, software implementations which may have locks involve are used instead as mentioned in \secref{arbitrarily-size}. +Back to the example, the following code is fixed by using an an version number that increments each time a job is added to the empty queue. On x86-64, add a compiler flag \monobox{-mcx64} to enable 16-byte compare-and-swap in \monobox{worker} function. + +\inputminted{c}{./examples/rmw_example_aba.c} + +Notice that, in the \monobox{struct idle\_job}, a union is used for type punning, which bundles the pointer and version number for compare-and-swap. +Directly casting a job pointer to a pointer that points to a 16-byte object is undefined behavior (due to having different alignment), thus type punnined is used instead. +By using this techniques, \monobox{struct idle\_job} still can be accessed normally in other places, minimizing code modification. +Compiler optimizations are conservative on type punning, but it is acceptable for atomic operations. +See \secref{fusing}. +Another way to prevent ABA problem in the example is using safe memory reclamation mechanisms. +Different from previously mentioned acting on the old state, the address of a job is not freed until no one is using it. +This prevents memory allocator or memory pool reuses the address and causing problem. 
\section{Sequential consistency on weakly-ordered hardware} diff --git a/examples/rmw_example_aba.c b/examples/rmw_example_aba.c new file mode 100644 index 0000000..07a9c43 --- /dev/null +++ b/examples/rmw_example_aba.c @@ -0,0 +1,192 @@ +#include +#include +#include +#include +#include +#include +#include + +#define CACHE_LINE_SIZE 64 +#define N_JOBS 16 +#define N_THREADS 8 + +typedef struct job { + void *args; + struct job *next, *prev; +} job_t; + +typedef struct idle_job { + union { + struct { + _Atomic(job_t *) prev; + unsigned long long version; + }; + _Atomic struct B16 { + job_t *_prev; + unsigned long long _version; + } DCAS; + }; + char padding[CACHE_LINE_SIZE - sizeof(_Atomic(job_t *)) - + sizeof(unsigned long long)]; + job_t job; +} idle_job_t; + +enum state { idle, running, cancelled }; + +typedef struct thread_pool { + atomic_flag initialezed; + int size; + thrd_t *pool; + atomic_int state; + thrd_start_t func; + // job queue is a SPMC ring buffer + idle_job_t *head; +} thread_pool_t; + +static int worker(void *args) +{ + if (!args) + return EXIT_FAILURE; + thread_pool_t *thrd_pool = (thread_pool_t *)args; + + while (1) { + if (atomic_load(&thrd_pool->state) == cancelled) + return EXIT_SUCCESS; + if (atomic_load(&thrd_pool->state) == running) { + // claim the job + struct B16 job = atomic_load(&thrd_pool->head->DCAS); + struct B16 next; + do { + next._prev = job._prev->prev; + next._version = job._version; + } while (!atomic_compare_exchange_weak(&thrd_pool->head->DCAS, &job, + next)); + + if (job._prev->args == NULL) { + atomic_store(&thrd_pool->state, idle); + } else { + printf("Hello from job %d\n", *(int *)job._prev->args); + free(job._prev->args); + free(job._prev); // could cause dangling pointer in other threads + } + } else { + /* To auto run when jobs added, set status to running if job queue is not empty. 
+ * As long as the producer is protected */ + thrd_yield(); + continue; + } + }; +} + +static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size) +{ + if (atomic_flag_test_and_set(&thrd_pool->initialezed)) { + printf("This thread pool has already been initialized.\n"); + return false; + } + + assert(size > 0); + thrd_pool->pool = malloc(sizeof(thrd_t) * size); + if (!thrd_pool->pool) { + printf("Failed to allocate thread identifiers.\n"); + return false; + } + + // May use memory pool for jobs + idle_job_t *idle_job = malloc(sizeof(idle_job_t)); + if (!idle_job) { + printf("Failed to allocate idle job.\n"); + return false; + } + // idle_job will always be the first job + idle_job->job.args = NULL; + idle_job->job.next = &idle_job->job; + idle_job->job.prev = &idle_job->job; + idle_job->prev = &idle_job->job; + idle_job->version = 0ULL; + thrd_pool->func = worker; + thrd_pool->head = idle_job; + thrd_pool->state = idle; + thrd_pool->size = size; + + for (size_t i = 0; i < size; i++) { + thrd_create(thrd_pool->pool + i, worker, thrd_pool); + //TODO: error handling + } + + return true; +} + +static void thread_pool_destroy(thread_pool_t *thrd_pool) +{ + if (atomic_exchange(&thrd_pool->state, cancelled)) + printf("Thread pool cancelled with jobs still running.\n"); + for (int i = 0; i < thrd_pool->size; i++) { + thrd_join(thrd_pool->pool[i], NULL); + } + while (thrd_pool->head->prev != &thrd_pool->head->job) { + job_t *job = thrd_pool->head->prev->prev; + free(thrd_pool->head->prev); + thrd_pool->head->prev = job; + } + free(thrd_pool->head); + free(thrd_pool->pool); + atomic_fetch_and(&thrd_pool->state, 0); + atomic_flag_clear(&thrd_pool->initialezed); +} + +__attribute__((nonnull(2))) static bool add_job(thread_pool_t *thrd_pool, + void *args) +{ + // May use memory pool for jobs + job_t *job = malloc(sizeof(job_t)); + if (!job) + return false; + + // unprotected producer + job->args = args; + job->next = thrd_pool->head->job.next; + job->prev = 
&thrd_pool->head->job; + thrd_pool->head->job.next->prev = job; + thrd_pool->head->job.next = job; + if (thrd_pool->head->prev == &thrd_pool->head->job) { + thrd_pool->head->prev = job; + thrd_pool->head->version += 1; + // trap worker at idle job + thrd_pool->head->job.prev = &thrd_pool->head->job; + } + + return true; +} + +static inline void wait_until(thread_pool_t *thrd_pool, int state) +{ + while (atomic_load(&thrd_pool->state) != state) { + thrd_yield(); + } +} + +int main() +{ + thread_pool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT }; + if (!thread_pool_init(&thrd_pool, N_THREADS)) { + printf("failed to init.\n"); + return 0; + } + for (int i = 0; i < N_JOBS; i++) { + int *id = malloc(sizeof(int)); + *id = i; + add_job(&thrd_pool, id); + } + // Due to simplified job queue (not protecting producer), starting the pool manually + atomic_store(&thrd_pool.state, running); + wait_until(&thrd_pool, idle); + for (int i = 0; i < N_JOBS; i++) { + int *id = malloc(sizeof(int)); + *id = i; + add_job(&thrd_pool, id); + } + atomic_store(&thrd_pool.state, running); + thread_pool_destroy(&thrd_pool); + return 0; +} From 11c2494bd7fb69c8b9afcc28fdf5c9d8f8f808b4 Mon Sep 17 00:00:00 2001 From: idoleat Date: Mon, 17 Jun 2024 01:25:57 +0800 Subject: [PATCH 04/11] Fix typo and grammar mistakes --- concurrency-primer.tex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index 650cce4..11073d3 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -955,10 +955,10 @@ \subsection{ABA problem} In addition to embedding the version number into a pointer, we could consider utilizing an additional 32-bit or 64-bit value next to the target object for the version number. It requires the compare-and-swap instruction to be capable of comparing a wider size at once. Sometimes, this is referred to as \introduce{double-width compare-and-swap}. 
-On x86-64 processors, for atomic instructions that load or store more that a CPU word size, it needs additional hardware support. -You can use \monobox{grep cx16 /proc/crpuinfo} to check if the processor supports 16-byte compare-and-swap. +On x86-64 processors, for atomic instructions that load or store more than a CPU word size, it needs additional hardware support. +You can use \monobox{\$ grep cx16 /proc/cpuinfo} to check if the processor supports 16-byte compare-and-swap. For hardware that does not support the desired size, software implementations which may have locks involve are used instead as mentioned in \secref{arbitrarily-size}. -Back to the example, the following code is fixed by using an an version number that increments each time a job is added to the empty queue. On x86-64, add a compiler flag \monobox{-mcx64} to enable 16-byte compare-and-swap in \monobox{worker} function. +Back to the example, ABA problem in the following code is fixed by using an version number that increments each time a job is added to the empty queue. On x86-64, add a compiler flag \monobox{-mcx64} to enable 16-byte compare-and-swap in \monobox{worker} function. \inputminted{c}{./examples/rmw_example_aba.c} @@ -969,7 +969,7 @@ \subsection{ABA problem} See \secref{fusing}. Another way to prevent ABA problem in the example is using safe memory reclamation mechanisms. Different from previously mentioned acting on the old state, the address of a job is not freed until no one is using it. -This prevents memory allocator or memory pool reuses the address and causing problem. +This prevents memory allocator or memory pool from reusing the address and causing problem. 
\section{Sequential consistency on weakly-ordered hardware} From e8b81b4c1b0f437a64ef4ca2b8724eff646d5aed Mon Sep 17 00:00:00 2001 From: idoleat Date: Mon, 17 Jun 2024 02:28:34 +0800 Subject: [PATCH 05/11] Fix typo --- concurrency-primer.tex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index 11073d3..df86aa3 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -905,7 +905,7 @@ \subsection{ABA problem} In the example provided, the presence of ABA problem results in thread A being unaware that variable \monobox{v} has been altered. Since the comparison result indicates \monobox{v} unchanged, \monobox{v + 10} is swapped in. -Here sleeping is only used to ensure the occurance of ABA problem. +Here sleeping is only used to ensure the occurrence of ABA problem. In real world scenario, instead of sleeping, thread A could paused by being context switched for other tasks, including being preempted by higher priority tasks. This example seems harmless, but things can get nasty when atomic \textsc{RMW} operations are used in more complex data structures. @@ -928,9 +928,9 @@ \subsection{ABA problem} \item Thread A is preempted. \item Thread B claims the job and successfully updates \monobox{thrd\_pool->head->prev}. \item Thread B sets thread pool state to idle. - \item Main thread fininshes waiting and adds more jobs. + \item Main thread finishes waiting and adds more jobs. \item Memory allocator reuses the recently freed memory as new jobs addresses. - \item Fortunately, the first added job has the same address as the one thread A holded. + \item Fortunately, the first added job has the same address as the one thread A held. \item Thread A is back in running state. The comparison result is equal so it updates \monobox{thrd\_pool->head->prev} with the old \monobox{job->prev}, which is already a dangling pointer. 
\item Another thread loads the dangling pointer from \monobox{thrd\_pool->head->prev}. \end{enumerate} @@ -963,7 +963,7 @@ \subsection{ABA problem} \inputminted{c}{./examples/rmw_example_aba.c} Notice that, in the \monobox{struct idle\_job}, a union is used for type punning, which bundles the pointer and version number for compare-and-swap. -Directly casting a job pointer to a pointer that points to a 16-byte object is undefined behavior (due to having different alignment), thus type punnined is used instead. +Directly casting a job pointer to a pointer that points to a 16-byte object is undefined behavior (due to having different alignment), thus type punning is used instead. By using this techniques, \monobox{struct idle\_job} still can be accessed normally in other places, minimizing code modification. Compiler optimizations are conservative on type punning, but it is acceptable for atomic operations. See \secref{fusing}. From ecd1b5728b533235ef4e3eae5c676e6e4737bd09 Mon Sep 17 00:00:00 2001 From: idoleat Date: Tue, 23 Jul 2024 23:13:09 +0800 Subject: [PATCH 06/11] Update tpool in aba example to the one in main The new thread pool is ported to the rmw_example_aba.c file. Makefile is updated accordingly as well. 
--- examples/Makefile | 6 +- examples/rmw_example_aba.c | 194 +++++++++++++++++++++++++------------ 2 files changed, 134 insertions(+), 66 deletions(-) diff --git a/examples/Makefile b/examples/Makefile index f5a58e6..3913ebb 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -1,6 +1,8 @@ all: $(CC) -Wall -o rmw_example rmw_example.c -pthread -lm + $(CC) -Wall -o rmw_example_aba rmw_example_aba.c -pthread -lm -mcx16 clean: - rm -f rmw_example + rm -f rmw_example rmw_example_aba check: all - ./rmw_example \ No newline at end of file + ./rmw_example + ./rmw_example_aba diff --git a/examples/rmw_example_aba.c b/examples/rmw_example_aba.c index 07a9c43..f84e166 100644 --- a/examples/rmw_example_aba.c +++ b/examples/rmw_example_aba.c @@ -1,17 +1,24 @@ -#include #include #include #include #include #include #include +#include +#define PRECISION 100 /* upper bound in BPP sum */ #define CACHE_LINE_SIZE 64 -#define N_JOBS 16 -#define N_THREADS 8 +#define N_THREADS 64 + +struct tpool_future { + void *result; + void *arg; + atomic_flag flag; +}; typedef struct job { - void *args; + void *(*func)(void *); + struct tpool_future *future; struct job *next, *prev; } job_t; @@ -21,64 +28,93 @@ typedef struct idle_job { _Atomic(job_t *) prev; unsigned long long version; }; - _Atomic struct B16 { - job_t *_prev; + _Atomic struct versioned_prev { + job_t *ptr; unsigned long long _version; - } DCAS; + } v_prev; }; char padding[CACHE_LINE_SIZE - sizeof(_Atomic(job_t *)) - - sizeof(unsigned long long)]; + sizeof(unsigned long long)]; /* avoid false sharing */ job_t job; } idle_job_t; enum state { idle, running, cancelled }; -typedef struct thread_pool { +typedef struct tpool { atomic_flag initialezed; int size; thrd_t *pool; atomic_int state; thrd_start_t func; - // job queue is a SPMC ring buffer - idle_job_t *head; -} thread_pool_t; + idle_job_t *head; /* job queue is a SPMC ring buffer */ +} tpool_t; + +static struct tpool_future *tpool_future_create(void *arg) +{ + struct 
tpool_future *future = malloc(sizeof(struct tpool_future)); + if (future) { + future->result = NULL; + future->arg = arg; + atomic_flag_clear(&future->flag); + atomic_flag_test_and_set(&future->flag); + } + return future; +} + +void tpool_future_wait(struct tpool_future *future) +{ + while (atomic_flag_test_and_set(&future->flag)) + ; +} + +void tpool_future_destroy(struct tpool_future *future) +{ + free(future->result); + free(future); +} static int worker(void *args) { if (!args) return EXIT_FAILURE; - thread_pool_t *thrd_pool = (thread_pool_t *)args; + tpool_t *thrd_pool = (tpool_t *)args; while (1) { + /* worker is laid off */ if (atomic_load(&thrd_pool->state) == cancelled) return EXIT_SUCCESS; if (atomic_load(&thrd_pool->state) == running) { - // claim the job - struct B16 job = atomic_load(&thrd_pool->head->DCAS); - struct B16 next; + /* worker takes the job */ + struct versioned_prev job = atomic_load(&thrd_pool->head->v_prev); + /* worker checks if there is only an idle job in the job queue */ + if (job.ptr == &thrd_pool->head->job) { + /* worker says it is idle */ + atomic_store(&thrd_pool->state, idle); + thrd_yield(); + continue; + } + + struct versioned_prev next; + /* compare 16 byte at once */ do { - next._prev = job._prev->prev; + next.ptr = job.ptr->prev; next._version = job._version; - } while (!atomic_compare_exchange_weak(&thrd_pool->head->DCAS, &job, - next)); + } while (!atomic_compare_exchange_weak(&thrd_pool->head->v_prev, + &job, next)); - if (job._prev->args == NULL) { - atomic_store(&thrd_pool->state, idle); - } else { - printf("Hello from job %d\n", *(int *)job._prev->args); - free(job._prev->args); - free(job._prev); // could cause dangling pointer in other threads - } + job.ptr->future->result = + (void *)job.ptr->func(job.ptr->future->arg); + atomic_flag_clear(&job.ptr->future->flag); + free(job.ptr); } else { - /* To auto run when jobs added, set status to running if job queue is not empty. 
- * As long as the producer is protected */ + /* worker is idle */ thrd_yield(); - continue; } }; + return EXIT_SUCCESS; } -static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size) +static bool tpool_init(tpool_t *thrd_pool, size_t size) { if (atomic_flag_test_and_set(&thrd_pool->initialezed)) { printf("This thread pool has already been initialized.\n"); @@ -92,14 +128,13 @@ static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size) return false; } - // May use memory pool for jobs idle_job_t *idle_job = malloc(sizeof(idle_job_t)); if (!idle_job) { printf("Failed to allocate idle job.\n"); return false; } - // idle_job will always be the first job - idle_job->job.args = NULL; + + /* idle_job will always be the first job */ idle_job->job.next = &idle_job->job; idle_job->job.prev = &idle_job->job; idle_job->prev = &idle_job->job; @@ -109,21 +144,21 @@ static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size) thrd_pool->state = idle; thrd_pool->size = size; - for (size_t i = 0; i < size; i++) { + /* employer hires many workers */ + for (size_t i = 0; i < size; i++) thrd_create(thrd_pool->pool + i, worker, thrd_pool); - //TODO: error handling - } return true; } -static void thread_pool_destroy(thread_pool_t *thrd_pool) +static void tpool_destroy(tpool_t *thrd_pool) { if (atomic_exchange(&thrd_pool->state, cancelled)) printf("Thread pool cancelled with jobs still running.\n"); - for (int i = 0; i < thrd_pool->size; i++) { + + for (int i = 0; i < thrd_pool->size; i++) thrd_join(thrd_pool->pool[i], NULL); - } + while (thrd_pool->head->prev != &thrd_pool->head->job) { job_t *job = thrd_pool->head->prev->prev; free(thrd_pool->head->prev); @@ -135,16 +170,35 @@ static void thread_pool_destroy(thread_pool_t *thrd_pool) atomic_flag_clear(&thrd_pool->initialezed); } -__attribute__((nonnull(2))) static bool add_job(thread_pool_t *thrd_pool, - void *args) +/* Use Bailey–Borwein–Plouffe formula to approximate PI */ +static void *bbp(void *arg) +{ + 
int k = *(int *)arg; + double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) - + (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6)); + double *product = malloc(sizeof(double)); + if (!product) + return NULL; + + *product = 1 / pow(16, k) * sum; + return (void *)product; +} + +struct tpool_future *add_job(tpool_t *thrd_pool, void *(*func)(void *), + void *arg) { - // May use memory pool for jobs job_t *job = malloc(sizeof(job_t)); if (!job) - return false; + return NULL; + + struct tpool_future *future = tpool_future_create(arg); + if (!future) { + free(job); + return NULL; + } - // unprotected producer - job->args = args; + job->func = func; + job->future = future; job->next = thrd_pool->head->job.next; job->prev = &thrd_pool->head->job; thrd_pool->head->job.next->prev = job; @@ -152,41 +206,53 @@ __attribute__((nonnull(2))) static bool add_job(thread_pool_t *thrd_pool, if (thrd_pool->head->prev == &thrd_pool->head->job) { thrd_pool->head->prev = job; thrd_pool->head->version += 1; - // trap worker at idle job + /* the previous job of the idle job is itself */ thrd_pool->head->job.prev = &thrd_pool->head->job; } - - return true; + return future; } -static inline void wait_until(thread_pool_t *thrd_pool, int state) +static inline void wait_until(tpool_t *thrd_pool, int state) { - while (atomic_load(&thrd_pool->state) != state) { + while (atomic_load(&thrd_pool->state) != state) thrd_yield(); - } } int main() { - thread_pool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT }; - if (!thread_pool_init(&thrd_pool, N_THREADS)) { + int bbp_args[PRECISION]; + struct tpool_future *futures[PRECISION]; + double bbp_sum = 0; + + tpool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT }; + if (!tpool_init(&thrd_pool, N_THREADS)) { printf("failed to init.\n"); return 0; } - for (int i = 0; i < N_JOBS; i++) { - int *id = malloc(sizeof(int)); - *id = i; - add_job(&thrd_pool, id); - } - // Due to simplified job queue (not protecting producer), starting the pool manually + /* employer ask 
workers to work */ atomic_store(&thrd_pool.state, running); + + /* employer wait ... until workers are idle */ wait_until(&thrd_pool, idle); - for (int i = 0; i < N_JOBS; i++) { - int *id = malloc(sizeof(int)); - *id = i; - add_job(&thrd_pool, id); + + /* employer add more job to the job queue */ + for (int i = 0; i < PRECISION; i++) { + bbp_args[i] = i; + futures[i] = add_job(&thrd_pool, bbp, &bbp_args[i]); } + + /* employer ask workers to work */ atomic_store(&thrd_pool.state, running); - thread_pool_destroy(&thrd_pool); + + /* employer wait for the result of job */ + for (int i = 0; i < PRECISION; i++) { + tpool_future_wait(futures[i]); + bbp_sum += *(double *)(futures[i]->result); + tpool_future_destroy(futures[i]); + } + + /* employer destroys the job queue and lays workers off */ + tpool_destroy(&thrd_pool); + printf("PI calculated with %d terms: %.15f\n", PRECISION, bbp_sum); return 0; } From e04392d2366b50046de406b09430ec14a663a11a Mon Sep 17 00:00:00 2001 From: idoleat Date: Tue, 23 Jul 2024 23:52:28 +0800 Subject: [PATCH 07/11] Align tex file with updated code example The compile command is added in the Makefile, same as the previous example code. Missing reference and wrong compiler flag are fixed as well. --- concurrency-primer.tex | 12 ++++++------ examples/Makefile | 4 +++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index df86aa3..25eedf4 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -893,7 +893,6 @@ \subsection{ABA problem} \inputminted{c}{./examples/simple_aba_example.c} -Compile it with \monobox{gcc -std=c11 -Wall -Wextra -pthread simple\_aba\_example.c}. The execution result would be: \begin{ccode} @@ -916,9 +915,10 @@ \subsection{ABA problem} \begin{ccode} job_t *job = atomic_load(&thrd_pool->head->prev); + ... 
while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job, - job->prev)) { - } + job->prev)) + ; \end{ccode} Consider the following scenario: @@ -939,7 +939,7 @@ \subsection{ABA problem} At the end, the dangling pointer could either point to garbage or trigger segmentation fault. It could be even worse if nested ABA problem occurs in thread B. Also, the possibility to allocate a job with same address could be higher when using memory pool, meaning that more chances to have ABA problem occurred. -In fact, pre-allocated memory should be used to achive lock-free since \monobox{malloc} could have mutex involved in multithreaded environment. +In fact, pre-allocated memory should be used to achieve lock-free since \monobox{malloc} could have mutex involved in multithreaded environment. Failure to recognize changed target object through comparison can result in stale information. The general concept of solving this problem involves adding more information to make different state distinguishable, and then making a decision on whether to act on the old state or retry with the new state. @@ -957,8 +957,8 @@ \subsection{ABA problem} Sometimes, this is referred to as \introduce{double-width compare-and-swap}. On x86-64 processors, for atomic instructions that load or store more than a CPU word size, it needs additional hardware support. You can use \monobox{\$ grep cx16 /proc/cpuinfo} to check if the processor supports 16-byte compare-and-swap. -For hardware that does not support the desired size, software implementations which may have locks involve are used instead as mentioned in \secref{arbitrarily-size}. -Back to the example, ABA problem in the following code is fixed by using an version number that increments each time a job is added to the empty queue. On x86-64, add a compiler flag \monobox{-mcx64} to enable 16-byte compare-and-swap in \monobox{worker} function. 
+For hardware that does not support the desired size, software implementations which may have locks involve are used instead, as mentioned in \secref{atomictype}. +Back to the example, ABA problem in the following code is fixed by using an version number that increments each time a job is added to the empty queue. On x86-64, add a compiler flag \monobox{-mcx16} to enable 16-byte compare-and-swap in \monobox{worker} function. \inputminted{c}{./examples/rmw_example_aba.c} diff --git a/examples/Makefile b/examples/Makefile index 3913ebb..05d2d90 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -1,8 +1,10 @@ all: $(CC) -Wall -o rmw_example rmw_example.c -pthread -lm $(CC) -Wall -o rmw_example_aba rmw_example_aba.c -pthread -lm -mcx16 + $(CC) -Wall -o simple_aba_example simple_aba_example.c -pthread clean: - rm -f rmw_example rmw_example_aba + rm -f rmw_example rmw_example_aba simple_aba_example check: all ./rmw_example ./rmw_example_aba + ./simple_aba_example From 8abe58d27c5c515791052f165278214165d3048f Mon Sep 17 00:00:00 2001 From: idoleat Date: Wed, 24 Jul 2024 01:25:38 +0800 Subject: [PATCH 08/11] Update terminology from "multithread" to "multi-threaded" --- concurrency-primer.tex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index 25eedf4..c950eda 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -939,7 +939,7 @@ \subsection{ABA problem} At the end, the dangling pointer could either point to garbage or trigger segmentation fault. It could be even worse if nested ABA problem occurs in thread B. Also, the possibility to allocate a job with same address could be higher when using memory pool, meaning that more chances to have ABA problem occurred. -In fact, pre-allocated memory should be used to achieve lock-free since \monobox{malloc} could have mutex involved in multithreaded environment. 
+In fact, pre-allocated memory should be used to achieve lock-free since \monobox{malloc} could have mutex involved in multi-threaded environment. Failure to recognize changed target object through comparison can result in stale information. The general concept of solving this problem involves adding more information to make different state distinguishable, and then making a decision on whether to act on the old state or retry with the new state. From ee7a4a9f5bf7b601fcfa3f86d0befd7ec7c534a1 Mon Sep 17 00:00:00 2001 From: idoleat Date: Fri, 26 Jul 2024 17:28:21 +0800 Subject: [PATCH 09/11] Tweak style annotation Replace \monobox{} with \cc{} to properly apply the style to C code. Dollar sign removed for shell command. --- concurrency-primer.tex | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index c950eda..fb4d607 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -902,8 +902,8 @@ \subsection{ABA problem} A: v = 52 \end{ccode} -In the example provided, the presence of ABA problem results in thread A being unaware that variable \monobox{v} has been altered. -Since the comparison result indicates \monobox{v} unchanged, \monobox{v + 10} is swapped in. +In the example provided, the presence of ABA problem results in thread A being unaware that variable \cc{v} has been altered. +Since the comparison result indicates \cc{v} unchanged, \cc{v + 10} is swapped in. Here sleeping is only used to ensure the occurrence of ABA problem. In real world scenario, instead of sleeping, thread A could paused by being context switched for other tasks, including being preempted by higher priority tasks. This example seems harmless, but things can get nasty when atomic \textsc{RMW} operations are used in more complex data structures. @@ -924,18 +924,18 @@ \subsection{ABA problem} Consider the following scenario: \begin{enumerate} \item There is only one job left. 
- \item Thread A loads the pointer to the job by \monobox{atomic\_load()}. + \item Thread A loads the pointer to the job by \cc{atomic_load()}. \item Thread A is preempted. - \item Thread B claims the job and successfully updates \monobox{thrd\_pool->head->prev}. + \item Thread B claims the job and successfully updates \cc{thrd_pool->head->prev}. \item Thread B sets thread pool state to idle. \item Main thread finishes waiting and adds more jobs. \item Memory allocator reuses the recently freed memory as new jobs addresses. \item Fortunately, the first added job has the same address as the one thread A held. - \item Thread A is back in running state. The comparison result is equal so it updates \monobox{thrd\_pool->head->prev} with the old \monobox{job->prev}, which is already a dangling pointer. - \item Another thread loads the dangling pointer from \monobox{thrd\_pool->head->prev}. + \item Thread A is back in running state. The comparison result is equal so it updates \cc{thrd_pool->head->prev} with the old \cc{job->prev}, which is already a dangling pointer. + \item Another thread loads the dangling pointer from \cc{thrd_pool->head->prev}. \end{enumerate} -Notice that even though \monobox{job->prev} is not loaded explicitly before comparison, compiler could place loading instructions before comparison. +Notice that even though \cc{job->prev} is not loaded explicitly before comparison, compiler could place loading instructions before comparison. At the end, the dangling pointer could either point to garbage or trigger segmentation fault. It could be even worse if nested ABA problem occurs in thread B. Also, the possibility to allocate a job with same address could be higher when using memory pool, meaning that more chances to have ABA problem occurred. @@ -956,15 +956,15 @@ \subsection{ABA problem} It requires the compare-and-swap instruction to be capable of comparing a wider size at once. 
Sometimes, this is referred to as \introduce{double-width compare-and-swap}. On x86-64 processors, for atomic instructions that load or store more than a CPU word size, it needs additional hardware support. -You can use \monobox{\$ grep cx16 /proc/cpuinfo} to check if the processor supports 16-byte compare-and-swap. +You can use \monobox{grep cx16 /proc/cpuinfo} to check if the processor supports 16-byte compare-and-swap. For hardware that does not support the desired size, software implementations which may have locks involve are used instead, as mentioned in \secref{atomictype}. Back to the example, ABA problem in the following code is fixed by using an version number that increments each time a job is added to the empty queue. On x86-64, add a compiler flag \monobox{-mcx16} to enable 16-byte compare-and-swap in \monobox{worker} function. \inputminted{c}{./examples/rmw_example_aba.c} -Notice that, in the \monobox{struct idle\_job}, a union is used for type punning, which bundles the pointer and version number for compare-and-swap. +Notice that, in the \cc{struct idle_job}, a union is used for type punning, which bundles the pointer and version number for compare-and-swap. Directly casting a job pointer to a pointer that points to a 16-byte object is undefined behavior (due to having different alignment), thus type punning is used instead. -By using this techniques, \monobox{struct idle\_job} still can be accessed normally in other places, minimizing code modification. +By using this technique, \cc{struct idle_job} can still be accessed normally in other places, minimizing code modification. Compiler optimizations are conservative on type punning, but it is acceptable for atomic operations. See \secref{fusing}. Another way to prevent ABA problem in the example is using safe memory reclamation mechanisms. 
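The version-number idea described in this patch can be sketched in isolation. The block below is not taken from \monobox{rmw\_example\_aba.c}; it is a minimal illustration under stated assumptions: a 32-bit slot index stands in for the job pointer so that index and version fit in one 64-bit word and no 16-byte compare-and-swap (or \monobox{-mcx16}) is needed. The names \cc{versioned_head_t}, \cc{head_load}, \cc{head_try_update} and \cc{replay_aba_is_detected} are hypothetical, and the interleaving is replayed in a single thread to keep the outcome deterministic.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the queue head: a 32-bit slot index plays the
 * role of the job pointer, so index + version fit in one 64-bit word. */
typedef union {
    struct {
        uint32_t index;   /* which job slot is currently at the head */
        uint32_t version; /* bumped on every successful update */
    };
    uint64_t whole; /* the view handed to compare-and-swap */
} versioned_head_t;

static _Atomic uint64_t head; /* zero-initialized: index 0, version 0 */

static versioned_head_t head_load(void)
{
    versioned_head_t h;
    h.whole = atomic_load(&head);
    return h;
}

/* Install new_index only if neither index nor version changed since
 * *snapshot was taken. On failure, *snapshot is refreshed by the CAS. */
static bool head_try_update(versioned_head_t *snapshot, uint32_t new_index)
{
    versioned_head_t desired;
    desired.index = new_index;
    desired.version = snapshot->version + 1;
    return atomic_compare_exchange_strong(&head, &snapshot->whole,
                                          desired.whole);
}

/* Replays the A -> B -> A interleaving in one thread.
 * Returns true if the stale snapshot is rejected. */
static bool replay_aba_is_detected(void)
{
    atomic_store(&head, 0);
    versioned_head_t stale = head_load(); /* "thread A" reads, is preempted */

    /* "thread B" moves the head from slot 0 to slot 1 and back to slot 0 */
    versioned_head_t other = head_load();
    bool moved = head_try_update(&other, 1);
    other = head_load();
    bool restored = head_try_update(&other, 0);
    assert(moved && restored);

    /* The index alone looks unchanged, which is what a plain CAS would see */
    assert(head_load().index == stale.index);

    /* "thread A" resumes: the version differs, so the update is refused */
    return !head_try_update(&stale, 7);
}
```

Calling \cc{replay_aba_is_detected()} is expected to return true: the index is back to 0, but the version has advanced to 2, so the stale snapshot no longer matches. The same structure carries over to a pointer plus counter pair, at the cost of requiring a double-width compare-and-swap as the patch text explains.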
From 610df96737f8d92c1206052a22ae39ee9acc11d8 Mon Sep 17 00:00:00 2001 From: idoleat Date: Fri, 26 Jul 2024 17:37:20 +0800 Subject: [PATCH 10/11] Proofread introduction to the ABA problem Present the opening sentences as statements rather than questions. --- concurrency-primer.tex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index fb4d607..95eb8ae 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -884,10 +884,10 @@ \subsection{Conclusion about lock-free} a domain fraught with challenges. \subsection{ABA problem} -We have introduced CAS as one of the read-modify-write operations. -However, does the target object not changing really mean that no other threads modified it halfway through? -If the target object is changed to something by other thread and changed back, the result of comparison is still equal. -Although the target object has indeed been changed, causing the operation not remaining atomic. +CAS has been introduced as one of the read-modify-write operations. +However, an unchanged value in the target object does not necessarily mean that no other thread modified it in the meantime. +If the target object is changed by another thread and then changed back, the comparison still succeeds. +In this case, the target object has indeed been modified, yet CAS cannot observe the change, so the read-modify-write sequence as a whole is no longer atomic. We call this \introduce{ABA problem}. 
Consider the following scenario, From ea19caddd95739ad8c42a098a3cd28a8240941e1 Mon Sep 17 00:00:00 2001 From: idoleat Date: Sat, 27 Jul 2024 17:24:32 +0800 Subject: [PATCH 11/11] Improve the statement of the impact of ABA problem --- concurrency-primer.tex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/concurrency-primer.tex b/concurrency-primer.tex index 95eb8ae..d5c0400 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -941,7 +941,8 @@ \subsection{ABA problem} Also, the possibility to allocate a job with same address could be higher when using memory pool, meaning that more chances to have ABA problem occurred. In fact, pre-allocated memory should be used to achieve lock-free since \monobox{malloc} could have mutex involved in multi-threaded environment. -Failure to recognize changed target object through comparison can result in stale information. +Because the comparison cannot tell whether the target object was changed and then changed back, CAS may return true even though the object was modified in between, which is a false positive. +Thus, a successful CAS alone does not guarantee that the whole read-modify-write sequence was atomic. The general concept of solving this problem involves adding more information to make different state distinguishable, and then making a decision on whether to act on the old state or retry with the new state. If acting on the old state is chosen, then safe memory reclamation should be considered as memory may have already been freed by other threads. More aggressively, one might consider the programming paradigm where each operation on the target object does not have a side effect on modifying it.
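The false positive described in this patch can be reproduced deterministically. The sketch below is not the content of \monobox{simple\_aba\_example.c}; it is a hedged, single-threaded replay of the same kind of interleaving, where the intermediate value 47 and the helper names \cc{add_ten_if_unchanged}, \cc{change_and_restore} and \cc{replay_plain_cas} are assumptions made for illustration only.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int v;

/* "Thread A": add 10 to v, but only if v still equals the value read
 * earlier. This is the plain CAS that cannot see intermediate writes. */
static bool add_ten_if_unchanged(int *seen)
{
    return atomic_compare_exchange_strong(&v, seen, *seen + 10);
}

/* "Thread B": change v and change it back. Returns the number of writes
 * that thread A has no way of noticing. */
static int change_and_restore(void)
{
    atomic_store(&v, 47); /* assumed intermediate value */
    atomic_store(&v, 42);
    return 2;
}

/* Replays the interleaving in one thread so the outcome is deterministic.
 * Returns whether thread A's CAS succeeded. */
static bool replay_plain_cas(int *writes_missed)
{
    atomic_store(&v, 42);
    int seen = atomic_load(&v);            /* A reads 42, is "preempted" */
    *writes_missed = change_and_restore(); /* B: 42 -> 47 -> 42 */
    assert(atomic_load(&v) == seen);       /* the value looks untouched */
    return add_ten_if_unchanged(&seen);    /* A resumes and swaps in 52 */
}
```

Here \cc{replay_plain_cas()} returns true and leaves \cc{v} at 52 although two writes happened in between. For a plain integer this is harmless; it becomes a real bug once the compared value is a pointer whose target was freed and reallocated, which is the situation the version number or safe memory reclamation is meant to address.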