diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h index 5e4bcbf835a4d0..adb4764d1bbac6 100644 --- a/Include/internal/pycore_interp.h +++ b/Include/internal/pycore_interp.h @@ -127,6 +127,7 @@ struct _is { uintptr_t last_restart_version; struct pythreads { + _PyRecursiveMutex mutex; uint64_t next_unique_id; /* The linked list of threads, newest first. */ PyThreadState *head; diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h index 57cbce8f126aca..a14493c4965ad5 100644 --- a/Include/internal/pycore_lock.h +++ b/Include/internal/pycore_lock.h @@ -151,10 +151,12 @@ _PyOnceFlag_CallOnce(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg) return _PyOnceFlag_CallOnceSlow(flag, fn, arg); } +#define _PyThread_ident_t unsigned long long + // A recursive mutex. The mutex should zero-initialized. typedef struct { PyMutex mutex; - unsigned long long thread; // i.e., PyThread_get_thread_ident_ex() + _PyThread_ident_t thread; // i.e., PyThread_get_thread_ident_ex() size_t level; } _PyRecursiveMutex; @@ -164,6 +166,35 @@ extern PyLockStatus _PyRecursiveMutex_LockTimed(_PyRecursiveMutex *m, PyTime_t t PyAPI_FUNC(void) _PyRecursiveMutex_Unlock(_PyRecursiveMutex *m); extern int _PyRecursiveMutex_TryUnlock(_PyRecursiveMutex *m); +PyAPI_FUNC(_PyThread_ident_t) PyThread_get_thread_ident_ex(void); + +static inline void +_PyRecursiveMutex_LockFlags(_PyRecursiveMutex *m, _PyLockFlags flags) +{ + uint8_t expected = _Py_UNLOCKED; + if (_Py_atomic_compare_exchange_uint8(&m->mutex._bits, &expected, _Py_LOCKED)) { + m->thread = PyThread_get_thread_ident_ex(); + assert(m->level == 0); + } + else { + _PyRecursiveMutex_LockTimed(m, -1, flags); + } +} + +#ifdef HAVE_FORK +static inline void +_PyRecursiveMutex_at_fork_reinit(_PyRecursiveMutex *m, + _PyThread_ident_t parent) +{ + if (m->thread == parent) { + m->thread = PyThread_get_thread_ident_ex(); + } + else { + memset(m, 0, 
sizeof(*m)); + } +} +#endif + // A readers-writer (RW) lock. The lock supports multiple concurrent readers or // a single writer. The lock is write-preferring: if a writer is waiting while // the lock is read-locked then, new readers will be blocked. This avoids diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h index 54d8803bc0bdb6..d9d9ff780a312d 100644 --- a/Include/internal/pycore_pystate.h +++ b/Include/internal/pycore_pystate.h @@ -269,14 +269,18 @@ extern int _PyOS_InterruptOccurred(PyThreadState *tstate); #define HEAD_UNLOCK(runtime) \ PyMutex_Unlock(&(runtime)->interpreters.mutex) +#define THREADS_HEAD_LOCK(interp) \ + _PyRecursiveMutex_LockFlags(&(interp)->threads.mutex, _Py_LOCK_DONT_DETACH) +#define THREADS_HEAD_UNLOCK(interp) \ + _PyRecursiveMutex_Unlock(&(interp)->threads.mutex) + #define _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) \ for (PyThreadState *t = interp->threads.head; t; t = t->next) #define _Py_FOR_EACH_TSTATE_BEGIN(interp, t) \ - HEAD_LOCK(interp->runtime); \ + THREADS_HEAD_LOCK(interp); \ _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) #define _Py_FOR_EACH_TSTATE_END(interp) \ - HEAD_UNLOCK(interp->runtime) - + THREADS_HEAD_UNLOCK(interp) // Get the configuration of the current interpreter. // The caller must hold the GIL. diff --git a/Include/internal/pycore_runtime.h b/Include/internal/pycore_runtime.h index 2f2cec22cf1589..da5b5b4b086be7 100644 --- a/Include/internal/pycore_runtime.h +++ b/Include/internal/pycore_runtime.h @@ -164,6 +164,11 @@ typedef struct pyruntimestate { _Py_AuditHookEntry *head; } audit_hooks; +#ifdef HAVE_FORK + // Guarded by multiple global locks. 
+ PyThread_ident_t fork_tid; +#endif + struct _py_object_runtime_state object_state; struct _Py_float_runtime_state float_state; struct _Py_unicode_runtime_state unicode_state; diff --git a/Modules/posixmodule.c b/Modules/posixmodule.c index 6eb7054b566e3f..40f9d2a7edc923 100644 --- a/Modules/posixmodule.c +++ b/Modules/posixmodule.c @@ -624,11 +624,13 @@ PyOS_BeforeFork(void) _PyImport_AcquireLock(interp); _PyEval_StopTheWorldAll(&_PyRuntime); HEAD_LOCK(&_PyRuntime); + _PyRuntime.fork_tid = PyThread_get_thread_ident_ex(); } void PyOS_AfterFork_Parent(void) { + _PyRuntime.fork_tid = 0; HEAD_UNLOCK(&_PyRuntime); _PyEval_StartTheWorldAll(&_PyRuntime); @@ -648,6 +650,7 @@ PyOS_AfterFork_Child(void) if (_PyStatus_EXCEPTION(status)) { goto fatal_error; } + _PyRuntime.fork_tid = 0; PyThreadState *tstate = _PyThreadState_GET(); _Py_EnsureTstateNotNULL(tstate); diff --git a/Objects/object.c b/Objects/object.c index 8868fa29066404..76855bf306b9bf 100644 --- a/Objects/object.c +++ b/Objects/object.c @@ -119,11 +119,11 @@ get_reftotal(PyInterpreterState *interp) since we can't determine which interpreter updated it. 
*/ Py_ssize_t total = REFTOTAL(interp); #ifdef Py_GIL_DISABLED - _Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) { - /* This may race with other threads modifications to their reftotal */ + _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)p; total += _Py_atomic_load_ssize_relaxed(&tstate_impl->reftotal); } + _Py_FOR_EACH_TSTATE_END(interp); #endif return total; } @@ -317,10 +317,7 @@ _Py_GetLegacyRefTotal(void) Py_ssize_t _PyInterpreterState_GetRefTotal(PyInterpreterState *interp) { - HEAD_LOCK(&_PyRuntime); - Py_ssize_t total = get_reftotal(interp); - HEAD_UNLOCK(&_PyRuntime); - return total; + return get_reftotal(interp); } #endif /* Py_REF_DEBUG */ diff --git a/Objects/obmalloc.c b/Objects/obmalloc.c index 2cc0377f68f990..f13518cdb81608 100644 --- a/Objects/obmalloc.c +++ b/Objects/obmalloc.c @@ -1439,13 +1439,14 @@ get_mimalloc_allocated_blocks(PyInterpreterState *interp) { size_t allocated_blocks = 0; #ifdef Py_GIL_DISABLED - _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) { + _Py_FOR_EACH_TSTATE_BEGIN(interp, t) { _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)t; for (int i = 0; i < _Py_MIMALLOC_HEAP_COUNT; i++) { mi_heap_t *heap = &tstate->mimalloc.heaps[i]; mi_heap_visit_blocks(heap, false, &count_blocks, &allocated_blocks); } } + _Py_FOR_EACH_TSTATE_END(interp); mi_abandoned_pool_t *pool = &interp->mimalloc.abandoned_pool; for (uint8_t tag = 0; tag < _Py_MIMALLOC_HEAP_COUNT; tag++) { diff --git a/Python/gc_free_threading.c b/Python/gc_free_threading.c index f7f44407494e51..f168ecc20ada28 100644 --- a/Python/gc_free_threading.c +++ b/Python/gc_free_threading.c @@ -304,7 +304,8 @@ gc_visit_heaps_lock_held(PyInterpreterState *interp, mi_block_visit_fun *visitor Py_ssize_t offset_pre = offset_base + 2 * sizeof(PyObject*); // visit each thread's heaps for GC objects - _Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) { + int failed = 0; + _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { struct _mimalloc_thread_state *m = &((_PyThreadStateImpl 
*)p)->mimalloc; if (!_Py_atomic_load_int(&m->initialized)) { // The thread may not have called tstate_mimalloc_bind() yet. @@ -314,14 +315,20 @@ gc_visit_heaps_lock_held(PyInterpreterState *interp, mi_block_visit_fun *visitor arg->offset = offset_base; if (!mi_heap_visit_blocks(&m->heaps[_Py_MIMALLOC_HEAP_GC], true, visitor, arg)) { - return -1; + failed = 1; + break; } arg->offset = offset_pre; if (!mi_heap_visit_blocks(&m->heaps[_Py_MIMALLOC_HEAP_GC_PRE], true, visitor, arg)) { - return -1; + failed = 1; + break; } } + _Py_FOR_EACH_TSTATE_END(interp); + if (failed) { + return -1; + } // visit blocks in the per-interpreter abandoned pool (from dead threads) mi_abandoned_pool_t *pool = &interp->mimalloc.abandoned_pool; diff --git a/Python/pystate.c b/Python/pystate.c index 975eb6d4fbd0f2..1df0c562de3ccc 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -499,6 +499,9 @@ _PyRuntimeState_Fini(_PyRuntimeState *runtime) PyStatus _PyRuntimeState_ReInitThreads(_PyRuntimeState *runtime) { + PyThread_ident_t parent = _PyRuntime.fork_tid; + assert(parent != 0); + // This was initially set in _PyRuntimeState_Init(). runtime->main_thread = PyThread_get_thread_ident(); @@ -515,6 +518,7 @@ _PyRuntimeState_ReInitThreads(_PyRuntimeState *runtime) for (PyInterpreterState *interp = runtime->interpreters.head; interp != NULL; interp = interp->next) { + _PyRecursiveMutex_at_fork_reinit(&interp->threads.mutex, parent); for (int i = 0; i < NUM_WEAKREF_LIST_LOCKS; i++) { _PyMutex_at_fork_reinit(&interp->weakref_locks[i]); } @@ -773,7 +777,6 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate) { assert(interp != NULL); assert(tstate != NULL); - _PyRuntimeState *runtime = interp->runtime; /* XXX Conditions we need to enforce: @@ -790,15 +793,19 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate) } // Clear the current/main thread state last. 
+ HEAD_LOCK(interp->runtime); _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { // See https://github.com/python/cpython/issues/102126 // Must be called without HEAD_LOCK held as it can deadlock // if any finalizer tries to acquire that lock. - HEAD_UNLOCK(runtime); + THREADS_HEAD_UNLOCK(interp); + HEAD_UNLOCK(interp->runtime); PyThreadState_Clear(p); - HEAD_LOCK(runtime); + HEAD_LOCK(interp->runtime); + THREADS_HEAD_LOCK(interp); } _Py_FOR_EACH_TSTATE_END(interp); + HEAD_UNLOCK(interp->runtime); if (tstate->interp == interp) { /* We fix tstate->_status below when we for sure aren't using it (e.g. no longer need the GIL). */ @@ -1537,6 +1544,7 @@ new_threadstate(PyInterpreterState *interp, int whence) /* We serialize concurrent creation to protect global state. */ HEAD_LOCK(interp->runtime); + THREADS_HEAD_LOCK(interp); // Initialize the new thread state. interp->threads.next_unique_id += 1; @@ -1547,6 +1555,7 @@ new_threadstate(PyInterpreterState *interp, int whence) PyThreadState *old_head = interp->threads.head; add_threadstate(interp, (PyThreadState *)tstate, old_head); + THREADS_HEAD_UNLOCK(interp); HEAD_UNLOCK(interp->runtime); #ifdef Py_GIL_DISABLED @@ -1744,6 +1753,8 @@ tstate_delete_common(PyThreadState *tstate, int release_gil) _PyRuntimeState *runtime = interp->runtime; HEAD_LOCK(runtime); + + THREADS_HEAD_LOCK(interp); if (tstate->prev) { tstate->prev->next = tstate->next; } @@ -1753,6 +1764,8 @@ tstate_delete_common(PyThreadState *tstate, int release_gil) if (tstate->next) { tstate->next->prev = tstate->prev; } + THREADS_HEAD_UNLOCK(interp); + if (tstate->state != _Py_THREAD_SUSPENDED) { // Any ongoing stop-the-world request should not wait for us because // our thread is getting deleted. @@ -1800,11 +1813,12 @@ zapthreads(PyInterpreterState *interp) { /* No need to lock the mutex here because this should only happen when the threads are all really dead (XXX famous last words). 
*/ - _Py_FOR_EACH_TSTATE_UNLOCKED(interp, tstate) { + _Py_FOR_EACH_TSTATE_BEGIN(interp, tstate) { tstate_verify_not_active(tstate); tstate_delete_common(tstate, 0); free_threadstate((_PyThreadStateImpl *)tstate); } + _Py_FOR_EACH_TSTATE_END(interp); } @@ -1852,13 +1866,12 @@ _PyThreadState_RemoveExcept(PyThreadState *tstate) { assert(tstate != NULL); PyInterpreterState *interp = tstate->interp; - _PyRuntimeState *runtime = interp->runtime; #ifdef Py_GIL_DISABLED - assert(runtime->stoptheworld.world_stopped); + assert(interp->runtime->stoptheworld.world_stopped); #endif - HEAD_LOCK(runtime); + THREADS_HEAD_LOCK(interp); /* Remove all thread states, except tstate, from the linked list of thread states. */ PyThreadState *list = interp->threads.head; @@ -1873,7 +1886,7 @@ _PyThreadState_RemoveExcept(PyThreadState *tstate) } tstate->prev = tstate->next = NULL; interp->threads.head = tstate; - HEAD_UNLOCK(runtime); + THREADS_HEAD_UNLOCK(interp); return list; } @@ -2184,7 +2197,7 @@ park_detached_threads(struct _stoptheworld_state *stw) { int num_parked = 0; _Py_FOR_EACH_STW_INTERP(stw, i) { - _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) { + _Py_FOR_EACH_TSTATE_BEGIN(i, t) { int state = _Py_atomic_load_int_relaxed(&t->state); if (state == _Py_THREAD_DETACHED) { // Atomically transition to "suspended" if in "detached" state. @@ -2197,6 +2210,7 @@ park_detached_threads(struct _stoptheworld_state *stw) _Py_set_eval_breaker_bit(t, _PY_EVAL_PLEASE_STOP_BIT); } } + _Py_FOR_EACH_TSTATE_END(i); } stw->thread_countdown -= num_parked; assert(stw->thread_countdown >= 0); @@ -2223,12 +2237,13 @@ stop_the_world(struct _stoptheworld_state *stw) stw->requester = _PyThreadState_GET(); // may be NULL _Py_FOR_EACH_STW_INTERP(stw, i) { - _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) { + _Py_FOR_EACH_TSTATE_BEGIN(i, t) { if (t != stw->requester) { // Count all the other threads (we don't wait on ourself). 
stw->thread_countdown++; } } + _Py_FOR_EACH_TSTATE_END(i); } if (stw->thread_countdown == 0) { @@ -2269,7 +2284,7 @@ start_the_world(struct _stoptheworld_state *stw) stw->world_stopped = 0; // Switch threads back to the detached state. _Py_FOR_EACH_STW_INTERP(stw, i) { - _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) { + _Py_FOR_EACH_TSTATE_BEGIN(i, t) { if (t != stw->requester) { assert(_Py_atomic_load_int_relaxed(&t->state) == _Py_THREAD_SUSPENDED); @@ -2277,6 +2292,7 @@ start_the_world(struct _stoptheworld_state *stw) _PyParkingLot_UnparkAll(&t->state); } } + _Py_FOR_EACH_TSTATE_END(i); } stw->requester = NULL; HEAD_UNLOCK(runtime); @@ -2511,7 +2527,8 @@ _PyThread_CurrentFrames(void) HEAD_LOCK(runtime); PyInterpreterState *i; for (i = runtime->interpreters.head; i != NULL; i = i->next) { - _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) { + int failed = 0; + _Py_FOR_EACH_TSTATE_BEGIN(i, t) { _PyInterpreterFrame *frame = t->current_frame; frame = _PyFrame_GetFirstComplete(frame); if (frame == NULL) { @@ -2519,19 +2536,26 @@ _PyThread_CurrentFrames(void) } PyObject *id = PyLong_FromUnsignedLong(t->thread_id); if (id == NULL) { - goto fail; + failed = 1; + break; } PyObject *frameobj = (PyObject *)_PyFrame_GetFrameObject(frame); if (frameobj == NULL) { Py_DECREF(id); - goto fail; + failed = 1; + break; } int stat = PyDict_SetItem(result, id, frameobj); Py_DECREF(id); if (stat < 0) { - goto fail; + failed = 1; + break; } } + _Py_FOR_EACH_TSTATE_END(i); + if (failed) { + goto fail; + } } goto done; @@ -2576,14 +2600,16 @@ _PyThread_CurrentExceptions(void) HEAD_LOCK(runtime); PyInterpreterState *i; for (i = runtime->interpreters.head; i != NULL; i = i->next) { - _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) { + int failed = 0; + _Py_FOR_EACH_TSTATE_BEGIN(i, t) { _PyErr_StackItem *err_info = _PyErr_GetTopmostException(t); if (err_info == NULL) { continue; } PyObject *id = PyLong_FromUnsignedLong(t->thread_id); if (id == NULL) { - goto fail; + failed = 1; + break; } PyObject *exc = err_info->exc_value; 
assert(exc == NULL || @@ -2593,9 +2619,14 @@ _PyThread_CurrentExceptions(void) int stat = PyDict_SetItem(result, id, exc == NULL ? Py_None : exc); Py_DECREF(id); if (stat < 0) { - goto fail; + failed = 1; + break; } } + _Py_FOR_EACH_TSTATE_END(i); + if (failed) { + goto fail; + } } goto done;