diff --git a/include/qt_addrstat.h b/include/qt_addrstat.h index 7f7124eed..16d0df5bb 100644 --- a/include/qt_addrstat.h +++ b/include/qt_addrstat.h @@ -18,6 +18,8 @@ static QINLINE qthread_addrstat_t *qthread_addrstat_new(void) ret->FEQ = NULL; ret->FFQ = NULL; ret->FFWQ = NULL; + ret->acq_owner_stat.state = FEB_is_not_recursive_lock; + ret->acq_owner_stat.recursive_access_counter = 0; QTHREAD_EMPTY_TIMER_INIT(ret); } return ret; diff --git a/include/qt_blocking_structs.h b/include/qt_blocking_structs.h index 4b373d854..261781610 100644 --- a/include/qt_blocking_structs.h +++ b/include/qt_blocking_structs.h @@ -9,6 +9,9 @@ #include "qt_profiling.h" #include "qt_debug.h" +#define FEB_is_recursive_lock (-1) +#define FEB_is_not_recursive_lock (-2) + typedef enum blocking_syscalls { ACCEPT, CONNECT, @@ -46,6 +49,11 @@ typedef struct _qt_blocking_queue_node_s { int err; } qt_blocking_queue_node_t; +typedef struct qthread_acquire_owner_stat_s { + int64_t state; + int64_t recursive_access_counter; +} qthread_acquire_owner_stat_t; + typedef struct qthread_addrstat_s { QTHREAD_FASTLOCK_TYPE lock; qthread_addrres_t *EFQ; @@ -57,6 +65,8 @@ typedef struct qthread_addrstat_s { #endif uint_fast8_t full; uint_fast8_t valid; + qthread_acquire_owner_stat_t acq_owner_stat; + } qthread_addrstat_t; #ifdef UNPOOLED diff --git a/include/qt_feb.h b/include/qt_feb.h index f501817f4..e12c31059 100644 --- a/include/qt_feb.h +++ b/include/qt_feb.h @@ -17,6 +17,9 @@ typedef filter_code (*qt_feb_taskfilter_f)(qt_key_t addr, qthread_t *restrict waiter, void *restrict arg); +int INTERNAL qthread_feb_adr_init(const aligned_t *dest, const bool is_from_recursive_lock); +int INTERNAL qthread_feb_adr_remove(aligned_t *dest); + void INTERNAL qt_feb_subsystem_init(uint_fast8_t); int API_FUNC qthread_writeEF_nb(aligned_t *restrict const dest, diff --git a/include/qthread/qthread.h b/include/qthread/qthread.h index 694096420..55c8fb206 100644 --- a/include/qthread/qthread.h +++ b/include/qthread/qthread.h @@ -8,6 +8,7 @@ #include "common.h" /* important configuration options */ #include /* for memcpy() */ +#include /* for bool */ #ifndef QTHREAD_NOALIGNCHECK # include /* for fprintf() */ @@ -124,7 +125,6 @@ typedef struct _syncvar_s { typedef unsigned short qthread_shepherd_id_t; typedef unsigned short qthread_worker_id_t; - /* for convenient arguments to qthread_fork */ typedef aligned_t (*qthread_f)(void *arg); typedef void (*qthread_agg_f)(int count, qthread_f *f, void **arg, void **ret, uint16_t flags); @@ -434,6 +434,7 @@ int qthread_syncvar_status(syncvar_t *const v); int qthread_empty(const aligned_t *dest); int qthread_syncvar_empty(syncvar_t *restrict dest); int qthread_fill(const aligned_t *dest); +int qthread_fill_recursive(const aligned_t *dest); int qthread_syncvar_fill(syncvar_t *restrict dest); /* These functions wait for memory to become empty, and then fill it. When @@ -553,6 +554,11 @@ int qthread_readXX(aligned_t *dest, */ int qthread_lock(const aligned_t *a); int qthread_unlock(const aligned_t *a); +int qthread_lock_init(const aligned_t *a, const bool is_recursive); +int qthread_lock_destroy(aligned_t *a); + +const int qthread_trylock(const aligned_t *a); + #if defined(QTHREAD_MUTEX_INCREMENT) || \ (QTHREAD_ASSEMBLY_ARCH == QTHREAD_POWERPC32) || \ diff --git a/src/feb.c b/src/feb.c index 138e5205c..8ebaf68a4 100644 --- a/src/feb.c +++ b/src/feb.c @@ -430,7 +430,8 @@ static QINLINE void qthread_gotlock_empty_inner(qthread_shepherd_t *shep, FREE_ADDRRES(X); qthread_gotlock_fill_inner(shep, m, maddr, 1, precond_tasks); } - if ((m->full == 1) && (m->EFQ == NULL) && (m->FEQ == NULL) && (m->FFQ == NULL) && (m->FFWQ == NULL)) { + if ((m->full == 1) && (m->EFQ == NULL) && (m->FEQ == NULL) && (m->FFQ == NULL) && (m->FFWQ == NULL) + && (m->acq_owner_stat.state EFQ == NULL) && (m->FEQ == NULL) && (m->full == 1)) { + if ((m->EFQ == NULL) && (m->FEQ == NULL) && (m->full == 1) + && + (m->acq_owner_stat.state < FEB_is_recursive_lock)) { qthread_debug(FEB_DETAILS, "m(%p), addr(%p), recursive(%u): addrstat removeable!\n", m, maddr, recursive); removeable = 1; } else { @@ -665,6 +668,82 @@ int API_FUNC qthread_empty(const aligned_t *dest) return QTHREAD_SUCCESS; } /*}}} */ +int INTERNAL qthread_feb_adr_init(const aligned_t *dest, const bool is_from_recursive_lock) +{ /*{{{ */ + const aligned_t *alignedaddr; + + if (qlib == NULL) { + return QTHREAD_SUCCESS; + } + qthread_addrstat_t *m; + const int lockbin = QTHREAD_CHOOSE_STRIPE2(dest); + qthread_shepherd_t *shep = qthread_internal_getshep(); + + assert(qthread_library_initialized); + + if (!shep) { + printf("\nXXX ERROR XXX ERROR XXX ERROR XXX ERROR\n\n"); + exit(0);//FIXME + return qthread_feb_blocker_func((void *)dest, NULL, FILL/*!!!!!!!XXX!!!*/); + } + + qthread_debug(FEB_CALLS, "dest=%p (tid=%i)\n", dest, qthread_id()); + QALIGN(dest, alignedaddr); + /* lock hash */ + QTHREAD_COUNT_THREADS_BINCOUNTER(febs, lockbin); +#ifdef LOCK_FREE_FEBS + do { + m = qt_hash_get(FEBs[lockbin], (void *)alignedaddr); + if (m) { + return QTHREAD_NOT_ALLOWED; + } else { + if(is_from_recursive_lock) { + m = qthread_addrstat_new(); + if (!m) { return QTHREAD_MALLOC_ERROR; } + m->full = 1; + m->acq_owner_stat.state = FEB_is_recursive_lock; + MACHINE_FENCE; + + if (!qt_hash_put(FEBs[lockbin], (void *)alignedaddr, m)) { + qthread_addrstat_delete(m); + continue; + } + } + /* already full */ + break; + } + } while (1); +#else /* ifdef LOCK_FREE_FEBS */ + qt_hash_lock(FEBs[lockbin]); + { /* BEGIN CRITICAL SECTION */ + m = (qthread_addrstat_t *)qt_hash_get_locked(FEBs[lockbin], (void *)alignedaddr); + if (m) { + return QTHREAD_NOT_ALLOWED; + } else { + if(is_from_recursive_lock) { + m = qthread_addrstat_new(); + if (!m) { return QTHREAD_MALLOC_ERROR; } + m->full = 1; + m->acq_owner_stat.state = FEB_is_recursive_lock; + MACHINE_FENCE; + + if (!qt_hash_put_locked(FEBs[lockbin], (void *)alignedaddr, m)){ + qthread_addrstat_delete(m); + return QTHREAD_MALLOC_ERROR; + } + } + } + } /* END CRITICAL SECTION */ + qt_hash_unlock(FEBs[lockbin]); /* unlock hash */ +#endif /* ifdef LOCK_FREE_FEBS */ + return QTHREAD_SUCCESS; +} /*}}} */ + +int INTERNAL qthread_feb_adr_remove(aligned_t *dest) +{ /*{{{ */ + qthread_FEB_remove(dest); +} /*}}} */ + int API_FUNC qthread_fill(const aligned_t *dest) { /*{{{ */ const aligned_t *alignedaddr; @@ -713,8 +792,20 @@ int API_FUNC qthread_fill(const aligned_t *dest) qt_hash_unlock(FEBs[lockbin]); /* unlock hash */ #endif /* ifdef LOCK_FREE_FEBS */ if (m) { + if(m->acq_owner_stat.state >= FEB_is_recursive_lock) { + //assert(m->acq_owner_stat.recursive_access_counter <=-1); + --m->acq_owner_stat.recursive_access_counter; + if (m->acq_owner_stat.recursive_access_counter > 0){ + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): released recursive lock (inner:%i)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),m->acq_owner_stat.recursive_access_counter ); + QTHREAD_FASTLOCK_UNLOCK(&m->lock); + return QTHREAD_SUCCESS; + } + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): released recursive lock (outer)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER)); + m->acq_owner_stat.state = FEB_is_recursive_lock; //reset owner, maintain lock type + } /* if dest wasn't in the hash, it was already full. Since it was, * we need to fill it. */ + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i): filling, maybe alerting waiters\n", dest, qthread_id()); qthread_gotlock_fill(shep, m, (void *)alignedaddr); } @@ -785,6 +876,7 @@ int API_FUNC qthread_writeF(aligned_t *restrict dest, return QTHREAD_SUCCESS; } /*}}} */ + int API_FUNC qthread_writeF_const(aligned_t *dest, aligned_t src) { /*{{{ */ @@ -1357,6 +1449,7 @@ int API_FUNC qthread_readFF_nb(aligned_t *restrict dest, return QTHREAD_SUCCESS; } /*}}} */ + /* the way this works is that: * 1 - src's FEB state must be "full" * 2 - data is copied from src to destination @@ -1379,6 +1472,7 @@ int API_FUNC qthread_readFE(aligned_t *restrict dest, if (!me) { return qthread_feb_blocker_func(dest, (void *)src, READFE); } + assert(me->rdata); qthread_debug(FEB_CALLS, "dest=%p, src=%p (tid=%i)\n", dest, src, me->thread_id); QTHREAD_FEB_UNIQUERECORD(feb, src, me); @@ -1436,7 +1530,19 @@ int API_FUNC qthread_readFE(aligned_t *restrict dest, assert(m); qthread_debug(FEB_DETAILS, "data structure locked\n"); /* by this point m is locked */ - if (m->full == 0) { /* empty, thus, we must block */ + if (m->full == 0) { /* empty, thus, we must block */ + /* unless read_FE was taken by recursive qthread_lock() */ + /* on same thread */ + if(m->acq_owner_stat.state >= FEB_is_recursive_lock) { + if (m->acq_owner_stat.state == qthread_readstate(CURRENT_UNIQUE_WORKER)) { + ++m->acq_owner_stat.recursive_access_counter; + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (inner:%ld)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),(long long)m->acq_owner_stat.recursive_access_counter ); + QTHREAD_FASTLOCK_UNLOCK(&m->lock); + QTHREAD_FEB_TIMER_STOP(febblock, me); + return QTHREAD_SUCCESS; + } + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): attempt to acquire recursive lock from pid=%ld failed, back to master\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),(long long)m->acq_owner_stat.state); + } QTHREAD_WAIT_TIMER_DECLARATION; qthread_addrres_t *X = ALLOC_ADDRRES(); @@ -1461,10 +1567,20 @@ int API_FUNC qthread_readFE(aligned_t *restrict dest, #endif /* QTHREAD_USE_EUREKAS */ qthread_debug(FEB_BEHAVIOR, "tid %u succeeded on %p=%p after waiting\n", me->thread_id, dest, src); } else { /* full, thus IT IS OURS! MUAHAHAHA! */ + + if (dest && (dest != src)) { *(aligned_t *)dest = *(aligned_t *)src; MACHINE_FENCE; } + + if(m->acq_owner_stat.state >= FEB_is_recursive_lock) { + m->acq_owner_stat.state = qthread_readstate(CURRENT_UNIQUE_WORKER); /* Set owner */ + ++m->acq_owner_stat.recursive_access_counter; + MACHINE_FENCE; + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (outer)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER)); + } + qthread_debug(FEB_BEHAVIOR, "tid %u succeeded on %p=%p\n", me->thread_id, dest, src); qthread_gotlock_empty(me->rdata->shepherd_ptr, m, (void *)alignedaddr); } @@ -1472,6 +1588,7 @@ int API_FUNC qthread_readFE(aligned_t *restrict dest, return QTHREAD_SUCCESS; } /*}}} */ + /* the way this works is that: * 1 - src's FEB state is ignored * 2 - data is copied from src to destination @@ -1551,7 +1668,18 @@ int API_FUNC qthread_readFE_nb(aligned_t *restrict dest, # endif /* ifdef LOCK_FREE_FEBS */ qthread_debug(FEB_DETAILS, "data structure locked\n"); /* by this point m is locked */ - if (m->full == 0) { /* empty, thus, we must fail */ + if (m->full == 0) { /* empty, thus, we must block */ + /* unless read_FE was taken by qthread_lock() */ + /* on same thread */ + if(m->acq_owner_stat.state >= FEB_is_recursive_lock) { + if (m->acq_owner_stat.state == qthread_readstate(CURRENT_UNIQUE_WORKER)) { + ++m->acq_owner_stat.recursive_access_counter; + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (inner:%i)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),m->acq_owner_stat.recursive_access_counter ); + QTHREAD_FASTLOCK_UNLOCK(&m->lock); + return QTHREAD_SUCCESS; + } + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): attempt to acquire inner recursive lock from pid=%i failed thus blocking\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER),m->acq_owner_stat.state); + } qthread_debug(FEB_BEHAVIOR, "tid %u non-blocking fail\n", me->thread_id); QTHREAD_FASTLOCK_UNLOCK(&m->lock); return QTHREAD_OPFAIL; @@ -1560,6 +1688,14 @@ int API_FUNC qthread_readFE_nb(aligned_t *restrict dest, *(aligned_t *)dest = *(aligned_t *)src; MACHINE_FENCE; } + + if(m->acq_owner_stat.state >= FEB_is_recursive_lock) { + m->acq_owner_stat.state = qthread_readstate(CURRENT_UNIQUE_WORKER); /* Set owner */ + ++m->acq_owner_stat.recursive_access_counter; + MACHINE_FENCE; + qthread_debug(FEB_BEHAVIOR, "dest=%p (tid=%i,pid=%i): acquired recursive lock (outer)\n", dest, qthread_id(), qthread_readstate(CURRENT_UNIQUE_WORKER)); + } + qthread_debug(FEB_BEHAVIOR, "tid %u succeeded on %p=%p\n", me->thread_id, dest, src); qthread_gotlock_empty(me->rdata->shepherd_ptr, m, (void *)alignedaddr); } diff --git a/src/locks.c b/src/locks.c index cd2a45902..f9d0d4bf9 100644 --- a/src/locks.c +++ b/src/locks.c @@ -4,17 +4,33 @@ /* The API */ #include "qthread/qthread.h" +#include "qt_feb.h" /* Internal Headers */ #include "qt_visibility.h" /* functions to implement FEB-ish locking/unlocking*/ +int API_FUNC qthread_lock_init(const aligned_t *a, const bool is_recursive) +{ /*{{{ */ + return qthread_feb_adr_init(a, is_recursive); +} /*}}} */ + +int API_FUNC qthread_lock_destroy(aligned_t *a) +{ /*{{{ */ + return qthread_feb_adr_remove(a); +} /*}}} */ + int API_FUNC qthread_lock(const aligned_t *a) { /*{{{ */ return qthread_readFE(NULL, a); } /*}}} */ +const int API_FUNC qthread_trylock(const aligned_t *a) +{ /*{{{ */ + return qthread_readFE_nb(NULL, a); +} /*}}} */ + int API_FUNC qthread_unlock(const aligned_t *a) { /*{{{ */ return qthread_fill(a); diff --git a/test/stress/lock_acq_rel.c b/test/stress/lock_acq_rel.c index a21e6c788..9c18b354e 100644 --- a/test/stress/lock_acq_rel.c +++ b/test/stress/lock_acq_rel.c @@ -2,20 +2,52 @@ #include #include #include +#include -static int64_t count = 0; +static int64_t count; static aligned_t lock; +bool is_recursive_lock = true; +bool is_not_recursive_lock = false; -static void task(size_t start, size_t stop, void *args_) { +pthread_mutex_t count_mutex; + +static void task_1(size_t start, size_t stop, void *args_) { qthread_lock(&lock); count++; qthread_unlock(&lock); } +static void task_2(size_t start, size_t stop, void *args_) { + qthread_lock(&lock); + qthread_lock(&lock); + count++; + qthread_unlock(&lock); + qthread_unlock(&lock); +} + int main(int argc, char *argv[]) { - uint64_t iters = 1000000l; + uint64_t iters = 4l; assert(qthread_initialize() == 0); - qt_loop(0, iters, task, NULL); + + /* Simple lock acquire and release */ + count = 0; + qt_loop(0, iters, task_1, NULL); assert(iters == count); + + /* Recursive lock acquire and release, no recursion */ + qthread_lock_init(&lock, is_recursive_lock); + { + /* Recursive lock acquire and release, no recursion */ + count = 0; + qt_loop(0, iters, task_1, NULL); + assert(iters == count); + + /* Recursive lock acquire and release */ + count = 0; + qt_loop(0, iters, task_2, NULL); + assert(iters == count); + } + qthread_lock_destroy(&lock); + return 0; }