Skip to content

Commit 925907e

Browse files
gh-113884: Make queue.SimpleQueue thread-safe when the GIL is disabled (#114161)
* use the ParkingLot API to manage waiting threads * use Argument Clinic's critical section directive to protect queue methods * remove unnecessary overflow check Co-authored-by: Erlend E. Aasland <[email protected]>
1 parent 441affc commit 925907e

File tree

3 files changed

+137
-90
lines changed

3 files changed

+137
-90
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make :class:`queue.SimpleQueue` thread safe when the GIL is disabled.

Modules/_queuemodule.c

+115-87
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
#endif
44

55
#include "Python.h"
6-
#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
6+
#include "pycore_ceval.h" // Py_MakePendingCalls()
77
#include "pycore_moduleobject.h" // _PyModule_GetState()
8+
#include "pycore_parking_lot.h"
89
#include "pycore_time.h" // _PyTime_t
910

1011
#include <stdbool.h>
@@ -151,7 +152,9 @@ RingBuf_Get(RingBuf *buf)
151152
return item;
152153
}
153154

154-
// Returns 0 on success or -1 if the buffer failed to grow
155+
// Returns 0 on success or -1 if the buffer failed to grow.
156+
//
157+
// Steals a reference to item.
155158
static int
156159
RingBuf_Put(RingBuf *buf, PyObject *item)
157160
{
@@ -164,7 +167,7 @@ RingBuf_Put(RingBuf *buf, PyObject *item)
164167
return -1;
165168
}
166169
}
167-
buf->items[buf->put_idx] = Py_NewRef(item);
170+
buf->items[buf->put_idx] = item;
168171
buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
169172
buf->num_items++;
170173
return 0;
@@ -184,9 +187,13 @@ RingBuf_IsEmpty(RingBuf *buf)
184187

185188
typedef struct {
186189
PyObject_HEAD
187-
PyThread_type_lock lock;
188-
int locked;
190+
191+
// Are there threads waiting for items
192+
bool has_threads_waiting;
193+
194+
// Items in the queue
189195
RingBuf buf;
196+
190197
PyObject *weakreflist;
191198
} simplequeueobject;
192199

@@ -209,12 +216,6 @@ simplequeue_dealloc(simplequeueobject *self)
209216
PyTypeObject *tp = Py_TYPE(self);
210217

