Skip to content

[WIP] Add support for recursive locks and try_lock #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/qt_addrstat.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ static QINLINE qthread_addrstat_t *qthread_addrstat_new(void)
ret->FEQ = NULL;
ret->FFQ = NULL;
ret->FFWQ = NULL;
ret->acq_owner_stat.state = FEB_is_not_recursive_lock;
ret->acq_owner_stat.recursive_access_counter = 0;
QTHREAD_EMPTY_TIMER_INIT(ret);
}
return ret;
Expand Down
10 changes: 10 additions & 0 deletions include/qt_blocking_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include "qt_profiling.h"
#include "qt_debug.h"

#define FEB_is_recursive_lock (-1)
#define FEB_is_not_recursive_lock (-2)

typedef enum blocking_syscalls {
ACCEPT,
CONNECT,
Expand Down Expand Up @@ -46,6 +49,11 @@ typedef struct _qt_blocking_queue_node_s {
int err;
} qt_blocking_queue_node_t;

typedef struct qthread_acquire_owner_stat_s {
int64_t state;
int64_t recursive_access_counter;
} qthread_acquire_owner_stat_t;

typedef struct qthread_addrstat_s {
QTHREAD_FASTLOCK_TYPE lock;
qthread_addrres_t *EFQ;
Expand All @@ -57,6 +65,8 @@ typedef struct qthread_addrstat_s {
#endif
uint_fast8_t full;
uint_fast8_t valid;
qthread_acquire_owner_stat_t acq_owner_stat;

} qthread_addrstat_t;

#ifdef UNPOOLED
Expand Down
3 changes: 3 additions & 0 deletions include/qt_feb.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ typedef filter_code (*qt_feb_taskfilter_f)(qt_key_t addr,
qthread_t *restrict waiter,
void *restrict arg);

int INTERNAL qthread_feb_adr_init(const aligned_t *dest, const bool is_from_recursive_lock);
int INTERNAL qthread_feb_adr_remove(aligned_t *dest);

void INTERNAL qt_feb_subsystem_init(uint_fast8_t);

int API_FUNC qthread_writeEF_nb(aligned_t *restrict const dest,
Expand Down
8 changes: 7 additions & 1 deletion include/qthread/qthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "common.h" /* important configuration options */

#include <string.h> /* for memcpy() */
#include <stdbool.h> /* for bool */

#ifndef QTHREAD_NOALIGNCHECK
# include <stdio.h> /* for fprintf() */
Expand Down Expand Up @@ -124,7 +125,6 @@ typedef struct _syncvar_s {
typedef unsigned short qthread_shepherd_id_t;
typedef unsigned short qthread_worker_id_t;


/* for convenient arguments to qthread_fork */
typedef aligned_t (*qthread_f)(void *arg);
typedef void (*qthread_agg_f)(int count, qthread_f *f, void **arg, void **ret, uint16_t flags);
Expand Down Expand Up @@ -434,6 +434,7 @@ int qthread_syncvar_status(syncvar_t *const v);
int qthread_empty(const aligned_t *dest);
int qthread_syncvar_empty(syncvar_t *restrict dest);
int qthread_fill(const aligned_t *dest);
int qthread_fill_recursive(const aligned_t *dest);
int qthread_syncvar_fill(syncvar_t *restrict dest);

/* These functions wait for memory to become empty, and then fill it. When
Expand Down Expand Up @@ -553,6 +554,11 @@ int qthread_readXX(aligned_t *dest,
*/
int qthread_lock(const aligned_t *a);
int qthread_unlock(const aligned_t *a);
int qthread_lock_init(const aligned_t *a, const bool is_recursive);
int qthread_lock_destroy(aligned_t *a);

const int qthread_trylock(const aligned_t *a);


#if defined(QTHREAD_MUTEX_INCREMENT) || \
(QTHREAD_ASSEMBLY_ARCH == QTHREAD_POWERPC32) || \
Expand Down
144 changes: 140 additions & 4 deletions src/feb.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ static QINLINE void qthread_gotlock_empty_inner(qthread_shepherd_t *shep,
FREE_ADDRRES(X);
qthread_gotlock_fill_inner(shep, m, maddr, 1, precond_tasks);
}
if ((m->full == 1) && (m->EFQ == NULL) && (m->FEQ == NULL) && (m->FFQ == NULL) && (m->FFWQ == NULL)) {
if ((m->full == 1) && (m->EFQ == NULL) && (m->FEQ == NULL) && (m->FFQ == NULL) && (m->FFWQ == NULL)
&& (m->acq_owner_stat.state <FEB_is_recursive_lock )) {
removeable = 1;
} else {
removeable = 0;
Expand Down Expand Up @@ -551,7 +552,9 @@ static QINLINE void qthread_gotlock_fill_inner(qthread_shepherd_t *shep,
}
if (recursive == 0) {
int removeable;
if ((m->EFQ == NULL) && (m->FEQ == NULL) && (m->full == 1)) {
if ((m->EFQ == NULL) && (m->FEQ == NULL) && (m->full == 1)
&&
(m->acq_owner_stat.state < FEB_is_recursive_lock)) {
qthread_debug(FEB_DETAILS, "m(%p), addr(%p), recursive(%u): addrstat removeable!\n", m, maddr, recursive);
removeable = 1;
} else {
Expand Down Expand Up @@ -665,6 +668,82 @@ int API_FUNC qthread_empty(const aligned_t *dest)
return QTHREAD_SUCCESS;
} /*}}} */

int INTERNAL qthread_feb_adr_init(const aligned_t *dest, const bool is_from_recursive_lock)
{ /*{{{ */
const aligned_t *alignedaddr;

if (qlib == NULL) {
return QTHREAD_SUCCESS;
}
qthread_addrstat_t *m;
const int lockbin = QTHREAD_CHOOSE_STRIPE2(dest);
qthread_shepherd_t *shep = qthread_internal_getshep();

assert(qthread_library_initialized);

if (!shep) {
printf("\nXXX ERROR XXX ERROR XXX ERROR XXX ERROR\n\n");
exit(0);//FIXME
return qthread_feb_blocker_func((void *)dest, NULL, FILL/*!!!!!!!XXX!!!*/);
}

qthread_debug(FEB_CALLS, "dest=%p (tid=%i)\n", dest, qthread_id());
QALIGN(dest, alignedaddr);
/* lock hash */
QTHREAD_COUNT_THREADS_BINCOUNTER(febs, lockbin);
#ifdef LOCK_FREE_FEBS
do {
m = qt_hash_get(FEBs[lockbin], (void *)alignedaddr);
if (m) {
return QTHREAD_NOT_ALLOWED;
} else {
if(is_from_recursive_lock) {
m = qthread_addrstat_new();
if (!m) { return QTHREAD_MALLOC_ERROR; }
m->full = 1;
m->acq_owner_stat.state = FEB_is_recursive_lock;
MACHINE_FENCE;

if (!qt_hash_put(FEBs[lockbin], (void *)alignedaddr, m)) {
qthread_addrstat_delete(m);
continue;
}
}
/* already full */
break;
}
} while (1);
#else /* ifdef LOCK_FREE_FEBS */
qt_hash_lock(FEBs[lockbin]);
{ /* BEGIN CRITICAL SECTION */
m = (qthread_addrstat_t *)qt_hash_get_locked(FEBs[lockbin], (void *)alignedaddr);
if (m) {
return QTHREAD_NOT_ALLOWED;
} else {
if(is_from_recursive_lock) {
m = qthread_addrstat_new();
if (!m) { return QTHREAD_MALLOC_ERROR; }
m->full = 1;
m->acq_owner_stat.state = FEB_is_recursive_lock;
MACHINE_FENCE;

if (!qt_hash_put_locked(FEBs[lockbin], (void *)alignedaddr, m)){
qthread_addrstat_delete(m);
return QTHREAD_MALLOC_ERROR;
}
}
}
} /* END CRITICAL SECTION */
qt_hash_unlock(FEBs[lockbin]); /* unlock hash */
#endif /* ifdef LOCK_FREE_FEBS */
return QTHREAD_SUCCESS;
} /*}}} */

int INTERNAL qthread_feb_adr_remove(aligned_t *dest)
{ /*{{{ */
qthread_FEB_remove(dest);
} /*}}} */

int API_FUNC qthread_fill(const aligned_t *dest)
{ /*{{{ */
const aligned_t *alignedaddr;
Expand Down Expand Up @@ -713,8 +792,20 @@ int API_FUNC qthread_fill(const aligned_t *dest)
qt_hash_unlock(FEBs[lockbin]); /* unlock hash */
#endif /* ifdef LOCK_FREE_FEBS */
if (m) {
if(m->acq_owner_stat.state >= FEB_is_recursive_lock) {
//assert(m->acq_owner_stat.recursive_access_counter <=-1);
--m->acq_owner_stat.recursive_access_counter;
if (m->acq_owner_stat.recursive_access_counter > 0){
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): released recursive lock (inner:%i)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),m->acq_owner_stat.recursive_access_counter );
QTHREAD_FASTLOCK_UNLOCK(&m->lock);
return QTHREAD_SUCCESS;
}
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): released recursive lock (outer)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER));
m->acq_owner_stat.state = FEB_is_recursive_lock; //reset owner, maintain lock type
}
/* if dest wasn't in the hash, it was already full. Since it was,
* we need to fill it. */

qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i): filling, maybe alerting waiters\n", dest, qthread_id());
qthread_gotlock_fill(shep, m, (void *)alignedaddr);
}
Expand Down Expand Up @@ -785,6 +876,7 @@ int API_FUNC qthread_writeF(aligned_t *restrict dest,
return QTHREAD_SUCCESS;
} /*}}} */


int API_FUNC qthread_writeF_const(aligned_t *dest,
aligned_t src)
{ /*{{{ */
Expand Down Expand Up @@ -1357,6 +1449,7 @@ int API_FUNC qthread_readFF_nb(aligned_t *restrict dest,
return QTHREAD_SUCCESS;
} /*}}} */


/* the way this works is that:
* 1 - src's FEB state must be "full"
* 2 - data is copied from src to destination
Expand All @@ -1379,6 +1472,7 @@ int API_FUNC qthread_readFE(aligned_t *restrict dest,
if (!me) {
return qthread_feb_blocker_func(dest, (void *)src, READFE);
}

assert(me->rdata);
qthread_debug(FEB_CALLS, "dest=%p, src=%p (tid=%i)\n", dest, src, me->thread_id);
QTHREAD_FEB_UNIQUERECORD(feb, src, me);
Expand Down Expand Up @@ -1436,7 +1530,19 @@ int API_FUNC qthread_readFE(aligned_t *restrict dest,
assert(m);
qthread_debug(FEB_DETAILS, "data structure locked\n");
/* by this point m is locked */
if (m->full == 0) { /* empty, thus, we must block */
if (m->full == 0) { /* empty, thus, we must block */
/* unless read_FE was taken by recursive qthread_lock() */
/* on same thread */
if(m->acq_owner_stat.state >= FEB_is_recursive_lock) {
if (m->acq_owner_stat.state == qthread_readstate(CURRENT_UNIQUE_WORKER)) {
++m->acq_owner_stat.recursive_access_counter;
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (inner:%ld)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),(long long)m->acq_owner_stat.recursive_access_counter );
QTHREAD_FASTLOCK_UNLOCK(&m->lock);
QTHREAD_FEB_TIMER_STOP(febblock, me);
return QTHREAD_SUCCESS;
}
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): attempt to acquire recursive lock from pid=%ld failed, back to master\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),(long long)m->acq_owner_stat.state);
}
QTHREAD_WAIT_TIMER_DECLARATION;
qthread_addrres_t *X = ALLOC_ADDRRES();

