gh-114940: Add a Per-Interpreter Lock For the List of Thread States #127115

Open · wants to merge 5 commits into base: main
1 change: 1 addition & 0 deletions Include/internal/pycore_interp.h
@@ -127,6 +127,7 @@ struct _is {
 
     uintptr_t last_restart_version;
     struct pythreads {
+        _PyRecursiveMutex mutex;
         uint64_t next_unique_id;
        /* The linked list of threads, newest first. */
         PyThreadState *head;
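The one-line addition above is the heart of the change: each interpreter now carries its own lock for its thread-state list, so walking or mutating that list no longer has to take the runtime-wide HEAD_LOCK. A minimal, self-contained sketch of the ownership model, using plain pthreads stand-ins rather than CPython's real types:

```c
#include <pthread.h>
#include <stddef.h>

typedef struct ThreadState {
    struct ThreadState *next;
} ThreadState;

typedef struct Interp {
    pthread_mutex_t threads_mutex;  /* models interp->threads.mutex */
    ThreadState *threads_head;      /* models interp->threads.head */
} Interp;

/* Walking one interpreter's list takes only that interpreter's lock,
 * so walks in different interpreters no longer serialize each other. */
static size_t count_threads(Interp *interp)
{
    size_t n = 0;
    pthread_mutex_lock(&interp->threads_mutex);
    for (ThreadState *t = interp->threads_head; t != NULL; t = t->next) {
        n++;
    }
    pthread_mutex_unlock(&interp->threads_mutex);
    return n;
}
```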
35 changes: 34 additions & 1 deletion Include/internal/pycore_lock.h
@@ -151,10 +151,12 @@ _PyOnceFlag_CallOnce(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg)
     return _PyOnceFlag_CallOnceSlow(flag, fn, arg);
 }
 
+#define _PyThread_ident_t unsigned long long
+
 // A recursive mutex. The mutex should be zero-initialized.
 typedef struct {
     PyMutex mutex;
-    unsigned long long thread;  // i.e., PyThread_get_thread_ident_ex()
+    _PyThread_ident_t thread;   // i.e., PyThread_get_thread_ident_ex()
     size_t level;
 } _PyRecursiveMutex;

@@ -164,6 +166,37 @@ extern PyLockStatus _PyRecursiveMutex_LockTimed(_PyRecursiveMutex *m, PyTime_t t
 PyAPI_FUNC(void) _PyRecursiveMutex_Unlock(_PyRecursiveMutex *m);
 extern int _PyRecursiveMutex_TryUnlock(_PyRecursiveMutex *m);
 
+PyAPI_FUNC(_PyThread_ident_t) PyThread_get_thread_ident_ex(void);
+
+static inline void
+_PyRecursiveMutex_LockFlags(_PyRecursiveMutex *m, _PyLockFlags flags)
+{
+    uint8_t expected = _Py_UNLOCKED;
+    if (_Py_atomic_compare_exchange_uint8(&m->mutex._bits, &expected, _Py_LOCKED)) {
+        m->thread = PyThread_get_thread_ident_ex();
+        assert(m->level == 0);
+    }
+    else {
+        _PyRecursiveMutex_LockTimed(m, -1, flags);
+    }
+}
+
+#ifdef HAVE_FORK
+PyAPI_FUNC(_PyThread_ident_t) PyThread_get_thread_ident_ex(void);
+
+static inline void
+_PyRecursiveMutex_at_fork_reinit(_PyRecursiveMutex *m,
+                                 _PyThread_ident_t parent)
+{
+    if (m->thread == parent) {
+        m->thread = PyThread_get_thread_ident_ex();
+    }
+    else {
+        memset(m, 0, sizeof(*m));
+    }
+}
+#endif
+
 // A readers-writer (RW) lock. The lock supports multiple concurrent readers or
 // a single writer. The lock is write-preferring: if a writer is waiting while
 // the lock is read-locked, then new readers will be blocked. This avoids
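`_PyRecursiveMutex_LockFlags` is the uncontended fast path: a single compare-and-swap flips the mutex byte from unlocked to locked, after which the winner records its thread id and the recursion level must still be zero. Re-entry by the owner and contended acquisition both fall through to `_PyRecursiveMutex_LockTimed`, which, per the struct's `thread` and `level` fields, counts repeated acquisitions by the owner rather than deadlocking. The contract callers get is that of a POSIX recursive mutex; a small runnable demonstration of that contract (pthreads, not the CPython implementation):

```c
#include <pthread.h>

int main(void)
{
    pthread_mutex_t m;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&m, &attr);

    pthread_mutex_lock(&m);     /* outer acquisition */
    pthread_mutex_lock(&m);     /* re-entry: a non-recursive mutex would deadlock */
    pthread_mutex_unlock(&m);   /* drops back to the outer hold */
    pthread_mutex_unlock(&m);   /* now actually released */

    pthread_mutex_destroy(&m);
    pthread_mutexattr_destroy(&attr);
    return 0;
}
```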
10 changes: 7 additions & 3 deletions Include/internal/pycore_pystate.h
@@ -269,14 +269,18 @@ extern int _PyOS_InterruptOccurred(PyThreadState *tstate);
 #define HEAD_UNLOCK(runtime) \
     PyMutex_Unlock(&(runtime)->interpreters.mutex)
 
+#define THREADS_HEAD_LOCK(interp) \
+    _PyRecursiveMutex_LockFlags(&(interp)->threads.mutex, _Py_LOCK_DONT_DETACH)
+#define THREADS_HEAD_UNLOCK(interp) \
+    _PyRecursiveMutex_Unlock(&(interp)->threads.mutex)
+
 #define _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) \
     for (PyThreadState *t = interp->threads.head; t; t = t->next)
 #define _Py_FOR_EACH_TSTATE_BEGIN(interp, t) \
-    HEAD_LOCK(interp->runtime); \
+    THREADS_HEAD_LOCK(interp); \
     _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t)
 #define _Py_FOR_EACH_TSTATE_END(interp) \
-    HEAD_UNLOCK(interp->runtime)
-
+    THREADS_HEAD_UNLOCK(interp)
 
 // Get the configuration of the current interpreter.
 // The caller must hold the GIL.
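`THREADS_HEAD_LOCK` passes `_Py_LOCK_DONT_DETACH`, so a thread that blocks on the list lock keeps its thread state attached while waiting. Note that `_Py_FOR_EACH_TSTATE_END` is now what releases the mutex, so code between BEGIN and END must not return early; the gc_free_threading.c change at the bottom of this diff restructures its loop for exactly that reason. A hypothetical caller (not from this PR) showing the intended shape:

```c
/* Hypothetical in-tree caller: count one interpreter's live thread
 * states while holding that interpreter's list lock. */
static Py_ssize_t
count_tstates(PyInterpreterState *interp)
{
    Py_ssize_t n = 0;
    _Py_FOR_EACH_TSTATE_BEGIN(interp, t) {
        (void)t;  /* only the count matters here */
        n++;
    }
    _Py_FOR_EACH_TSTATE_END(interp);
    return n;
}
```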
5 changes: 5 additions & 0 deletions Include/internal/pycore_runtime.h
@@ -164,6 +164,11 @@ typedef struct pyruntimestate {
         _Py_AuditHookEntry *head;
     } audit_hooks;
 
+#ifdef HAVE_FORK
+    // Guarded by multiple global locks.
+    PyThread_ident_t fork_tid;
+#endif
+
     struct _py_object_runtime_state object_state;
     struct _Py_float_runtime_state float_state;
     struct _Py_unicode_runtime_state unicode_state;
3 changes: 3 additions & 0 deletions Modules/posixmodule.c
@@ -624,11 +624,13 @@ PyOS_BeforeFork(void)
     _PyImport_AcquireLock(interp);
     _PyEval_StopTheWorldAll(&_PyRuntime);
     HEAD_LOCK(&_PyRuntime);
+    _PyRuntime.fork_tid = PyThread_get_thread_ident_ex();
 }
 
 void
 PyOS_AfterFork_Parent(void)
 {
+    _PyRuntime.fork_tid = 0;
     HEAD_UNLOCK(&_PyRuntime);
     _PyEval_StartTheWorldAll(&_PyRuntime);
 

@@ -648,6 +650,7 @@ PyOS_AfterFork_Child(void)
     if (_PyStatus_EXCEPTION(status)) {
         goto fatal_error;
     }
+    _PyRuntime.fork_tid = 0;
 
     PyThreadState *tstate = _PyThreadState_GET();
     _Py_EnsureTstateNotNULL(tstate);
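`fork_tid` is set in `PyOS_BeforeFork` after every relevant global lock has been acquired, and cleared again once the parent (or child) is past the critical region, which is what "no fork in progress" looks like. Only the forking thread exists in the child, and the id that `PyThread_get_thread_ident_ex` reports for it there may differ from the parent's, so a recursive mutex held across `fork()` has to be re-pointed at the new id rather than unlocked. A hedged sketch of how `_PyRecursiveMutex_at_fork_reinit` can consume `fork_tid` in the child (hypothetical helper; the PR's actual call sites may differ):

```c
/* Hypothetical child-side reinit (not verbatim from this PR). */
static void
reinit_threads_lock_after_fork(PyInterpreterState *interp)
{
    /* If the forking thread held the list lock, hand ownership to its
     * new id in the child; otherwise reset the mutex to fully unlocked. */
    _PyRecursiveMutex_at_fork_reinit(&interp->threads.mutex,
                                     _PyRuntime.fork_tid);
}
```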
9 changes: 3 additions & 6 deletions Objects/object.c
@@ -119,11 +119,11 @@ get_reftotal(PyInterpreterState *interp)
        since we can't determine which interpreter updated it. */
     Py_ssize_t total = REFTOTAL(interp);
 #ifdef Py_GIL_DISABLED
-    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) {
-        /* This may race with other threads modifications to their reftotal */
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)p;
         total += _Py_atomic_load_ssize_relaxed(&tstate_impl->reftotal);
     }
+    _Py_FOR_EACH_TSTATE_END(interp);
 #endif
     return total;
 }

@@ -317,10 +317,7 @@ _Py_GetLegacyRefTotal(void)
 Py_ssize_t
 _PyInterpreterState_GetRefTotal(PyInterpreterState *interp)
 {
-    HEAD_LOCK(&_PyRuntime);
-    Py_ssize_t total = get_reftotal(interp);
-    HEAD_UNLOCK(&_PyRuntime);
-    return total;
+    return get_reftotal(interp);
 }
 
 #endif /* Py_REF_DEBUG */
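`_PyInterpreterState_GetRefTotal` drops its `HEAD_LOCK`/`HEAD_UNLOCK` pair because `get_reftotal` now serializes the walk itself via `_Py_FOR_EACH_TSTATE_BEGIN`, which is also why the old "this may race" comment goes away; the per-thread counters are still read with relaxed atomic loads. Since `threads.mutex` is recursive, even a caller that already holds the list lock could invoke the helper safely, as this hypothetical illustration (not code from the PR) shows:

```c
/* Hypothetical: nesting is safe because threads.mutex is recursive. */
static Py_ssize_t
reftotal_while_already_locked(PyInterpreterState *interp)
{
    THREADS_HEAD_LOCK(interp);                /* outer acquisition */
    Py_ssize_t total = get_reftotal(interp);  /* re-locks recursively inside */
    THREADS_HEAD_UNLOCK(interp);
    return total;
}
```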
3 changes: 2 additions & 1 deletion Objects/obmalloc.c
@@ -1439,13 +1439,14 @@ get_mimalloc_allocated_blocks(PyInterpreterState *interp)
 {
     size_t allocated_blocks = 0;
 #ifdef Py_GIL_DISABLED
-    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, t) {
         _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)t;
         for (int i = 0; i < _Py_MIMALLOC_HEAP_COUNT; i++) {
             mi_heap_t *heap = &tstate->mimalloc.heaps[i];
             mi_heap_visit_blocks(heap, false, &count_blocks, &allocated_blocks);
         }
     }
+    _Py_FOR_EACH_TSTATE_END(interp);
 
     mi_abandoned_pool_t *pool = &interp->mimalloc.abandoned_pool;
     for (uint8_t tag = 0; tag < _Py_MIMALLOC_HEAP_COUNT; tag++) {
13 changes: 10 additions & 3 deletions Python/gc_free_threading.c
@@ -304,7 +304,8 @@ gc_visit_heaps_lock_held(PyInterpreterState *interp, mi_block_visit_fun *visitor
     Py_ssize_t offset_pre = offset_base + 2 * sizeof(PyObject*);
 
     // visit each thread's heaps for GC objects
-    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) {
+    int failed = 0;
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         struct _mimalloc_thread_state *m = &((_PyThreadStateImpl *)p)->mimalloc;
         if (!_Py_atomic_load_int(&m->initialized)) {
             // The thread may not have called tstate_mimalloc_bind() yet.

@@ -314,14 +315,20 @@
         arg->offset = offset_base;
         if (!mi_heap_visit_blocks(&m->heaps[_Py_MIMALLOC_HEAP_GC], true,
                                   visitor, arg)) {
-            return -1;
+            failed = 1;
+            break;
         }
         arg->offset = offset_pre;
         if (!mi_heap_visit_blocks(&m->heaps[_Py_MIMALLOC_HEAP_GC_PRE], true,
                                   visitor, arg)) {
-            return -1;
+            failed = 1;
+            break;
         }
     }
+    _Py_FOR_EACH_TSTATE_END(interp);
+    if (failed) {
+        return -1;
+    }
 
     // visit blocks in the per-interpreter abandoned pool (from dead threads)
     mi_abandoned_pool_t *pool = &interp->mimalloc.abandoned_pool;
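The `failed` flag exists because `_Py_FOR_EACH_TSTATE_END` is what unlocks the per-interpreter mutex: an early `return -1;` inside the loop would leave it held forever. `break` only exits the `for` loop that `_Py_FOR_EACH_TSTATE_BEGIN` expands to, so control still reaches END before the function returns. The same shape reduced to a template, with a hypothetical `visit_one` step:

```c
static int visit_one(PyThreadState *t);  /* hypothetical per-thread step */

static int
visit_all_tstates(PyInterpreterState *interp)
{
    int failed = 0;
    _Py_FOR_EACH_TSTATE_BEGIN(interp, t) {
        if (visit_one(t) < 0) {
            failed = 1;
            break;  /* leaves the loop; END below still unlocks */
        }
    }
    _Py_FOR_EACH_TSTATE_END(interp);
    return failed ? -1 : 0;
}
```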