211218
PyObject_GC_UnTrack(self);
212-
if (self->lock != NULL) {
213-
/* Unlock the lock so it's safe to free it */
214-
if (self->locked > 0)
215-
PyThread_release_lock(self->lock);
216-
PyThread_free_lock(self->lock);
217-
}
218219
(void)simplequeue_clear(self);
219220
if (self->weakreflist != NULL)
220221
PyObject_ClearWeakRefs((PyObject *) self);
@@ -249,12 +250,6 @@ simplequeue_new_impl(PyTypeObject *type)
249250
self = (simplequeueobject *) type->tp_alloc(type, 0);
250251
if (self != NULL) {
251252
self->weakreflist = NULL;
252-
self->lock = PyThread_allocate_lock();
253-
if (self->lock == NULL) {
254-
Py_DECREF(self);
255-
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
256-
return NULL;
257-
}
258253
if (RingBuf_Init(&self->buf) < 0) {
259254
Py_DECREF(self);
260255
return NULL;
@@ -264,7 +259,29 @@ simplequeue_new_impl(PyTypeObject *type)
264259
return (PyObject *) self;
265260
}
266261

262+
typedef struct {
263+
bool handed_off;
264+
simplequeueobject *queue;
265+
PyObject *item;
266+
} HandoffData;
267+
268+
static void
269+
maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
270+
{
271+
if (item == NULL) {
272+
// No threads were waiting
273+
data->handed_off = false;
274+
}
275+
else {
276+
// There was at least one waiting thread, hand off the item
277+
*item = data->item;
278+
data->handed_off = true;
279+
}
280+
data->queue->has_threads_waiting = has_more_waiters;
281+
}
282+
267283
/*[clinic input]
284+
@critical_section
268285
_queue.SimpleQueue.put
269286
item: object
270287
block: bool = True
@@ -280,21 +297,28 @@ never blocks. They are provided for compatibility with the Queue class.
280297
static PyObject *
281298
_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
282299
int block, PyObject *timeout)
283-
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
300+
/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
284301
{
285-
/* BEGIN GIL-protected critical section */
286-
if (RingBuf_Put(&self->buf, item) < 0)
287-
return NULL;
288-
if (self->locked) {
289-
/* A get() may be waiting, wake it up */
290-
self->locked = 0;
291-
PyThread_release_lock(self->lock);
302+
HandoffData data = {
303+
.handed_off = 0,
304+
.item = Py_NewRef(item),
305+
.queue = self,
306+
};
307+
if (self->has_threads_waiting) {
308+
// Try to hand the item off directly if there are threads waiting
309+
_PyParkingLot_Unpark(&self->has_threads_waiting,
310+
(_Py_unpark_fn_t *)maybe_handoff_item, &data);
311+
}
312+
if (!data.handed_off) {
313+
if (RingBuf_Put(&self->buf, item) < 0) {
314+
return NULL;
315+
}
292316
}
293-
/* END GIL-protected critical section */
294317
Py_RETURN_NONE;
295318
}
296319

297320
/*[clinic input]
321+
@critical_section
298322
_queue.SimpleQueue.put_nowait
299323
item: object
300324
@@ -307,12 +331,23 @@ for compatibility with the Queue class.
307331

308332
static PyObject *
309333
_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
310-
/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/
334+
/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
311335
{
312336
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
313337
}
314338

339+
static PyObject *
340+
empty_error(PyTypeObject *cls)
341+
{
342+
PyObject *module = PyType_GetModule(cls);
343+
assert(module != NULL);
344+
simplequeue_state *state = simplequeue_get_state(module);
345+
PyErr_SetNone(state->EmptyError);
346+
return NULL;
347+
}
348+
315349
/*[clinic input]
350+
@critical_section
316351
_queue.SimpleQueue.get
317352
318353
cls: defining_class
@@ -335,23 +370,15 @@ in that case).
335370
static PyObject *
336371
_queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
337372
int block, PyObject *timeout_obj)
338-
/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
373+
/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
339374
{
340375
_PyTime_t endtime = 0;
341-
_PyTime_t timeout;
342-
PyObject *item;
343-
PyLockStatus r;
344-
PY_TIMEOUT_T microseconds;
345-
PyThreadState *tstate = PyThreadState_Get();
346376

347377
// XXX Use PyThread_ParseTimeoutArg().
348378

349-
if (block == 0) {
350-
/* Non-blocking */
351-
microseconds = 0;
352-
}
353-
else if (timeout_obj != Py_None) {
379+
if (block != 0 && !Py_IsNone(timeout_obj)) {
354380
/* With timeout */
381+
_PyTime_t timeout;
355382
if (_PyTime_FromSecondsObject(&timeout,
356383
timeout_obj, _PyTime_ROUND_CEILING) < 0) {
357384
return NULL;
@@ -361,65 +388,64 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
361388
"'timeout' must be a non-negative number");
362389
return NULL;
363390
}
364-
microseconds = _PyTime_AsMicroseconds(timeout,
365-
_PyTime_ROUND_CEILING);
366-
if (microseconds > PY_TIMEOUT_MAX) {
367-
PyErr_SetString(PyExc_OverflowError,
368-
"timeout value is too large");
369-
return NULL;
370-
}
371391
endtime = _PyDeadline_Init(timeout);
372392
}
373-
else {
374-
/* Infinitely blocking */
375-
microseconds = -1;
376-
}
377393

