diff --git a/concurrency-primer.tex b/concurrency-primer.tex index 2d096bb..d531e32 100644 --- a/concurrency-primer.tex +++ b/concurrency-primer.tex @@ -217,7 +217,7 @@ \section{Background} Initially, any optimizing compiler will restructure your code to enhance performance on its target hardware. The primary objective is to maintain the operational effect within \emph{the current thread}, allowing reads and writes to be rearranged to prevent pipeline stalls\footnote{% -Most \textsc{CPU} architectures execute segments of multiple instructions concurrently to improve throughput (refer to \fig{pipeline}). +Most \textsc{CPU} architectures execute segments of multiple instructions concurrently to improve throughput (refer to \fig{fig:pipeline}). A stall, or suspension of forward progress, occurs when an instruction awaits the outcome of a preceding one in the pipeline until the necessary result becomes available.} or to optimize data locality.\punckern\footnote{% \textsc{RAM} accesses data not byte by byte, but in larger units known as \introduce{cache lines}. Grouping frequently used variables on the same cache line means they are processed together, @@ -232,21 +232,21 @@ \section{Background} Even without compiler alterations, we would face challenges because our hardware complicates matters further! Modern \textsc{CPU}s operate in a fashion far more complex than what traditional pipelined methods, -like those depicted in \fig{pipeline}, suggest. +like those depicted in \fig{fig:pipeline}, suggest. They are equipped with multiple data paths tailored for various instruction types and schedulers that reorder and direct instructions through these paths. \includegraphics[keepaspectratio,width=0.7\linewidth]{images/pipeline} \captionof{figure}{A traditional five-stage \textsc{CPU} pipeline with fetch, decode, execute, memory access, and write-back stages. Modern designs are much more complicated, often reordering instructions on the fly.} -\label{pipeline} +\label{fig:pipeline} It is quite common to form oversimplified views about memory operations. -Picturing a multi-core processor setup might lead us to envision a model similar to \fig{ideal-machine}, +Picturing a multi-core processor setup might lead us to envision a model similar to \fig{fig:ideal-machine}, wherein each core alternately accesses and manipulates the system's memory. \includegraphics[keepaspectratio, width=0.8\linewidth]{ideal-machine} \captionof{figure}{An idealized multi-core processor where cores take turns accessing a single shared set of memory.} -\label{ideal-machine} +\label{fig:ideal-machine} The reality is far from straightforward. Although processor speeds have surged exponentially in recent decades, @@ -260,7 +260,7 @@ \section{Background} \includegraphics[keepaspectratio, width=0.8\linewidth]{images/mp-cache} \captionof{figure}{A common memory hierarchy for modern multi-core processors} -\label{dunnington} +\label{fig:dunnington} The myriad complexities within multithreaded programs on multi-core \textsc{CPU}s lead to a lack of a uniform concept of ``now''. Establishing some semblance of order among threads requires a concerted effort involving the hardware, @@ -321,6 +321,15 @@ \section{Enforcing law and order} This model, defined by Leslie Lamport in 1979, is called \introduce{sequential consistency}. 
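+To make this concrete, here is a minimal \clang{}11 sketch (assuming the \monobox{v} and \monobox{v\_ready} pair from the earlier example) in which both threads get sequentially consistent behavior simply by declaring the shared variables with atomic types:
+\begin{ccode}
+atomic_int v;
+atomic_bool v_ready;
+
+void threadA(void)
+{
+    atomic_store(&v, 42);   /* explicit atomic store */
+    v_ready = true;         /* plain write: also an atomic store */
+}
+
+int threadB(void)
+{
+    while (!v_ready)        /* plain read: an atomic load */
+        ;
+    return atomic_load(&v); /* explicit atomic load */
+}
+\end{ccode}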
+Notice that using atomic variables directly in expressions, as in \monobox{v\_ready = true} and \monobox{while (!v\_ready)}, is a convenient alternative to explicitly calling \monobox{atomic\_store} or \monobox{atomic\_load}.\punckern\footnote{%
+Atomic loads and stores are not necessarily compiled to special atomic instructions.
+Under a weaker consistency model they may be plain loads and stores,
+and their code generation varies across architectures.
+See \href{https://llvm.org/docs/Atomics.html\#atomics-and-codegen}{LLVM's documentation} for an example of how this is handled.}
+As stated in C11 6.7.2.4 and 6.7.3, the properties associated with atomic types are meaningful only for expressions that are
+lvalues.
+Lvalue-to-rvalue conversion (which models a memory read from an atomic location to a CPU register) strips atomicity along with other qualifiers.
+
 \section{Atomicity}
 \label{atomicity}
 But order is only one of the vital ingredients for inter-thread communication.
@@ -345,15 +354,15 @@ \section{Atomicity}
 \includegraphics[keepaspectratio, width=0.8\linewidth]{images/atomicity}
 \captionof{figure}{A flowchart depicting how two concurrent programs communicate and coordinate through a shared resource to achieve a goal, accessing the shared resource.}
-\label{atomicity}
+\label{fig:atomicity}

-Summary of concepts from the first three sections, as shown in \fig{atomicity}.
+\fig{fig:atomicity} summarizes the concepts of the first three sections.
 In \secref{background}, we observe the importance of maintaining the correct order of operations: t3 \to t4 \to t5 \to t6 \to t7, so that two concurrent programs can function as expected.
 In \secref{seqcst}, we see how two concurrent programs communicate to guarantee the order of operations: t5 \to t6.
 In \secref{atomicity}, we understand that certain operations must be treated as a single atomic step to ensure the order of operations: t3 \to t4 \to t5 and the order of operations: t6 \to t7.

 \section{Arbitrarily-sized ``atomic'' types}
-
+\label{atomictype}
 Along with \cc|atomic_int| and friends,
 \cplusplus{} provides the template \cpp|std::atomic<T>| for defining arbitrary atomic types.
 \clang{}, lacking a similar language feature but wanting to provide the same functionality,
@@ -380,39 +389,59 @@ \section{Arbitrarily-sized ``atomic'' types}

 \section{Read-modify-write}
 \label{rmw}
-Loads and stores are all well and good,
-but sometimes we need to read a value, modify it,
-and write it back as a single atomic step.
-There are a few common \introduce{read-modify-write} (\textsc{RMW}) operations.
+So far, we have introduced the importance of order and atomicity.
+In \secref{seqcst}, we saw how an atomic object keeps a single store or load from being reordered by the compiler within a program.
+Only upon establishing this correct single-thread order can we pursue how multiple threads establish a correct cross-thread order.
+After achieving that goal, we can further explore how concurrent threads coordinate and collaborate smoothly.
+In \secref{atomicity}, we saw the need for atomicity, which ensures that a group of operations is not only executed in sequence but also completes without interruption by operations from other threads.
+Together, these establish a correct order of operations across different threads.
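+To see why atomicity is needed, consider this sketch of the classic lost update; the shared \monobox{counter} and the function \monobox{count\_away} are hypothetical:
+\begin{ccode}
+int counter = 0; /* plain, non-atomic */
+
+int count_away(void *arg)
+{
+    (void) arg;
+    for (int i = 0; i < 100000; ++i)
+        counter++; /* compiles to a separate load, add, and store */
+    return 0;
+}
+/* Two threads running count_away() can interleave between the load and
+ * the store, so increments get lost and the final count is typically
+ * less than 200000. */
+\end{ccode}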
+
+\includegraphics[keepaspectratio, width=0.6\linewidth]{images/atomic-rmw}
+\captionof{figure}{Exchange, Test and Set, Fetch and…, Compare and Swap can all be transformed into atomic RMW operations, ensuring that operations like t1 \to t2 \to t3 become a single atomic step.}
+\label{fig:atomic-rmw}
+
+Atomic loads and stores are all well and good when we do not need to consider the previous state of atomic variables, but sometimes we need to read a value, modify it, and write it back as a single atomic step.
+As shown in \fig{fig:atomic-rmw}, the modification is based on the previous state, which is visible for reading, and the result is then written back.
+A complete \introduce{read-modify-write} operation is performed atomically to ensure its visibility to subsequent operations.
+
+Furthermore, communication between concurrent threads requires a shared resource, as shown in \fig{fig:atomicity}.
+Think back to the discussion in the previous sections:
+for concurrent threads to collaborate on a shared resource, they need a way to communicate,
+so the need for a communication channel arises together with the shared resource itself.
+
+As discussed earlier, accessing the shared resources responsible for communication must itself ensure both order and non-interference.
+To avoid protecting such shared resources recursively,
+atomic operations can be introduced for the shared resources responsible for communication, as shown in \fig{fig:atomic-types}.
+
+There are a few common \introduce{read-modify-write} (\textsc{RMW}) operations that make these operations a single atomic step.
 In \cplusplus{}, they are represented as member functions of \cpp|std::atomic<T>|.
 In \clang{}, they are freestanding functions.
+\includegraphics[keepaspectratio, width=1\linewidth]{images/atomic-types}
+\captionof{figure}{Test and Set (left) and Compare and Swap (right) leverage their checking functionality and their atomicity to make other RMW operations perform atomically.
+Red represents atomic RMW operations, while blue represents RMW operations that behave atomically.}
+\label{fig:atomic-types}
+
 \subsection{Exchange}
 \label{exchange}
-
-The simplest atomic \textsc{RMW} operation is an \introduce{exchange}:
-the current value is read and replaced with a new one.
-To see where this might be useful,
-let's tweak our example from \secref{atomicity}:
-instead of displaying the total number of processed files,
-the \textsc{UI} might want to show how many were processed per second.
-We could implement this by having the \textsc{UI} thread read the counter then zero it each second.
-But we could get the following race condition if reading and zeroing are separate steps:
-\begin{enumerate}
-    \item The \textsc{UI} thread reads the counter.
-    \item Before the \textsc{UI} thread has the chance to zero it,
-    the worker thread increments it again.
-    \item The \textsc{UI} thread now zeroes the counter, and the previous increment is lost.
-\end{enumerate}
-If the \textsc{UI} thread atomically exchanges the current value with zero,
-the race disappears.
+\introduce{Exchange} transforms an \textsc{RMW} into modifying a private variable first,
+and then directly swapping the private variable with the shared variable.
+We then only need to ensure that the second step,
+in which a read loads the shared variable and a modify-write exchanges it with the private variable,
+is a single atomic step.
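+For instance, recall the worker counter from \secref{atomicity}: a \textsc{UI} thread that reports files processed per second could read the counter and then zero it, but an increment landing between those two steps would be lost. Atomically exchanging the counter with zero removes the race. A sketch, with a hypothetical \monobox{processed} counter:
+\begin{ccode}
+atomic_int processed; /* incremented by the worker thread */
+
+int poll_processed(void)
+{
+    /* Read the current count and replace it with zero in one atomic step,
+     * so no increment can slip in between the read and the reset. */
+    return atomic_exchange(&processed, 0);
+}
+\end{ccode}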
+More generally, this allows programmers to extensively modify the private variable beforehand and write it to the shared variable only when necessary.

 \subsection{Test and set}
-
+\label{Testandset}
 \introduce{Test-and-set} works on a Boolean value:
 we read it, set it to \cpp|true|, and provide the value it held beforehand.
 \clang{} and \cplusplus{} offer a type dedicated to this purpose, called \monobox{atomic\_flag}.
-We could use it to build a simple spinlock:
+The initial value of an \monobox{atomic\_flag} is indeterminate until it is initialized with the \monobox{ATOMIC\_FLAG\_INIT} macro.
+
+\introduce{Test-and-set} operations are not limited to \textsc{RMW} functions;
+they can also be used to construct a simple spinlock.
+In this scenario, the flag acts as a shared resource for communication between threads.
+Thus, a spinlock implemented with \introduce{Test-and-set} operations ensures that entire \textsc{RMW} operations on shared resources are performed atomically, as shown in \fig{fig:atomic-types}.
 \label{spinlock}
 \begin{ccode}
 atomic_flag af = ATOMIC_FLAG_INIT;
@@ -431,73 +460,127 @@ \subsection{Test and set}
 someone else has acquired the lock and we must wait until they release it by clearing the flag.

 \subsection{Fetch and…}
-
-We can also read a value,
-perform a simple operation on it (such as addition, subtraction,
-or bitwise \textsc{AND}, \textsc{OR}, \textsc{XOR}) and return its previous value,
-all as part of a single atomic operation.
-You might recall from the exchange example that additions by the worker thread must be atomic to prevent races, where:
-\begin{enumerate}
-    \item The worker thread loads the current counter value and adds one.
-    \item Before that thread can store the value back,
-    the \textsc{UI} thread zeroes the counter.
-    \item The worker now performs its store, as if the counter was never cleared.
-\end{enumerate}
+\introduce{Fetch-and…} transforms an \textsc{RMW} into directly modifying the shared variable (with addition, subtraction,
+or bitwise \textsc{AND}, \textsc{OR}, \textsc{XOR}) and returning its previous value,
+all as part of a single atomic operation.
+Compared with \introduce{Exchange} (\secref{exchange}),
+\introduce{Fetch-and…} is the tool of choice when programmers only need to make a simple modification to the shared variable.

 \subsection{Compare and swap}
 \label{cas}
-
 Finally, we have \introduce{compare-and-swap} (\textsc{CAS}),
 sometimes called \introduce{compare-and-exchange}.
-It allows us to conditionally exchange a value \emph{if} its previous value matches some expected one.
+It allows us to conditionally exchange a value \emph{if} its previous value matches the expected one.
+In \clang{} and \cplusplus{}, as noted in C11 7.17.7.4, \textsc{CAS} resembles the following,
 if it were executed atomically:
-\begin{cppcode}
-template <typename T>
-bool atomic<T>::compare_exchange_strong(
-    T& expected, T desired)
+\begin{ccode}
+/* A is an atomic type. C is the non-atomic type corresponding to A */
+bool atomic_compare_exchange_strong(A* obj, C* expected, C desired)
 {
-    if (*this == expected) {
-        *this = desired;
+    if (memcmp(obj, expected, sizeof(*obj)) == 0) {
+        memcpy(obj, &desired, sizeof(*obj));
         return true;
+    } else {
+        memcpy(expected, obj, sizeof(*obj));
+        return false;
     }
-    expected = *this;
-    return false;
 }
-\end{cppcode}
+\end{ccode}
 \begin{samepage}
 \noindent
 The \cpp|_strong| suffix may leave you wondering if there is a corresponding ``weak'' \textsc{CAS}.
 Indeed, there is.
However, we will delve into that topic later in \secref{spurious-llsc-failures}.
\end{samepage}

-Let's say we have some long-running task that we might want to cancel.
-We'll give it three states: \textit{idle}, \textit{running},
-and \textit{cancelled}, and write a loop that exits when it is cancelled.
-\begin{cppcode}
-enum class TaskState : int8_t {
-    Idle, Running, Cancelled
-};
-
-std::atomic<TaskState> ts;
+Because \textsc{CAS} involves comparison with an expected value,
+\textsc{CAS} operations can extend beyond plain \textsc{RMW} functions.
+Here is how it works: first, read the shared variable and use its value as the expected one.
+Modify a private variable, and then \textsc{CAS}: compare the current value of the shared variable with the expected one.
+If they match, the modification was exclusive, so write by swapping the shared variable with the private one.
+If they do not match, interference from another thread has occurred;
+update the expected value with the current shared value and retry the modification in a loop.
+This iterative process allows \textsc{CAS} to serve as a communication mechanism between threads,
+ensuring that entire \textsc{RMW} operations on shared resources are performed atomically.
+As shown in \fig{fig:atomic-types}, compared with \introduce{Test-and-set} (\secref{Testandset}),
+a thread that employs \textsc{CAS} can perform its check directly on the shared resource.
+It uses an atomic \textsc{CAS} to make the modify step atomic,
+coupled with a while loop to make the entire \textsc{RMW} behave atomically.
+
+~\\
+However, the atomic \textsc{RMW} operations described here are merely a programming tool for achieving program-logic correctness.
+Whether they actually execute as atomic operations depends on how the compiler translates them into atomic instructions of the target instruction set.
+At the instruction level, \introduce{Exchange}, \introduce{Fetch-and-Add}, \introduce{Test-and-set}, and \textsc{CAS} are different styles of atomic \textsc{RMW} instructions.
+An \textsc{ISA} may provide only some of them,
+leaving compilers to synthesize the remaining atomic \textsc{RMW} operations.
+For example, in the IA32/64 and IBM System/360/z architectures,
+\introduce{Test-and-set} functionality is directly supported by hardware instructions.
+x86 has \monobox{XCHG} and \monobox{XADD} for \introduce{Exchange} and \introduce{Fetch-and-Add}, but implements \introduce{Test-and-set} with \monobox{XCHG}.
+Arm, in another style, provides LL/SC (Load Linked/Store Conditional) flavor instructions for all of these operations,
+with dedicated \textsc{CAS} instructions added since Armv8.1-A.

+\subsection{Example}
+\label{rmw_example}
+The following example code is a simplified implementation of a thread pool, which demonstrates the use of the \clang{}11 atomics library.
+
+\inputminted{c}{./examples/rmw_example.c}
+
+The output of the program is:
+\begin{ccode}
+PI calculated with 100 terms: 3.141592653589793
+\end{ccode}
-
-void taskLoop()
-{
-    ts = TaskState::Running;
-    while (ts == TaskState::Running) {
-        // Do good work.
-    }
-}
-\end{cppcode}
-If we want to cancel the task if it is running, but do nothing if it is idle,
-we could \textsc{CAS}:
-\begin{cppcode}
-bool cancel()
-{
-    auto expected = TaskState::Running;
-    return ts.compare_exchange_strong(expected, TaskState::Cancelled);
+
+\textbf{Exchange}
+In the function \monobox{tpool\_destroy}, \monobox{atomic\_exchange(\&thrd\_pool->state, cancelled)} reads the current state and replaces it with ``cancelled''.
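+The corresponding lines from the example are:
+\begin{ccode}
+if (atomic_exchange(&thrd_pool->state, cancelled))
+    printf("Thread pool cancelled with jobs still running.\n");
+\end{ccode}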
+A warning message is printed if the pool is destroyed while workers are still ``running''.
+If the exchange were not performed atomically, we might first read the state as ``running'',
+after which a worker could set the state to ``idle'' upon finishing the last job before we store ``cancelled'', resulting in a false warning.
+
+\textbf{Test and set}
+In this example, the scenario is as follows:
+first, the main thread acquires the lock \monobox{future->flag} by setting it to true,
+which is akin to creating a job and then transferring its ownership to the worker.
+Subsequently, the main thread is blocked until the worker clears the flag.
+That is, the main thread waits until the worker completes the job and returns ownership back to the main thread, which ensures correct cooperation.
+
+\textbf{Fetch and…}
+In the function \monobox{tpool\_destroy}, \monobox{atomic\_fetch\_and} is utilized as a means to set the state to ``idle''.
+In this case it is not strictly necessary, as the pool needs to be reinitialized before further use regardless.
+Its return value could be utilized further, for instance, to report the previous state and perform additional actions.
+
+\textbf{Compare and swap}
+Once threads are created in the thread pool as workers, they continuously search for jobs to do.
+Jobs are taken from the tail of the job queue.
+To take a job without it being taken by another worker halfway through, we need to atomically update the pointer to the last job.
+Otherwise, the last job is subject to a race.
+The while loop in the function \monobox{worker} keeps trying to claim the job atomically until it succeeds:
+\begin{ccode}
+while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
+                                     job->prev)) {
+}
+\end{ccode}
+
+Built-in post-increment and post-decrement operators and compound assignments on atomic objects, such as \monobox{++} and \monobox{+=}, are also read-modify-write atomic operations with total sequentially consistent ordering.
+They behave as if implemented with a \cc|do while| \textsc{CAS} loop. See the \clang{}11 standard, 6.5.2.4 and 6.5.16.2, for more details.
+
+What if claiming a job, which updates \cc|thrd_pool->head->prev|, were not done atomically?
+Two or more threads could race to update \cc|thrd_pool->head->prev| and end up working on the same job.
+Data races are undefined behavior in \clang{}11 and \cplusplus{}11.
+Working on the same job can lead to duplicated calculation of \cc|job->future->result|,
+as well as use-after-free and double-free on the job.
+
+But even when jobs are claimed atomically, a thread may still find itself holding a job that has already been freed.
+This is a defect of the example code.
+Jobs in the example are dynamically allocated, and each is freed as soon as a worker finishes it.
+This can leave dangling pointers in other workers that still hold and attempt to claim that job.
+If jobs are intended to be dynamically allocated, safe memory reclamation should be implemented for such shared objects.
+\textsc{RCU}, hazard pointers, and reference counting are the major ways of solving this problem.
+
+\subsection{Further improvements}
+At the beginning of \secref{rmw}, we described how a global total order is established by combining the local order and the inter-thread order imposed by atomic objects.
+But should every object, including non-atomic ones, participate in a single global order established by atomic objects?
+\introduce{Sequential consistency} solves the ordering problem in \secref{seqcst}, but it may force too much ordering, since some normal operations may not require it.
+Unless specified otherwise, atomic operations in the \clang{}11 atomics library use \monobox{memory\_order\_seq\_cst} as the default memory order. Operations suffixed with \monobox{\_explicit} accept an additional argument that specifies which memory order to use.
+How to leverage memory orders to optimize performance will be covered later in \secref{lock-example}.

 \section{Atomic operations as building blocks}

@@ -530,7 +613,7 @@ \section{Atomic operations as building blocks}
 Lockless algorithms are not inherently superior or quicker than blocking ones;
 they serve different purposes with their own design philosophies.
 Additionally, the mere use of atomic operations does not render algorithms lock-free.
-For example, our basic spinlock discussed in \secref{spinlock} is still considered a blocking algorithm even though it eschews \textsc{OS}-specific syscalls for making the blocked thread sleep.
+For example, a basic spinlock is still considered a blocking algorithm even though it eschews \textsc{OS}-specific syscalls for making the blocked thread sleep.
 Putting a blocked thread to sleep is often an optimization,
 allowing the operating system's scheduler to allocate \textsc{CPU} resources to active threads until the blocked one is revived.
 Some concurrency libraries even introduce hybrid locks that combine brief spinning with sleeping to balance \textsc{CPU} usage and context-switching overheads.

@@ -862,51 +945,43 @@ \subsection{Relaxed}
 Relaxed atomic operations are useful for variables shared between threads where \emph{no specific order} of operations is needed.
 Although it may seem like a niche requirement, such scenarios are quite common.

-Refer back to our discussions on \secref{atomicity} and \secref{rmw} operations,
-where a worker thread increments a counter that a \textsc{UI} thread then reads.
-In this case, the counter can be incremented using \cc|fetch_add(1, memory_order_relaxed)| because the only requirement is atomicity;
-the counter itself does not coordinate synchronization.
-
-Relaxed operations are also beneficial for managing flags shared between threads.
-For example, a thread might continuously run until it receives a signal to exit:
+Relaxed operations are beneficial for managing flags shared between threads.
+For example, a worker thread in the thread pool of \secref{rmw} might run continuously until it receives a ``cancelled'' signal:
 \begin{cppcode}
-atomic_bool stop(false);
-
-void worker()
-{
-    while (!stop.load(memory_order_relaxed)) {
-        // Do good work.
+while (1) {
+    if (atomic_load_explicit(&thrd_pool->state, memory_order_relaxed) == cancelled)
+        return EXIT_SUCCESS;
+    if (atomic_load_explicit(&thrd_pool->state, memory_order_relaxed) == running) {
+        // claim the job
+        job_t *job = atomic_load(&thrd_pool->head->prev);
+        while (!atomic_compare_exchange_weak_explicit(&thrd_pool->head->prev, &job,
+                                                      job->prev, memory_order_release,
+                                                      memory_order_relaxed)) {
+        }
+        if (job->args == NULL) {
+            atomic_store(&thrd_pool->state, idle);
+        } else {
+            void *ret_value = job->func(job->args);
+            job->future->result = ret_value;
+            atomic_flag_clear(&job->future->flag);
+            free(job->args);
+            free(job); // could cause dangling pointer in other threads
+        }
+    } else {
+        /* To automatically run when jobs are added, set the status to running if the job queue is not empty,
+         * as long as the producer is protected. */
+        thrd_yield();
+        continue;
+    }
-}
-
-int main()
-{
-    launchWorker();
-    // Wait some...
-    stop = true; // seq_cst
-    joinWorker();
-}
+};
 \end{cppcode}
 We do not care if the contents of the loop are rearranged around the load.
-Nothing bad will happen so long as \texttt{stop} is only used to tell the worker to exit, and not to ``announce'' any new data.
+Nothing bad will happen so long as \texttt{cancelled} is only used to tell the worker to exit, and not to ``announce'' any new data.

 Finally, relaxed loads are commonly used with \textsc{CAS} loops.
-Return to our lock-free multiply:
-\begin{cppcode}
-void atomicMultiply(int by)
-{
-    int expected = foo.load(memory_order_relaxed);
-
-    while (!foo.compare_exchange_weak(
-        expected, expected * by,
-        memory_order_release,
-        memory_order_relaxed)) {
-        /* empty loop */
-    }
-}
-\end{cppcode}
+Continuing the example above,
+a \textsc{CAS} loop is performed to claim a job.
 All of the loads can be relaxed as we do not need to enforce any order until we have successfully modified our value.
-The initial load of \texttt{expected} is not strictly necessary but can help avoid an extra loop iteration if \texttt{foo} remains unmodified by other threads before the \textsc{CAS} operation.

 \subsection{Acquire-Release}

@@ -1217,7 +1292,7 @@ \section{Additional Resources}
 \textit{\cpp|atomic<> Weapons|: The \cplusplus{11} Memory Model and Modern Hardware}} by Herb Sutter,
 a three-hour talk that provides a deeper dive.
-Also the source of figures \ref{ideal-machine} and \ref{dunnington}.
+Also the source of figures \ref{fig:ideal-machine} and \ref{fig:dunnington}.

 \href{https://www.akkadia.org/drepper/futex.pdf}{\textit{Futexes are Tricky}},
 a paper by Ulrich Drepper on how mutexes and other synchronization primitives can be built in Linux using atomic operations and syscalls.
diff --git a/examples/.clang-format b/examples/.clang-format
new file mode 100644
index 0000000..e1e184f
--- /dev/null
+++ b/examples/.clang-format
@@ -0,0 +1,98 @@
+Language: Cpp
+
+AccessModifierOffset: -4
+AlignAfterOpenBracket: Align
+AlignConsecutiveAssignments: false
+AlignConsecutiveDeclarations: false
+AlignOperands: true
+AlignTrailingComments: false
+AllowAllParametersOfDeclarationOnNextLine: false
+AllowShortBlocksOnASingleLine: false
+AllowShortCaseLabelsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: None
+AllowShortIfStatementsOnASingleLine: false
+AllowShortLoopsOnASingleLine: false
+AlwaysBreakAfterDefinitionReturnType: None
+AlwaysBreakAfterReturnType: None
+AlwaysBreakBeforeMultilineStrings: false
+AlwaysBreakTemplateDeclarations: false
+BinPackArguments: true
+BinPackParameters: true
+
+BraceWrapping:
+  AfterClass: false
+  AfterControlStatement: false
+  AfterEnum: false
+  AfterFunction: true
+  AfterNamespace: true
+  AfterObjCDeclaration: false
+  AfterStruct: false
+  AfterUnion: false
+  AfterExternBlock: false
+  BeforeCatch: false
+  BeforeElse: false
+  IndentBraces: false
+  SplitEmptyFunction: true
+  SplitEmptyRecord: true
+  SplitEmptyNamespace: true
+
+BreakBeforeBinaryOperators: None
+BreakBeforeBraces: Custom
+BreakBeforeInheritanceComma: false
+BreakBeforeTernaryOperators: false
+BreakConstructorInitializersBeforeComma: false
+BreakConstructorInitializers: BeforeComma
+BreakAfterJavaFieldAnnotations: false
+BreakStringLiterals: false
+ColumnLimit: 80
+CommentPragmas: '^ IWYU pragma:'
+CompactNamespaces: false
+ConstructorInitializerAllOnOneLineOrOnePerLine: false
+ConstructorInitializerIndentWidth: 4
+ContinuationIndentWidth: 4
+Cpp11BracedListStyle: false
+DerivePointerAlignment: false
+DisableFormat: false
+ExperimentalAutoDetectBinPacking: false
+FixNamespaceComments: false
+
+ForEachMacros:
+  - 'list_for_each'
+  - 'list_for_each_safe'
+
+IncludeBlocks: Preserve
+IncludeCategories:
+  - Regex: '.*'
+    Priority: 1
+IncludeIsMainRegex: '(Test)?$'
+IndentCaseLabels: false
+IndentPPDirectives: None
+IndentWidth: 4
+IndentWrappedFunctionNames: false
+KeepEmptyLinesAtTheStartOfBlocks: false
+MacroBlockBegin: ''
+MacroBlockEnd: ''
+MaxEmptyLinesToKeep: 1
+NamespaceIndentation: None
+
+PointerAlignment: Right
+ReflowComments: false
+SortIncludes: false
+SortUsingDeclarations: false
+SpaceAfterCStyleCast: false
+SpaceAfterTemplateKeyword: true
+SpaceBeforeAssignmentOperators: true
+SpaceBeforeCtorInitializerColon: true
+SpaceBeforeInheritanceColon: true
+SpaceBeforeParens: ControlStatements
+SpaceBeforeRangeBasedForLoopColon: true
+SpaceInEmptyParentheses: false
+SpacesBeforeTrailingComments: 1
+SpacesInAngles: false
+SpacesInContainerLiterals: false
+SpacesInCStyleCastParentheses: false
+SpacesInParentheses: false
+SpacesInSquareBrackets: false
+Standard: Cpp03
+TabWidth: 4
+UseTab: Never
\ No newline at end of file
diff --git a/examples/Makefile b/examples/Makefile
new file mode 100644
index 0000000..f5a58e6
--- /dev/null
+++ b/examples/Makefile
@@ -0,0 +1,6 @@
+all:
+	$(CC) -Wall -o rmw_example rmw_example.c -pthread -lm
+clean:
+	rm -f rmw_example
+check: all
+	./rmw_example
\ No newline at end of file
diff --git a/examples/rmw_example.c b/examples/rmw_example.c
new file mode 100644
index 0000000..991b7c5
--- /dev/null
+++ b/examples/rmw_example.c
@@ -0,0 +1,240 @@
+#include <assert.h>
+#include <math.h>
+#include <stdatomic.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <threads.h>
+
+#define PRECISION 100 /* upper bound in BBP sum */
+#define CACHE_LINE_SIZE 64
+#define N_THREADS 64
+
+struct tpool_future {
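+    /* A future lets the submitter wait for one job's result: `flag` stays
+     * set while the job is in flight and is cleared by the worker once
+     * `result` has been written. */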
void *result;
+    void *arg;
+    atomic_flag flag;
+};
+
+typedef struct job {
+    void *(*func)(void *);
+    struct tpool_future *future;
+    struct job *next, *prev;
+} job_t;
+
+typedef struct idle_job {
+    _Atomic(job_t *) prev;
+    char padding[CACHE_LINE_SIZE -
+                 sizeof(_Atomic(job_t *))]; /* avoid false sharing */
+    job_t job;
+} idle_job_t;
+
+enum state { idle, running, cancelled };
+
+typedef struct tpool {
+    atomic_flag initialized;
+    int size;
+    thrd_t *pool;
+    atomic_int state;
+    thrd_start_t func;
+    idle_job_t *head; /* job queue is a SPMC ring buffer */
+} tpool_t;
+
+static struct tpool_future *tpool_future_create(void *arg)
+{
+    struct tpool_future *future = malloc(sizeof(struct tpool_future));
+    if (future) {
+        future->result = NULL;
+        future->arg = arg;
+        atomic_flag_clear(&future->flag);
+        atomic_flag_test_and_set(&future->flag);
+    }
+    return future;
+}
+
+void tpool_future_wait(struct tpool_future *future)
+{
+    while (atomic_flag_test_and_set(&future->flag))
+        ;
+}
+
+void tpool_future_destroy(struct tpool_future *future)
+{
+    free(future->result);
+    free(future);
+}
+
+static int worker(void *args)
+{
+    if (!args)
+        return EXIT_FAILURE;
+    tpool_t *thrd_pool = (tpool_t *)args;
+
+    while (1) {
+        /* worker is laid off */
+        if (atomic_load(&thrd_pool->state) == cancelled)
+            return EXIT_SUCCESS;
+        if (atomic_load(&thrd_pool->state) == running) {
+            /* worker takes the job */
+            job_t *job = atomic_load(&thrd_pool->head->prev);
+            /* worker checks if there is only an idle job in the job queue */
+            if (job == &thrd_pool->head->job) {
+                /* worker says it is idle */
+                atomic_store(&thrd_pool->state, idle);
+                thrd_yield();
+                continue;
+            }
+            while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
+                                                 job->prev))
+                ;
+            job->future->result = (void *)job->func(job->future->arg);
+            atomic_flag_clear(&job->future->flag);
+            free(job);
+        } else {
+            /* worker is idle */
+            thrd_yield();
+        }
+    };
+    return EXIT_SUCCESS;
+}
+
+static bool tpool_init(tpool_t *thrd_pool, size_t size)
+{
+    if (atomic_flag_test_and_set(&thrd_pool->initialized)) {
+        printf("This thread pool has already been initialized.\n");
+        return false;
+    }
+
+    assert(size > 0);
+    thrd_pool->pool = malloc(sizeof(thrd_t) * size);
+    if (!thrd_pool->pool) {
+        printf("Failed to allocate thread identifiers.\n");
+        return false;
+    }
+
+    idle_job_t *idle_job = malloc(sizeof(idle_job_t));
+    if (!idle_job) {
+        printf("Failed to allocate idle job.\n");
+        return false;
+    }
+
+    /* idle_job will always be the first job */
+    idle_job->job.next = &idle_job->job;
+    idle_job->job.prev = &idle_job->job;
+    idle_job->prev = &idle_job->job;
+    thrd_pool->func = worker;
+    thrd_pool->head = idle_job;
+    thrd_pool->state = idle;
+    thrd_pool->size = size;
+
+    /* employer hires many workers */
+    for (size_t i = 0; i < size; i++)
+        thrd_create(thrd_pool->pool + i, worker, thrd_pool);
+
+    return true;
+}
+
+static void tpool_destroy(tpool_t *thrd_pool)
+{
+    if (atomic_exchange(&thrd_pool->state, cancelled))
+        printf("Thread pool cancelled with jobs still running.\n");
+
+    for (int i = 0; i < thrd_pool->size; i++)
+        thrd_join(thrd_pool->pool[i], NULL);
+
+    while (thrd_pool->head->prev != &thrd_pool->head->job) {
+        job_t *job = thrd_pool->head->prev->prev;
+        free(thrd_pool->head->prev);
+        thrd_pool->head->prev = job;
+    }
+    free(thrd_pool->head);
+    free(thrd_pool->pool);
+    atomic_fetch_and(&thrd_pool->state, 0);
+    atomic_flag_clear(&thrd_pool->initialized);
+}
+
+/* Use Bailey–Borwein–Plouffe formula to approximate PI */
+static void *bbp(void *arg)
+{
+    int k =
*(int *)arg;
+    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
+                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
+    double *product = malloc(sizeof(double));
+    if (!product)
+        return NULL;
+
+    *product = 1 / pow(16, k) * sum;
+    return (void *)product;
+}
+
+struct tpool_future *add_job(tpool_t *thrd_pool, void *(*func)(void *),
+                             void *arg)
+{
+    job_t *job = malloc(sizeof(job_t));
+    if (!job)
+        return NULL;
+
+    struct tpool_future *future = tpool_future_create(arg);
+    if (!future) {
+        free(job);
+        return NULL;
+    }
+
+    job->func = func;
+    job->future = future;
+    job->next = thrd_pool->head->job.next;
+    job->prev = &thrd_pool->head->job;
+    thrd_pool->head->job.next->prev = job;
+    thrd_pool->head->job.next = job;
+    if (thrd_pool->head->prev == &thrd_pool->head->job) {
+        thrd_pool->head->prev = job;
+        /* the previous job of the idle job is itself */
+        thrd_pool->head->job.prev = &thrd_pool->head->job;
+    }
+    return future;
+}
+
+static inline void wait_until(tpool_t *thrd_pool, int state)
+{
+    while (atomic_load(&thrd_pool->state) != state)
+        thrd_yield();
+}
+
+int main()
+{
+    int bbp_args[PRECISION];
+    struct tpool_future *futures[PRECISION];
+    double bbp_sum = 0;
+
+    tpool_t thrd_pool = { .initialized = ATOMIC_FLAG_INIT };
+    if (!tpool_init(&thrd_pool, N_THREADS)) {
+        printf("failed to init.\n");
+        return 0;
+    }
+    /* employer asks workers to work */
+    atomic_store(&thrd_pool.state, running);
+
+    /* employer waits until workers are idle */
+    wait_until(&thrd_pool, idle);
+
+    /* employer adds more jobs to the job queue */
+    for (int i = 0; i < PRECISION; i++) {
+        bbp_args[i] = i;
+        futures[i] = add_job(&thrd_pool, bbp, &bbp_args[i]);
+    }
+
+    /* employer asks workers to work */
+    atomic_store(&thrd_pool.state, running);
+
+    /* employer waits for the results of the jobs */
+    for (int i = 0; i < PRECISION; i++) {
+        tpool_future_wait(futures[i]);
+        bbp_sum += *(double *)(futures[i]->result);
+        tpool_future_destroy(futures[i]);
+    }
+
+    /* employer destroys the job queue and lays workers off */
+    tpool_destroy(&thrd_pool);
+    printf("PI calculated with %d terms: %.15f\n", PRECISION, bbp_sum);
+    return 0;
+}
diff --git a/images/atomic-rmw.pdf b/images/atomic-rmw.pdf
new file mode 100644
index 0000000..9f105d0
Binary files /dev/null and b/images/atomic-rmw.pdf differ
diff --git a/images/atomic-types.pdf b/images/atomic-types.pdf
new file mode 100644
index 0000000..9c39ece
Binary files /dev/null and b/images/atomic-types.pdf differ