Expand All @@ -1461,17 +1567,28 @@ int API_FUNC qthread_readFE(aligned_t *restrict dest,
#endif /* QTHREAD_USE_EUREKAS */
qthread_debug(FEB_BEHAVIOR, "tid %u succeeded on %p=%p after waiting\n", me->thread_id, dest, src);
} else { /* full, thus IT IS OURS! MUAHAHAHA! */


if (dest && (dest != src)) {
*(aligned_t *)dest = *(aligned_t *)src;
MACHINE_FENCE;
}

if(m->acq_owner_stat.state >= FEB_is_recursive_lock) {
m->acq_owner_stat.state = qthread_readstate(CURRENT_UNIQUE_WORKER); /* Set owner */
++m->acq_owner_stat.recursive_access_counter;
MACHINE_FENCE;
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (outer)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER));
}

qthread_debug(FEB_BEHAVIOR, "tid %u succeeded on %p=%p\n", me->thread_id, dest, src);
qthread_gotlock_empty(me->rdata->shepherd_ptr, m, (void *)alignedaddr);
}
QTHREAD_FEB_TIMER_STOP(febblock, me);
return QTHREAD_SUCCESS;
} /*}}} */


/* the way this works is that:
* 1 - src's FEB state is ignored
* 2 - data is copied from src to destination
Expand Down Expand Up @@ -1551,7 +1668,18 @@ int API_FUNC qthread_readFE_nb(aligned_t *restrict dest,
# endif /* ifdef LOCK_FREE_FEBS */
qthread_debug(FEB_DETAILS, "data structure locked\n");
/* by this point m is locked */
if (m->full == 0) { /* empty, thus, we must fail */
if (m->full == 0) { /* empty, thus, we must block */
/* unless read_FE was taken by qthread_lock() */
/* on same thread */
if(m->acq_owner_stat.state >= FEB_is_recursive_lock) {
if (m->acq_owner_stat.state == qthread_readstate(CURRENT_UNIQUE_WORKER)) {
++m->acq_owner_stat.recursive_access_counter;
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (inner:%i)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),m->acq_owner_stat.recursive_access_counter );
QTHREAD_FASTLOCK_UNLOCK(&m->lock);
return QTHREAD_SUCCESS;
}
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): attempt to acquire inner recursive lock from pid=%i failed thus blocking\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),m->acq_owner_stat.state);
}
qthread_debug(FEB_BEHAVIOR, "tid %u non-blocking fail\n", me->thread_id);
QTHREAD_FASTLOCK_UNLOCK(&m->lock);
return QTHREAD_OPFAIL;
Expand All @@ -1560,6 +1688,14 @@ int API_FUNC qthread_readFE_nb(aligned_t *restrict dest,
*(aligned_t *)dest = *(aligned_t *)src;
MACHINE_FENCE;
}