378-
/* put() signals the queue to be non-empty by releasing the lock.
379-
* So we simply try to acquire the lock in a loop, until the condition
380-
* (queue non-empty) becomes true.
381-
*/
382-
while (RingBuf_IsEmpty(&self->buf)) {
383-
/* First a simple non-blocking try without releasing the GIL */
384-
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
385-
if (r == PY_LOCK_FAILURE && microseconds != 0) {
386-
Py_BEGIN_ALLOW_THREADS
387-
r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
388-
Py_END_ALLOW_THREADS
394+
for (;;) {
395+
if (!RingBuf_IsEmpty(&self->buf)) {
396+
return RingBuf_Get(&self->buf);
389397
}
390398

391-
if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
392-
return NULL;
393-
}
394-
if (r == PY_LOCK_FAILURE) {
395-
PyObject *module = PyType_GetModule(cls);
396-
simplequeue_state *state = simplequeue_get_state(module);
397-
/* Timed out */
398-
PyErr_SetNone(state->EmptyError);
399-
return NULL;
399+
if (!block) {
400+
return empty_error(cls);
400401
}
401-
self->locked = 1;
402402

403-
/* Adjust timeout for next iteration (if any) */
404-
if (microseconds > 0) {
405-
timeout = _PyDeadline_Get(endtime);
406-
microseconds = _PyTime_AsMicroseconds(timeout,
407-
_PyTime_ROUND_CEILING);
403+
int64_t timeout_ns = -1;
404+
if (endtime != 0) {
405+
timeout_ns = _PyDeadline_Get(endtime);
406+
if (timeout_ns < 0) {
407+
return empty_error(cls);
408+
}
408409
}
409-
}
410410

411-
/* BEGIN GIL-protected critical section */
412-
item = RingBuf_Get(&self->buf);
413-
if (self->locked) {
414-
PyThread_release_lock(self->lock);
415-
self->locked = 0;
411+
bool waiting = 1;
412+
self->has_threads_waiting = waiting;
413+
414+
PyObject *item = NULL;
415+
int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
416+
sizeof(bool), timeout_ns, &item,
417+
/* detach */ 1);
418+
switch (st) {
419+
case Py_PARK_OK: {
420+
assert(item != NULL);
421+
return item;
422+
}
423+
case Py_PARK_TIMEOUT: {
424+
return empty_error(cls);
425+
}
426+
case Py_PARK_INTR: {
427+
// Interrupted
428+
if (Py_MakePendingCalls() < 0) {
429+
return NULL;
430+
}
431+
break;
432+
}
433+
case Py_PARK_AGAIN: {
434+
// This should be impossible with the current implementation of
435+
// PyParkingLot, but would be possible if critical sections /
436+
// the GIL were released before the thread was added to the
437+
// internal thread queue in the parking lot.
438+
break;
439+
}
440+
default: {
441+
Py_UNREACHABLE();
442+
}
443+
}
416444
}
417-
/* END GIL-protected critical section */
418-
419-
return item;
420445
}
421446

422447
/*[clinic input]
448+
@critical_section
423449
_queue.SimpleQueue.get_nowait
424450
425451
cls: defining_class
@@ -434,33 +460,35 @@ raise the Empty exception.
434460
static PyObject *
435461
_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
436462
PyTypeObject *cls)
437-
/*[clinic end generated code: output=620c58e2750f8b8a input=842f732bf04216d3]*/
463+
/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
438464
{
439465
return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
440466
}
441467

442468
/*[clinic input]
469+
@critical_section
443470
_queue.SimpleQueue.empty -> bool
444471
445472
Return True if the queue is empty, False otherwise (not reliable!).
446473
[clinic start generated code]*/
447474

448475
static int
449476
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
450-
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
477+
/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
451478
{
452479
return RingBuf_IsEmpty(&self->buf);
453480
}
454481

455482
/*[clinic input]
483+
@critical_section
456484
_queue.SimpleQueue.qsize -> Py_ssize_t
457485
458486
Return the approximate size of the queue (not reliable!).
459487
[clinic start generated code]*/
460488

461489
static Py_ssize_t
462490
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
463-
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
491+
/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
464492
{
465493
return RingBuf_Len(&self->buf);
466494
}

0 commit comments

Comments
 (0)