Skip to content

Add new example for section 5 #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
201a044
Add new example for chapter 5
idoleat Apr 17, 2024
6abc1ea
Refactor worker thread conditional code path
idoleat Apr 17, 2024
6949475
Combine identifier declarations in one line
idoleat Apr 17, 2024
b1b3431
Add assert to avoid the case size equals zero
idoleat Apr 17, 2024
fabf742
Shorten else if as an if
idoleat Apr 17, 2024
aefdd16
Replace long expressions with macros
idoleat Apr 18, 2024
47702ca
Fix wrong behavior of job queue
idoleat Apr 21, 2024
347b752
Add use case of atomic exhange, TAS, fetch AND
idoleat Apr 21, 2024
928c1ec
Define job count and thread count as constant
idoleat Apr 23, 2024
4c9a908
Revise section 5 with new example
idoleat May 3, 2024
c88b4dc
Rename ch5_example.c to rmw_example.c
idoleat May 3, 2024
1ad39bb
Remove unneeded comment
idoleat May 3, 2024
619b05d
Add race conditoin example
idoleat May 6, 2024
3ea014e
Refine the statement on atomicity
idoleat May 29, 2024
834a856
Update descriptions referencing to section 5
idoleat May 30, 2024
8d7e2c2
Refine rmw example and remove sleep()
idoleat Jun 16, 2024
138421e
Add BBP formula to approximate PI
weihsinyeh Jun 22, 2024
267f1ad
Worker do the job to execute BBP
weihsinyeh Jun 23, 2024
389875d
Change the implementation of mutex to test and set
weihsinyeh Jun 23, 2024
d9dd152
Check if job is finished by the result of future
weihsinyeh Jun 23, 2024
5a060d3
Avoid the memory leak in the add_job function
weihsinyeh Jun 24, 2024
597a90d
Simplify the operation of test and set
weihsinyeh Jun 25, 2024
07e0f1c
Avoid memory leak in the bbp function
weihsinyeh Jun 25, 2024
a997d4e
Integrate RMW with concepts from previous sections
weihsinyeh Jun 26, 2024
dcb8f9a
Remove the thread santizer usage and diff file
idoleat Jun 26, 2024
a6c68a9
Restore previously removed spinlock example
idoleat Jun 26, 2024
4dc5699
Supplement statements with references to C11 and LLVM docs
idoleat Jun 27, 2024
11baa72
Add atomic instruction and simplify rmw_example.c
weihsinyeh Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 196 additions & 121 deletions concurrency-primer.tex

Large diffs are not rendered by default.

98 changes: 98 additions & 0 deletions examples/.clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
Language: Cpp

AccessModifierOffset: -4
AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlignOperands: true
AlignTrailingComments: false
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: None
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: false
AlwaysBreakTemplateDeclarations: false
BinPackArguments: true
BinPackParameters: true

BraceWrapping:
AfterClass: false
AfterControlStatement: false
AfterEnum: false
AfterFunction: true
AfterNamespace: true
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
AfterExternBlock: false
BeforeCatch: false
BeforeElse: false
IndentBraces: false
SplitEmptyFunction: true
SplitEmptyRecord: true
SplitEmptyNamespace: true

BreakBeforeBinaryOperators: None
BreakBeforeBraces: Custom
BreakBeforeInheritanceComma: false
BreakBeforeTernaryOperators: false
BreakConstructorInitializersBeforeComma: false
BreakConstructorInitializers: BeforeComma
BreakAfterJavaFieldAnnotations: false
BreakStringLiterals: false
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerAllOnOneLineOrOnePerLine: false
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: false
DerivePointerAlignment: false
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
FixNamespaceComments: false

ForEachMacros:
- 'list_for_each'
- 'list_for_each_safe'

IncludeBlocks: Preserve
IncludeCategories:
- Regex: '.*'
Priority: 1
IncludeIsMainRegex: '(Test)?$'
IndentCaseLabels: false
IndentPPDirectives: None
IndentWidth: 4
IndentWrappedFunctionNames: false
KeepEmptyLinesAtTheStartOfBlocks: false
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None

PointerAlignment: Right
ReflowComments: false
SortIncludes: false
SortUsingDeclarations: false
SpaceAfterCStyleCast: false
SpaceAfterTemplateKeyword: true
SpaceBeforeAssignmentOperators: true
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeParens: ControlStatements
SpaceBeforeRangeBasedForLoopColon: true
SpaceInEmptyParentheses: false
SpacesBeforeTrailingComments: 1
SpacesInAngles: false
SpacesInContainerLiterals: false
SpacesInCStyleCastParentheses: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
Standard: Cpp03
TabWidth: 4
UseTab: Never
6 changes: 6 additions & 0 deletions examples/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
all:
$(CC) -Wall -o rmw_example rmw_example.c -pthread -lm
clean:
rm -f rmw_example
check: all
./rmw_example
240 changes: 240 additions & 0 deletions examples/rmw_example.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
#include <stdio.h>
#include <stdatomic.h>
#include <threads.h>
#include <stdlib.h>
#include <stdbool.h>
#include <assert.h>
#include <math.h>

#define PRECISION 100 /* upper bound in BPP sum */
#define CACHE_LINE_SIZE 64
#define N_THREADS 64

struct tpool_future {
void *result;
void *arg;
atomic_flag flag;
};

typedef struct job {
void *(*func)(void *);
struct tpool_future *future;
struct job *next, *prev;
} job_t;

typedef struct idle_job {
_Atomic(job_t *) prev;
char padding[CACHE_LINE_SIZE -
sizeof(_Atomic(job_t *))]; /* avoid false sharing */
job_t job;
} idle_job_t;