if(m->acq_owner_stat.state >= FEB_is_recursive_lock) {
m->acq_owner_stat.state = qthread_readstate(CURRENT_UNIQUE_WORKER); /* Set owner */
++m->acq_owner_stat.recursive_access_counter;
MACHINE_FENCE;
qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (outer)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER));
}

qthread_debug(FEB_BEHAVIOR, "tid %u succeeded on %p=%p\n", me->thread_id, dest, src);
qthread_gotlock_empty(me->rdata->shepherd_ptr, m, (void *)alignedaddr);
}
Expand Down
16 changes: 16 additions & 0 deletions src/locks.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,33 @@

/* The API */
#include "qthread/qthread.h"
#include "qt_feb.h"

/* Internal Headers */
#include "qt_visibility.h"

/* functions to implement FEB-ish locking/unlocking*/

int API_FUNC qthread_lock_init(const aligned_t *a, const bool is_recursive)
{ /*{{{ */
return qthread_feb_adr_init(a, is_recursive);
} /*}}} */

int API_FUNC qthread_lock_destroy(aligned_t *a)
{ /*{{{ */
return qthread_feb_adr_remove(a);
} /*}}} */

int API_FUNC qthread_lock(const aligned_t *a)
{ /*{{{ */
return qthread_readFE(NULL, a);
} /*}}} */

const int API_FUNC qthread_trylock(const aligned_t *a)
{ /*{{{ */
return qthread_readFE_nb(NULL, a);
} /*}}} */

int API_FUNC qthread_unlock(const aligned_t *a)
{ /*{{{ */
return qthread_fill(a);
Expand Down
Loading