enum state { idle, running, cancelled };

typedef struct tpool {
atomic_flag initialezed;
int size;
thrd_t *pool;
atomic_int state;
thrd_start_t func;
idle_job_t *head; /* job queue is a SPMC ring buffer */
} tpool_t;

static struct tpool_future *tpool_future_create(void *arg)
{
struct tpool_future *future = malloc(sizeof(struct tpool_future));
if (future) {
future->result = NULL;
future->arg = arg;
atomic_flag_clear(&future->flag);
atomic_flag_test_and_set(&future->flag);
}
return future;
}

void tpool_future_wait(struct tpool_future *future)
{
while (atomic_flag_test_and_set(&future->flag))
;
}

void tpool_future_destroy(struct tpool_future *future)
{
free(future->result);
free(future);
}

static int worker(void *args)
{
if (!args)
return EXIT_FAILURE;
tpool_t *thrd_pool = (tpool_t *)args;

while (1) {
/* worker is laid off */
if (atomic_load(&thrd_pool->state) == cancelled)
return EXIT_SUCCESS;
if (atomic_load(&thrd_pool->state) == running) {
/* worker takes the job */
job_t *job = atomic_load(&thrd_pool->head->prev);
/* worker checks if there is only an idle job in the job queue */
if (job == &thrd_pool->head->job) {
/* worker says it is idle */
atomic_store(&thrd_pool->state, idle);
thrd_yield();
continue;
}
while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
job->prev))
;
job->future->result = (void *)job->func(job->future->arg);
atomic_flag_clear(&job->future->flag);
free(job);
} else {
/* worker is idle */
thrd_yield();
}
};
return EXIT_SUCCESS;
}

static bool tpool_init(tpool_t *thrd_pool, size_t size)
{
if (atomic_flag_test_and_set(&thrd_pool->initialezed)) {
printf("This thread pool has already been initialized.\n");
return false;
}

assert(size > 0);
thrd_pool->pool = malloc(sizeof(thrd_t) * size);
if (!thrd_pool->pool) {
printf("Failed to allocate thread identifiers.\n");
return false;
}

idle_job_t *idle_job = malloc(sizeof(idle_job_t));
if (!idle_job) {
printf("Failed to allocate idle job.\n");
return false;
}

/* idle_job will always be the first job */
idle_job->job.next = &idle_job->job;
idle_job->job.prev = &idle_job->job;
idle_job->prev = &idle_job->job;
thrd_pool->func = worker;
thrd_pool->head = idle_job;
thrd_pool->state = idle;
thrd_pool->size = size;

/* employer hires many workers */
for (size_t i = 0; i < size; i++)
thrd_create(thrd_pool->pool + i, worker, thrd_pool);

return true;
}

static void tpool_destroy(tpool_t *thrd_pool)
{
if (atomic_exchange(&thrd_pool->state, cancelled))
printf("Thread pool cancelled with jobs still running.\n");

for (int i = 0; i < thrd_pool->size; i++)
thrd_join(thrd_pool->pool[i], NULL);

while (thrd_pool->head->prev != &thrd_pool->head->job) {
job_t *job = thrd_pool->head->prev->prev;
free(thrd_pool->head->prev);
thrd_pool->head->prev = job;
}
free(thrd_pool->head);
free(thrd_pool->pool);
atomic_fetch_and(&thrd_pool->state, 0);
atomic_flag_clear(&thrd_pool->initialezed);
}

/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bbp(void *arg)
{
int k = *(int *)arg;
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
double *product = malloc(sizeof(double));
if (!product)
return NULL;

*product = 1 / pow(16, k) * sum;
return (void *)product;
}

struct tpool_future *add_job(tpool_t *thrd_pool, void *(*func)(void *),
void *arg)
{
job_t *job = malloc(sizeof(job_t));
if (!job)
return NULL;

struct tpool_future *future = tpool_future_create(arg);
if (!future) {
free(job);
return NULL;
}

job->func = func;
job->future = future;
job->next = thrd_pool->head->job.next;
job->prev = &thrd_pool->head->job;
thrd_pool->head->job.next->prev = job;
thrd_pool->head->job.next = job;
if (thrd_pool->head->prev == &thrd_pool->head->job) {
thrd_pool->head->prev = job;
/* the previous job of the idle job is itself */
thrd_pool->head->job.prev = &thrd_pool->head->job;
}
return future;
}

static inline void wait_until(tpool_t *thrd_pool, int state)
{
while (atomic_load(&thrd_pool->state) != state)
thrd_yield();
}

int main()
{
int bbp_args[PRECISION];
struct tpool_future *futures[PRECISION];
double bbp_sum = 0;

tpool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
if (!tpool_init(&thrd_pool, N_THREADS)) {
printf("failed to init.\n");
return 0;
}
/* employer ask workers to work */
atomic_store(&thrd_pool.state, running);

/* employer wait ... until workers are idle */
wait_until(&thrd_pool, idle);

/* employer add more job to the job queue */
for (int i = 0; i < PRECISION; i++) {
bbp_args[i] = i;
futures[i] = add_job(&thrd_pool, bbp, &bbp_args[i]);
}

/* employer ask workers to work */
atomic_store(&thrd_pool.state, running);

/* employer wait for the result of job */
for (int i = 0; i < PRECISION; i++) {
tpool_future_wait(futures[i]);
bbp_sum += *(double *)(futures[i]->result);
tpool_future_destroy(futures[i]);
}

/* employer destroys the job queue and lays workers off */
tpool_destroy(&thrd_pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION, bbp_sum);
return 0;
}
Binary file added images/atomic-rmw.pdf
Binary file not shown.
Binary file added images/atomic-types.pdf
Binary file not shown.