From aa4fad099f69eb07c7730f08d11dd449818a509a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 15:36:43 -0700 Subject: [PATCH 01/19] Expose interpreters.NotShareableError. --- Lib/test/support/interpreters/__init__.py | 3 ++- Modules/_xxsubinterpretersmodule.c | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index 15a908e9663593..2134211a255bf5 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -6,7 +6,7 @@ # aliases: from _xxsubinterpreters import ( - InterpreterError, InterpreterNotFoundError, + InterpreterError, InterpreterNotFoundError, NotShareableError, is_shareable, ) @@ -15,6 +15,7 @@ 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', 'Interpreter', 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', + 'NotShareableError', 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', ] diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index b4004d165078f7..ade37d7cc9421c 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -1113,6 +1113,7 @@ The 'interpreters' module provides a more convenient interface."); static int module_exec(PyObject *mod) { + PyInterpreterState *interp = PyInterpreterState_Get(); module_state *state = get_module_state(mod); // exceptions @@ -1122,6 +1123,11 @@ module_exec(PyObject *mod) if (PyModule_AddType(mod, (PyTypeObject *)PyExc_InterpreterNotFoundError) < 0) { goto error; } + PyObject *PyExc_NotShareableError = \ + _PyInterpreterState_GetXIState(interp)->PyExc_NotShareableError; + if (PyModule_AddType(mod, (PyTypeObject *)PyExc_NotShareableError) < 0) { + goto error; + } if (register_memoryview_xid(mod, &state->XIBufferViewType) < 0) { goto error; From 01e692f30f6ff0c85b3ffb844ddfd399ea8eb47c Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 15:28:12 -0700 Subject: [PATCH 02/19] Handle a set exception in handle_queue_error(). --- Modules/_xxinterpqueuesmodule.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index 7d8c67f49fefb8..bf1bc645b28ddd 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -294,6 +294,8 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) case ERR_QUEUES_ALLOC: PyErr_NoMemory(); break; + case -1: + return -1; default: state = get_module_state(mod); assert(state->QueueError != NULL); From 18d4c8fdbb3e4ea37487c2ce355c4dec682101bc Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 11:44:26 -0700 Subject: [PATCH 03/19] Queue -> SharedQueue --- Lib/test/support/interpreters/__init__.py | 13 ++-- Lib/test/support/interpreters/queues.py | 14 ++--- Lib/test/test_interpreters/test_api.py | 8 +-- Lib/test/test_interpreters/test_queues.py | 74 +++++++++++------------ 4 files changed, 56 insertions(+), 53 deletions(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index 2134211a255bf5..a16eaf85bded99 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -16,19 +16,22 @@ 'Interpreter', 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', 'NotShareableError', - 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', + 'create_shared_queue', 'SharedQueue', 'QueueEmpty', 'QueueFull', ] _queuemod = None def __getattr__(name): - if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'): - global create_queue, Queue, QueueEmpty, QueueFull + if name in ('QueueEmpty', 'QueueFull', + 'SharedQueue', 'create_shared_queue'): + global QueueEmpty, QueueFull + global create_shared_queue, SharedQueue ns = globals() from .queues import ( - create as create_queue, - Queue, QueueEmpty, QueueFull, + QueueEmpty, QueueFull, + create_shared as create_shared_queue, + SharedQueue, ) return ns[name] else: diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index aead0c40ca9667..dd0f46dcb639a2 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -11,8 +11,8 @@ ) __all__ = [ - 'create', 'list_all', - 'Queue', + 'create_shared', 'list_all', + 'SharedQueue', 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', ] @@ -31,25 +31,25 @@ class QueueFull(_queues.QueueFull, queue.Full): """ -def create(maxsize=0): +def create_shared(maxsize=0): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. """ qid = _queues.create(maxsize) - return Queue(qid) + return SharedQueue(qid) def list_all(): """Return a list of all open queues.""" - return [Queue(qid) + return [SharedQueue(qid) for qid in _queues.list_all()] _known_queues = weakref.WeakValueDictionary() -class Queue: +class SharedQueue: """A cross-interpreter queue.""" def __new__(cls, id, /): @@ -169,4 +169,4 @@ def get_nowait(self): raise # re-raise -_queues._register_queue_type(Queue) +_queues._register_queue_type(SharedQueue) diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index aefd326977095f..f35c28934a83b6 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -16,14 +16,14 @@ class ModuleTests(TestBase): def test_queue_aliases(self): first = [ - interpreters.create_queue, - interpreters.Queue, + interpreters.create_shared_queue, + interpreters.SharedQueue, interpreters.QueueEmpty, interpreters.QueueFull, ] second = [ - interpreters.create_queue, - interpreters.Queue, + interpreters.create_shared_queue, + interpreters.SharedQueue, interpreters.QueueEmpty, interpreters.QueueFull, ] diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 2a8ca99c1f6e3f..ee51cd6f1f7c9d 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -20,50 +20,50 @@ def tearDown(self): pass -class QueueTests(TestBase): +class SharedQueueTests(TestBase): def test_create(self): with self.subTest('vanilla'): - queue = queues.create() + queue = queues.create_shared() self.assertEqual(queue.maxsize, 0) with self.subTest('small maxsize'): - queue = queues.create(3) + queue = queues.create_shared(3) self.assertEqual(queue.maxsize, 3) with self.subTest('big maxsize'): - queue = queues.create(100) + queue = queues.create_shared(100) self.assertEqual(queue.maxsize, 100) with self.subTest('no maxsize'): - queue = queues.create(0) + queue = queues.create_shared(0) self.assertEqual(queue.maxsize, 0) with self.subTest('negative maxsize'): - queue = queues.create(-10) + queue = queues.create_shared(-10) self.assertEqual(queue.maxsize, -10) with self.subTest('bad maxsize'): with self.assertRaises(TypeError): - queues.create('1') + queues.create_shared('1') def test_shareable(self): - queue1 = queues.create() + queue1 = queues.create_shared() interp = interpreters.create() interp.exec_sync(dedent(f""" from test.support.interpreters import queues - queue1 = queues.Queue({queue1.id}) + queue1 = queues.SharedQueue({queue1.id}) """)); with self.subTest('same interpreter'): - queue2 = queues.create() + queue2 = queues.create_shared() queue1.put(queue2) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): - queue4 = queues.create() + queue4 = queues.create_shared() queue1.put(queue4) out = _run_output(interp, dedent(""" queue4 = queue1.get() @@ -74,7 +74,7 @@ def test_shareable(self): with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" - queue5 = queues.create() + queue5 = queues.create_shared() queue1.put(queue5) print(queue5.id) """)) @@ -83,40 +83,40 @@ def test_shareable(self): self.assertEqual(queue5.id, qid) def test_id_type(self): - queue = queues.create() + queue = queues.create_shared() self.assertIsInstance(queue.id, int) def test_custom_id(self): with self.assertRaises(queues.QueueNotFoundError): - queues.Queue(1_000_000) + queues.SharedQueue(1_000_000) def test_id_readonly(self): - queue = queues.create() + queue = queues.create_shared() with self.assertRaises(AttributeError): queue.id = 1_000_000 def test_maxsize_readonly(self): - queue = queues.create(10) + queue = queues.create_shared(10) with self.assertRaises(AttributeError): queue.maxsize = 1_000_000 def test_hashable(self): - queue = queues.create() + queue = queues.create_shared() expected = hash(queue.id) actual = hash(queue) self.assertEqual(actual, expected) def test_equality(self): - queue1 = queues.create() - queue2 = queues.create() + queue1 = queues.create_shared() + queue2 = queues.create_shared() self.assertEqual(queue1, queue1) self.assertNotEqual(queue1, queue2) -class TestQueueOps(TestBase): +class TestSharedQueueOps(TestBase): def test_empty(self): - queue = queues.create() + queue = queues.create_shared() before = queue.empty() queue.put(None) during = queue.empty() @@ -130,7 +130,7 @@ def test_empty(self): def test_full(self): expected = [False, False, False, True, False, False, False] actual = [] - queue = queues.create(3) + queue = queues.create_shared(3) for _ in range(3): actual.append(queue.full()) queue.put(None) @@ -144,7 +144,7 @@ def test_full(self): def test_qsize(self): expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0] actual = [] - queue = queues.create() + queue = queues.create_shared() for _ in range(3): actual.append(queue.qsize()) queue.put(None) @@ -165,7 +165,7 @@ def test_qsize(self): def test_put_get_main(self): expected = list(range(20)) - queue = queues.create() + queue = queues.create_shared() for i in range(20): queue.put(i) actual = [queue.get() for _ in range(20)] @@ -173,7 +173,7 @@ def test_put_get_main(self): self.assertEqual(actual, expected) def test_put_timeout(self): - queue = queues.create(2) + queue = queues.create_shared(2) queue.put(None) queue.put(None) with self.assertRaises(queues.QueueFull): @@ -182,7 +182,7 @@ def test_put_timeout(self): queue.put(None) def test_put_nowait(self): - queue = queues.create(2) + queue = queues.create_shared(2) queue.put_nowait(None) queue.put_nowait(None) with self.assertRaises(queues.QueueFull): @@ -191,12 +191,12 @@ def test_put_nowait(self): queue.put_nowait(None) def test_get_timeout(self): - queue = queues.create() + queue = queues.create_shared() with self.assertRaises(queues.QueueEmpty): queue.get(timeout=0.1) def test_get_nowait(self): - queue = queues.create() + queue = queues.create_shared() with self.assertRaises(queues.QueueEmpty): queue.get_nowait() @@ -204,7 +204,7 @@ def test_put_get_same_interpreter(self): interp = interpreters.create() interp.exec_sync(dedent(""" from test.support.interpreters import queues - queue = queues.create() + queue = queues.create_shared() orig = b'spam' queue.put(orig) obj = queue.get() @@ -214,8 +214,8 @@ def test_put_get_same_interpreter(self): def test_put_get_different_interpreters(self): interp = interpreters.create() - queue1 = queues.create() - queue2 = queues.create() + queue1 = queues.create_shared() + queue2 = queues.create_shared() self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' @@ -225,8 +225,8 @@ def test_put_get_different_interpreters(self): interp, dedent(f""" from test.support.interpreters import queues - queue1 = queues.Queue({queue1.id}) - queue2 = queues.Queue({queue2.id}) + queue1 = queues.SharedQueue({queue1.id}) + queue2 = queues.SharedQueue({queue2.id}) assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1' obj = queue1.get() assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0' @@ -249,13 +249,13 @@ def test_put_get_different_interpreters(self): def test_put_cleared_with_subinterpreter(self): interp = interpreters.create() - queue = queues.create() + queue = queues.create_shared() out = _run_output( interp, dedent(f""" from test.support.interpreters import queues - queue = queues.Queue({queue.id}) + queue = queues.SharedQueue({queue.id}) obj1 = b'spam' obj2 = b'eggs' queue.put(obj1) @@ -271,8 +271,8 @@ def test_put_cleared_with_subinterpreter(self): self.assertEqual(queue.qsize(), 0) def test_put_get_different_threads(self): - queue1 = queues.create() - queue2 = queues.create() + queue1 = queues.create_shared() + queue2 = queues.create_shared() def f(): while True: From cc4dc35216efeac179046b0c29831e6de69e18f2 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 12:01:55 -0700 Subject: [PATCH 04/19] Add Queue. --- Lib/test/support/interpreters/__init__.py | 11 +++++-- Lib/test/support/interpreters/queues.py | 40 ++++++++++++++++++++--- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index a16eaf85bded99..ec9f72cf807117 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -16,7 +16,9 @@ 'Interpreter', 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', 'NotShareableError', - 'create_shared_queue', 'SharedQueue', 'QueueEmpty', 'QueueFull', + 'create_shared_queue', 'create_queue', + 'SharedQueue', 'Queue', + 'QueueEmpty', 'QueueFull', ] @@ -24,14 +26,17 @@ def __getattr__(name): if name in ('QueueEmpty', 'QueueFull', - 'SharedQueue', 'create_shared_queue'): + 'SharedQueue', 'create_shared_queue', + 'Queue', 'create_queue'): global QueueEmpty, QueueFull global create_shared_queue, SharedQueue + global create_queue, Queue ns = globals() from .queues import ( QueueEmpty, QueueFull, create_shared as create_shared_queue, - SharedQueue, + create as create_queue, + SharedQueue, Queue, ) return ns[name] else: diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index dd0f46dcb639a2..4deff1b4ff22fa 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -11,8 +11,8 @@ ) __all__ = [ - 'create_shared', 'list_all', - 'SharedQueue', + 'create_shared', 'create', 'list_all', + 'SharedQueue', 'Queue', 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', ] @@ -35,15 +35,29 @@ def create_shared(maxsize=0): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. + Only "shareable" objects put into the queue. The data is handled + with maximum efficiency. """ - qid = _queues.create(maxsize) + qid = _queues.create(maxsize, sharedonly=True) return SharedQueue(qid) +def create(maxsize=0): + """Return a new cross-interpreter queue. + + The queue may be used to pass data safely between interpreters. + Any object may be put into the queue. Each is serialized, and thus + copied. This approach is not as efficient as queues made with + create_shared(). + """ + qid = _queues.create(maxsize, sharedonly=False) + return Queue(qid) + + def list_all(): """Return a list of all open queues.""" - return [SharedQueue(qid) - for qid in _queues.list_all()] + return [SharedQueue(qid) if sharedonly else Queue(qid) + for qid, sharedonly in _queues.list_all()] @@ -109,6 +123,8 @@ def put(self, obj, timeout=None, *, ): """Add the object to the queue. + The object must be "shareable". + This blocks while the queue is full. """ if timeout is not None: @@ -169,4 +185,18 @@ def get_nowait(self): raise # re-raise +class Queue(SharedQueue): + """A cross-interpreter queue.""" + + def put(self, obj, timeout=None): + """Add the object to the queue. + + All objects are supported. + + This blocks while the queue is full. + """ + super().put(obj, timeout) + + _queues._register_queue_type(SharedQueue) +_queues._register_queue_type(Queue) From ac66dc04e4695b52024ba0905fe48625cbfd4b55 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 12:03:13 -0700 Subject: [PATCH 05/19] Revert to a single Queue class. --- Lib/test/support/interpreters/__init__.py | 16 ++--- Lib/test/support/interpreters/queues.py | 42 ++----------- Lib/test/test_interpreters/test_api.py | 8 +-- Lib/test/test_interpreters/test_queues.py | 74 +++++++++++------------ 4 files changed, 51 insertions(+), 89 deletions(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index ec9f72cf807117..2134211a255bf5 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -16,27 +16,19 @@ 'Interpreter', 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', 'NotShareableError', - 'create_shared_queue', 'create_queue', - 'SharedQueue', 'Queue', - 'QueueEmpty', 'QueueFull', + 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', ] _queuemod = None def __getattr__(name): - if name in ('QueueEmpty', 'QueueFull', - 'SharedQueue', 'create_shared_queue', - 'Queue', 'create_queue'): - global QueueEmpty, QueueFull - global create_shared_queue, SharedQueue - global create_queue, Queue + if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'): + global create_queue, Queue, QueueEmpty, QueueFull ns = globals() from .queues import ( - QueueEmpty, QueueFull, - create_shared as create_shared_queue, create as create_queue, - SharedQueue, Queue, + Queue, QueueEmpty, QueueFull, ) return ns[name] else: diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 4deff1b4ff22fa..aead0c40ca9667 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -11,8 +11,8 @@ ) __all__ = [ - 'create_shared', 'create', 'list_all', - 'SharedQueue', 'Queue', + 'create', 'list_all', + 'Queue', 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', ] @@ -31,39 +31,25 @@ class QueueFull(_queues.QueueFull, queue.Full): """ -def create_shared(maxsize=0): - """Return a new cross-interpreter queue. - - The queue may be used to pass data safely between interpreters. - Only "shareable" objects put into the queue. The data is handled - with maximum efficiency. - """ - qid = _queues.create(maxsize, sharedonly=True) - return SharedQueue(qid) - - def create(maxsize=0): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. - Any object may be put into the queue. Each is serialized, and thus - copied. This approach is not as efficient as queues made with - create_shared(). """ - qid = _queues.create(maxsize, sharedonly=False) + qid = _queues.create(maxsize) return Queue(qid) def list_all(): """Return a list of all open queues.""" - return [SharedQueue(qid) if sharedonly else Queue(qid) - for qid, sharedonly in _queues.list_all()] + return [Queue(qid) + for qid in _queues.list_all()] _known_queues = weakref.WeakValueDictionary() -class SharedQueue: +class Queue: """A cross-interpreter queue.""" def __new__(cls, id, /): @@ -123,8 +109,6 @@ def put(self, obj, timeout=None, *, ): """Add the object to the queue. - The object must be "shareable". - This blocks while the queue is full. """ if timeout is not None: @@ -185,18 +169,4 @@ def get_nowait(self): raise # re-raise -class Queue(SharedQueue): - """A cross-interpreter queue.""" - - def put(self, obj, timeout=None): - """Add the object to the queue. - - All objects are supported. - - This blocks while the queue is full. - """ - super().put(obj, timeout) - - -_queues._register_queue_type(SharedQueue) _queues._register_queue_type(Queue) diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index f35c28934a83b6..aefd326977095f 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -16,14 +16,14 @@ class ModuleTests(TestBase): def test_queue_aliases(self): first = [ - interpreters.create_shared_queue, - interpreters.SharedQueue, + interpreters.create_queue, + interpreters.Queue, interpreters.QueueEmpty, interpreters.QueueFull, ] second = [ - interpreters.create_shared_queue, - interpreters.SharedQueue, + interpreters.create_queue, + interpreters.Queue, interpreters.QueueEmpty, interpreters.QueueFull, ] diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index ee51cd6f1f7c9d..2a8ca99c1f6e3f 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -20,50 +20,50 @@ def tearDown(self): pass -class SharedQueueTests(TestBase): +class QueueTests(TestBase): def test_create(self): with self.subTest('vanilla'): - queue = queues.create_shared() + queue = queues.create() self.assertEqual(queue.maxsize, 0) with self.subTest('small maxsize'): - queue = queues.create_shared(3) + queue = queues.create(3) self.assertEqual(queue.maxsize, 3) with self.subTest('big maxsize'): - queue = queues.create_shared(100) + queue = queues.create(100) self.assertEqual(queue.maxsize, 100) with self.subTest('no maxsize'): - queue = queues.create_shared(0) + queue = queues.create(0) self.assertEqual(queue.maxsize, 0) with self.subTest('negative maxsize'): - queue = queues.create_shared(-10) + queue = queues.create(-10) self.assertEqual(queue.maxsize, -10) with self.subTest('bad maxsize'): with self.assertRaises(TypeError): - queues.create_shared('1') + queues.create('1') def test_shareable(self): - queue1 = queues.create_shared() + queue1 = queues.create() interp = interpreters.create() interp.exec_sync(dedent(f""" from test.support.interpreters import queues - queue1 = queues.SharedQueue({queue1.id}) + queue1 = queues.Queue({queue1.id}) """)); with self.subTest('same interpreter'): - queue2 = queues.create_shared() + queue2 = queues.create() queue1.put(queue2) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): - queue4 = queues.create_shared() + queue4 = queues.create() queue1.put(queue4) out = _run_output(interp, dedent(""" queue4 = queue1.get() @@ -74,7 +74,7 @@ def test_shareable(self): with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" - queue5 = queues.create_shared() + queue5 = queues.create() queue1.put(queue5) print(queue5.id) """)) @@ -83,40 +83,40 @@ def test_shareable(self): self.assertEqual(queue5.id, qid) def test_id_type(self): - queue = queues.create_shared() + queue = queues.create() self.assertIsInstance(queue.id, int) def test_custom_id(self): with self.assertRaises(queues.QueueNotFoundError): - queues.SharedQueue(1_000_000) + queues.Queue(1_000_000) def test_id_readonly(self): - queue = queues.create_shared() + queue = queues.create() with self.assertRaises(AttributeError): queue.id = 1_000_000 def test_maxsize_readonly(self): - queue = queues.create_shared(10) + queue = queues.create(10) with self.assertRaises(AttributeError): queue.maxsize = 1_000_000 def test_hashable(self): - queue = queues.create_shared() + queue = queues.create() expected = hash(queue.id) actual = hash(queue) self.assertEqual(actual, expected) def test_equality(self): - queue1 = queues.create_shared() - queue2 = queues.create_shared() + queue1 = queues.create() + queue2 = queues.create() self.assertEqual(queue1, queue1) self.assertNotEqual(queue1, queue2) -class TestSharedQueueOps(TestBase): +class TestQueueOps(TestBase): def test_empty(self): - queue = queues.create_shared() + queue = queues.create() before = queue.empty() queue.put(None) during = queue.empty() @@ -130,7 +130,7 @@ def test_empty(self): def test_full(self): expected = [False, False, False, True, False, False, False] actual = [] - queue = queues.create_shared(3) + queue = queues.create(3) for _ in range(3): actual.append(queue.full()) queue.put(None) @@ -144,7 +144,7 @@ def test_full(self): def test_qsize(self): expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0] actual = [] - queue = queues.create_shared() + queue = queues.create() for _ in range(3): actual.append(queue.qsize()) queue.put(None) @@ -165,7 +165,7 @@ def test_qsize(self): def test_put_get_main(self): expected = list(range(20)) - queue = queues.create_shared() + queue = queues.create() for i in range(20): queue.put(i) actual = [queue.get() for _ in range(20)] @@ -173,7 +173,7 @@ def test_put_get_main(self): self.assertEqual(actual, expected) def test_put_timeout(self): - queue = queues.create_shared(2) + queue = queues.create(2) queue.put(None) queue.put(None) with self.assertRaises(queues.QueueFull): @@ -182,7 +182,7 @@ def test_put_timeout(self): queue.put(None) def test_put_nowait(self): - queue = queues.create_shared(2) + queue = queues.create(2) queue.put_nowait(None) queue.put_nowait(None) with self.assertRaises(queues.QueueFull): @@ -191,12 +191,12 @@ def test_put_nowait(self): queue.put_nowait(None) def test_get_timeout(self): - queue = queues.create_shared() + queue = queues.create() with self.assertRaises(queues.QueueEmpty): queue.get(timeout=0.1) def test_get_nowait(self): - queue = queues.create_shared() + queue = queues.create() with self.assertRaises(queues.QueueEmpty): queue.get_nowait() @@ -204,7 +204,7 @@ def test_put_get_same_interpreter(self): interp = interpreters.create() interp.exec_sync(dedent(""" from test.support.interpreters import queues - queue = queues.create_shared() + queue = queues.create() orig = b'spam' queue.put(orig) obj = queue.get() @@ -214,8 +214,8 @@ def test_put_get_same_interpreter(self): def test_put_get_different_interpreters(self): interp = interpreters.create() - queue1 = queues.create_shared() - queue2 = queues.create_shared() + queue1 = queues.create() + queue2 = queues.create() self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' @@ -225,8 +225,8 @@ def test_put_get_different_interpreters(self): interp, dedent(f""" from test.support.interpreters import queues - queue1 = queues.SharedQueue({queue1.id}) - queue2 = queues.SharedQueue({queue2.id}) + queue1 = queues.Queue({queue1.id}) + queue2 = queues.Queue({queue2.id}) assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1' obj = queue1.get() assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0' @@ -249,13 +249,13 @@ def test_put_get_different_interpreters(self): def test_put_cleared_with_subinterpreter(self): interp = interpreters.create() - queue = queues.create_shared() + queue = queues.create() out = _run_output( interp, dedent(f""" from test.support.interpreters import queues - queue = queues.SharedQueue({queue.id}) + queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' queue.put(obj1) @@ -271,8 +271,8 @@ def test_put_cleared_with_subinterpreter(self): self.assertEqual(queue.qsize(), 0) def test_put_get_different_threads(self): - queue1 = queues.create_shared() - queue2 = queues.create_shared() + queue1 = queues.create() + queue2 = queues.create() def f(): while True: From df5c337b08939159e767e610155dcfc9dd029844 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 12:57:30 -0700 Subject: [PATCH 06/19] Add the item format. --- Modules/_xxinterpqueuesmodule.c | 98 +++++++++++++++++++++++++++------ 1 file changed, 82 insertions(+), 16 deletions(-) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index bf1bc645b28ddd..ae8ace300de3e1 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -316,20 +316,56 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) } +/* queue item formats *******************************************************/ + +enum item_format { + ITEM_FORMAT_SHARED, +}; + +static PyObject * +convert_object(PyObject *obj, enum item_format fmt) +{ + if (fmt == ITEM_FORMAT_SHARED) { + return Py_NewRef(obj); + } + else { + assert(0 && "format not implemented"); + Py_FatalError("format not implemented"); + return NULL; + } +} + +static PyObject * +unconvert_object(PyObject *obj, enum item_format fmt) +{ + if (fmt == ITEM_FORMAT_SHARED) { + return obj; + } + else { + assert(0 && "format not implemented"); + Py_FatalError("format not implemented"); + return NULL; + } +} + + /* the basic queue **********************************************************/ struct _queueitem; typedef struct _queueitem { _PyCrossInterpreterData *data; + enum item_format fmt; struct _queueitem *next; } _queueitem; static void -_queueitem_init(_queueitem *item, _PyCrossInterpreterData *data) +_queueitem_init(_queueitem *item, + _PyCrossInterpreterData *data, enum item_format fmt) { *item = (_queueitem){ .data = data, + .fmt = fmt, }; } @@ -346,14 +382,14 @@ _queueitem_clear(_queueitem *item) } static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data) +_queueitem_new(_PyCrossInterpreterData *data, enum item_format fmt) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, data); + _queueitem_init(item, data, fmt); return item; } @@ -375,9 +411,11 @@ _queueitem_free_all(_queueitem *item) } static void -_queueitem_popped(_queueitem *item, _PyCrossInterpreterData **p_data) +_queueitem_popped(_queueitem *item, + _PyCrossInterpreterData **p_data, enum item_format *p_fmt) { *p_data = item->data; + *p_fmt = item->fmt; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; _queueitem_free(item); @@ -488,7 +526,7 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data) +_queue_add(_queue *queue, _PyCrossInterpreterData *data, enum item_format fmt) { int err = _queue_lock(queue); if (err < 0) { @@ -504,7 +542,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data) return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(data); + _queueitem *item = _queueitem_new(data, fmt); if (item == NULL) { _queue_unlock(queue); return -1; @@ -524,7 +562,8 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data) } static int -_queue_next(_queue *queue, _PyCrossInterpreterData **p_data) +_queue_next(_queue *queue, + _PyCrossInterpreterData **p_data, enum item_format *p_fmt) { int err = _queue_lock(queue); if (err < 0) { @@ -543,7 +582,7 @@ _queue_next(_queue *queue, _PyCrossInterpreterData **p_data) } queue->items.count -= 1; - _queueitem_popped(item, p_data); + _queueitem_popped(item, p_data, p_fmt); _queue_unlock(queue); return 0; @@ -927,7 +966,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj) +queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) { // Look up the queue. _queue *queue = NULL; @@ -937,6 +976,11 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj) } assert(queue != NULL); + obj = convert_object(obj, fmt); + if (obj == NULL) { + return -1; + } + // Convert the object to cross-interpreter data. _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData); if (data == NULL) { @@ -948,9 +992,10 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj) GLOBAL_FREE(data); return -1; } + Py_DECREF(obj); // Add the data to the queue. - int res = _queue_add(queue, data); + int res = _queue_add(queue, data, fmt); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: @@ -981,7 +1026,8 @@ queue_get(_queues *queues, int64_t qid, PyObject **res) // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; - err = _queue_next(queue, &data); + enum item_format fmt; + err = _queue_next(queue, &data, &fmt); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1008,6 +1054,13 @@ queue_get(_queues *queues, int64_t qid, PyObject **res) return -1; } + PyObject *actual = unconvert_object(obj, fmt); + if (actual == NULL) { + Py_DECREF(obj); + return -1; + } + obj = actual; + *res = obj; return 0; } @@ -1365,17 +1418,30 @@ Return the list of IDs for all queues."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"qid", "obj", NULL}; + static char *kwlist[] = {"qid", "obj", "sharedonly", NULL}; qidarg_converter_data qidarg; PyObject *obj; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:put", kwlist, - qidarg_converter, &qidarg, &obj)) { + int sharedonly = -1; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|p:put", kwlist, + qidarg_converter, &qidarg, &obj, + &sharedonly)) { return NULL; } int64_t qid = qidarg.id; + enum item_format fmt; + if (sharedonly == -1) { + sharedonly = 1; + } + if (sharedonly) { + fmt = ITEM_FORMAT_SHARED; + } + else { + PyErr_SetNone(PyExc_NotImplementedError); + return NULL; + } /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj); + int err = queue_put(&_globals.queues, qid, obj, fmt); if (handle_queue_error(err, self, qid)) { return NULL; } @@ -1384,7 +1450,7 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(queuesmod_put_doc, -"put(qid, obj)\n\ +"put(qid, obj, sharedonly=False)\n\ \n\ Add the object's data to the queue."); From 5e5bc37103459ff0e045da85cb101da0c6c00a36 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 13:22:05 -0700 Subject: [PATCH 07/19] Add pickle as a supported format. --- Modules/_xxinterpqueuesmodule.c | 61 ++++++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 8 deletions(-) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index ae8ace300de3e1..be3f8a12a3438d 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -130,7 +130,27 @@ idarg_int64_converter(PyObject *arg, void *ptr) /* module state *************************************************************/ +struct cached_deps { + PyObject *picklemod; +}; + +static PyObject * +get_picklemod(struct cached_deps *cached) +{ + PyObject *picklemod = cached->picklemod; + if (picklemod == NULL) { + picklemod = PyImport_ImportModule("pickle"); + if (picklemod == NULL) { + return NULL; + } + cached->picklemod = picklemod; + } + return picklemod; +} + typedef struct { + struct cached_deps cached; + /* external types (added at runtime by interpreters module) */ PyTypeObject *queue_type; @@ -153,6 +173,8 @@ get_module_state(PyObject *mod) static int traverse_module_state(module_state *state, visitproc visit, void *arg) { + Py_VISIT(state->cached.picklemod); + /* external types */ Py_VISIT(state->queue_type); @@ -168,6 +190,8 @@ traverse_module_state(module_state *state, visitproc visit, void *arg) static int clear_module_state(module_state *state) { + Py_CLEAR(state->cached.picklemod); + /* external types */ Py_CLEAR(state->queue_type); @@ -320,14 +344,22 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) enum item_format { ITEM_FORMAT_SHARED, + ITEM_FORMAT_PICKLED, }; static PyObject * -convert_object(PyObject *obj, enum item_format fmt) +convert_object(PyObject *obj, enum item_format fmt, struct cached_deps *cached) { if (fmt == ITEM_FORMAT_SHARED) { return Py_NewRef(obj); } + else if (fmt == ITEM_FORMAT_PICKLED) { + PyObject *picklemod = get_picklemod(cached); + if (picklemod == NULL) { + return NULL; + } + return PyObject_CallMethod(picklemod, "dumps", "O", obj); + } else { assert(0 && "format not implemented"); Py_FatalError("format not implemented"); @@ -336,11 +368,18 @@ convert_object(PyObject *obj, enum item_format fmt) } static PyObject * -unconvert_object(PyObject *obj, enum item_format fmt) +unconvert_object(PyObject *obj, enum item_format fmt, struct cached_deps *cached) { if (fmt == ITEM_FORMAT_SHARED) { return obj; } + else if (fmt == ITEM_FORMAT_PICKLED) { + PyObject *picklemod = get_picklemod(cached); + if (picklemod == NULL) { + return NULL; + } + return PyObject_CallMethod(picklemod, "loads", "O", obj); + } else { assert(0 && "format not implemented"); Py_FatalError("format not implemented"); @@ -966,7 +1005,8 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) +queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt, + struct cached_deps *cached) { // Look up the queue. _queue *queue = NULL; @@ -976,7 +1016,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) } assert(queue != NULL); - obj = convert_object(obj, fmt); + obj = convert_object(obj, fmt, cached); if (obj == NULL) { return -1; } @@ -1010,7 +1050,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int -queue_get(_queues *queues, int64_t qid, PyObject **res) +queue_get(_queues *queues, int64_t qid, PyObject **res, + struct cached_deps *cached) { int err; *res = NULL; @@ -1054,7 +1095,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res) return -1; } - PyObject *actual = unconvert_object(obj, fmt); + PyObject *actual = unconvert_object(obj, fmt, cached); if (actual == NULL) { Py_DECREF(obj); return -1; @@ -1440,8 +1481,10 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } + module_state *state = get_module_state(self); + /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, fmt); + int err = queue_put(&_globals.queues, qid, obj, fmt, &state->cached); if (handle_queue_error(err, self, qid)) { return NULL; } @@ -1466,8 +1509,10 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) } int64_t qid = qidarg.id; + module_state *state = get_module_state(self); + PyObject *obj = NULL; - int err = queue_get(&_globals.queues, qid, &obj); + int err = queue_get(&_globals.queues, qid, &obj, &state->cached); if (err == ERR_QUEUE_EMPTY && dflt != NULL) { assert(obj == NULL); obj = Py_NewRef(dflt); From 1e8b6ade48fac7539b3cde573cc26204ac9605ca Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 13:22:38 -0700 Subject: [PATCH 08/19] Revert "Add pickle as a supported format." This reverts commit 90085abb8812c1231f96d5b8c7288c1f2c4410aa. --- Modules/_xxinterpqueuesmodule.c | 61 +++++---------------------------- 1 file changed, 8 insertions(+), 53 deletions(-) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index be3f8a12a3438d..ae8ace300de3e1 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -130,27 +130,7 @@ idarg_int64_converter(PyObject *arg, void *ptr) /* module state *************************************************************/ -struct cached_deps { - PyObject *picklemod; -}; - -static PyObject * -get_picklemod(struct cached_deps *cached) -{ - PyObject *picklemod = cached->picklemod; - if (picklemod == NULL) { - picklemod = PyImport_ImportModule("pickle"); - if (picklemod == NULL) { - return NULL; - } - cached->picklemod = picklemod; - } - return picklemod; -} - typedef struct { - struct cached_deps cached; - /* external types (added at runtime by interpreters module) */ PyTypeObject *queue_type; @@ -173,8 +153,6 @@ get_module_state(PyObject *mod) static int traverse_module_state(module_state *state, visitproc visit, void *arg) { - Py_VISIT(state->cached.picklemod); - /* external types */ Py_VISIT(state->queue_type); @@ -190,8 +168,6 @@ traverse_module_state(module_state *state, visitproc visit, void *arg) static int clear_module_state(module_state *state) { - Py_CLEAR(state->cached.picklemod); - /* external types */ Py_CLEAR(state->queue_type); @@ -344,22 +320,14 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) enum item_format { ITEM_FORMAT_SHARED, - ITEM_FORMAT_PICKLED, }; static PyObject * -convert_object(PyObject *obj, enum item_format fmt, struct cached_deps *cached) +convert_object(PyObject *obj, enum item_format fmt) { if (fmt == ITEM_FORMAT_SHARED) { return Py_NewRef(obj); } - else if (fmt == ITEM_FORMAT_PICKLED) { - PyObject *picklemod = get_picklemod(cached); - if (picklemod == NULL) { - return NULL; - } - return PyObject_CallMethod(picklemod, "dumps", "O", obj); - } else { assert(0 && "format not implemented"); Py_FatalError("format not implemented"); @@ -368,18 +336,11 @@ convert_object(PyObject *obj, enum item_format fmt, struct cached_deps *cached) } static PyObject * -unconvert_object(PyObject *obj, enum item_format fmt, struct cached_deps *cached) +unconvert_object(PyObject *obj, enum item_format fmt) { if (fmt == ITEM_FORMAT_SHARED) { return obj; } - else if (fmt == ITEM_FORMAT_PICKLED) { - PyObject *picklemod = get_picklemod(cached); - if (picklemod == NULL) { - return NULL; - } - return PyObject_CallMethod(picklemod, "loads", "O", obj); - } else { assert(0 && "format not implemented"); Py_FatalError("format not implemented"); @@ -1005,8 +966,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt, - struct cached_deps *cached) +queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) { // Look up the queue. _queue *queue = NULL; @@ -1016,7 +976,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt, } assert(queue != NULL); - obj = convert_object(obj, fmt, cached); + obj = convert_object(obj, fmt); if (obj == NULL) { return -1; } @@ -1050,8 +1010,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt, // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int -queue_get(_queues *queues, int64_t qid, PyObject **res, - struct cached_deps *cached) +queue_get(_queues *queues, int64_t qid, PyObject **res) { int err; *res = NULL; @@ -1095,7 +1054,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, return -1; } - PyObject *actual = unconvert_object(obj, fmt, cached); + PyObject *actual = unconvert_object(obj, fmt); if (actual == NULL) { Py_DECREF(obj); return -1; @@ -1481,10 +1440,8 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } - module_state *state = get_module_state(self); - /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, fmt, &state->cached); + int err = queue_put(&_globals.queues, qid, obj, fmt); if (handle_queue_error(err, self, qid)) { return NULL; } @@ -1509,10 +1466,8 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) } int64_t qid = qidarg.id; - module_state *state = get_module_state(self); - PyObject *obj = NULL; - int err = queue_get(&_globals.queues, qid, &obj, &state->cached); + int err = queue_get(&_globals.queues, qid, &obj); if (err == ERR_QUEUE_EMPTY && dflt != NULL) { assert(obj == NULL); obj = Py_NewRef(dflt); From 681f31663dbd9c2bac458fa216ae9c071f25425a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 13:50:58 -0700 Subject: [PATCH 09/19] Simplify the format. --- Lib/test/support/interpreters/queues.py | 12 +++- Modules/_xxinterpqueuesmodule.c | 93 +++++-------------------- 2 files changed, 28 insertions(+), 77 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index aead0c40ca9667..9b807a87ed9108 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -46,6 +46,7 @@ def list_all(): for qid in _queues.list_all()] +_SHARED_ONLY = 0 _known_queues = weakref.WeakValueDictionary() @@ -111,6 +112,7 @@ def put(self, obj, timeout=None, *, This blocks while the queue is full. """ + fmt = _SHARED_ONLY if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -118,7 +120,7 @@ def put(self, obj, timeout=None, *, end = time.time() + timeout while True: try: - _queues.put(self._id, obj) + _queues.put(self._id, obj, fmt) except _queues.QueueFull as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueFull @@ -128,8 +130,9 @@ def put(self, obj, timeout=None, *, break def put_nowait(self, obj): + fmt = _SHARED_ONLY try: - return _queues.put(self._id, obj) + return _queues.put(self._id, obj, fmt) except _queues.QueueFull as exc: exc.__class__ = QueueFull raise # re-raise @@ -148,12 +151,15 @@ def get(self, timeout=None, *, end = time.time() + timeout while True: try: - return _queues.get(self._id) + obj, fmt = _queues.get(self._id) except _queues.QueueEmpty as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueEmpty raise # re-raise time.sleep(_delay) + else: + break + assert fmt == _SHARED_ONLY return obj def get_nowait(self): diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index ae8ace300de3e1..7f2e9fbf2d199b 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -316,52 +316,19 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) } -/* queue item formats *******************************************************/ - -enum item_format { - ITEM_FORMAT_SHARED, -}; - -static PyObject * -convert_object(PyObject *obj, enum item_format fmt) -{ - if (fmt == ITEM_FORMAT_SHARED) { - return Py_NewRef(obj); - } - else { - assert(0 && "format not implemented"); - Py_FatalError("format not implemented"); - return NULL; - } -} - -static PyObject * -unconvert_object(PyObject *obj, enum item_format fmt) -{ - if (fmt == ITEM_FORMAT_SHARED) { - return obj; - } - else { - assert(0 && "format not implemented"); - Py_FatalError("format not implemented"); - return NULL; - } -} - - /* the basic queue **********************************************************/ struct _queueitem; typedef struct _queueitem { _PyCrossInterpreterData *data; - enum item_format fmt; + int fmt; struct _queueitem *next; } _queueitem; static void _queueitem_init(_queueitem *item, - _PyCrossInterpreterData *data, enum item_format fmt) + _PyCrossInterpreterData *data, int fmt) { *item = (_queueitem){ .data = data, @@ -382,7 +349,7 @@ _queueitem_clear(_queueitem *item) } static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data, enum item_format fmt) +_queueitem_new(_PyCrossInterpreterData *data, int fmt) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { @@ -412,7 +379,7 @@ _queueitem_free_all(_queueitem *item) static void _queueitem_popped(_queueitem *item, - _PyCrossInterpreterData **p_data, enum item_format *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt) { *p_data = item->data; *p_fmt = item->fmt; @@ -526,7 +493,7 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data, enum item_format fmt) +_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) { int err = _queue_lock(queue); if (err < 0) { @@ -563,7 +530,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, enum item_format fmt) static int _queue_next(_queue *queue, - _PyCrossInterpreterData **p_data, enum item_format *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt) { int err = _queue_lock(queue); if (err < 0) { @@ -966,7 +933,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) +queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) { // Look up the queue. _queue *queue = NULL; @@ -976,11 +943,6 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) } assert(queue != NULL); - obj = convert_object(obj, fmt); - if (obj == NULL) { - return -1; - } - // Convert the object to cross-interpreter data. _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData); if (data == NULL) { @@ -992,7 +954,6 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) GLOBAL_FREE(data); return -1; } - Py_DECREF(obj); // Add the data to the queue. int res = _queue_add(queue, data, fmt); @@ -1010,7 +971,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, enum item_format fmt) // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int -queue_get(_queues *queues, int64_t qid, PyObject **res) +queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) { int err; *res = NULL; @@ -1026,8 +987,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res) // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; - enum item_format fmt; - err = _queue_next(queue, &data, &fmt); + err = _queue_next(queue, &data, p_fmt); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1054,13 +1014,6 @@ queue_get(_queues *queues, int64_t qid, PyObject **res) return -1; } - PyObject *actual = unconvert_object(obj, fmt); - if (actual == NULL) { - Py_DECREF(obj); - return -1; - } - obj = actual; - *res = obj; return 0; } @@ -1418,27 +1371,15 @@ Return the list of IDs for all queues."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"qid", "obj", "sharedonly", NULL}; + static char *kwlist[] = {"qid", "obj", "fmt", NULL}; qidarg_converter_data qidarg; PyObject *obj; - int sharedonly = -1; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|p:put", kwlist, - qidarg_converter, &qidarg, &obj, - &sharedonly)) { + int fmt; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist, + qidarg_converter, &qidarg, &obj, &fmt)) { return NULL; } int64_t qid = qidarg.id; - enum item_format fmt; - if (sharedonly == -1) { - sharedonly = 1; - } - if (sharedonly) { - fmt = ITEM_FORMAT_SHARED; - } - else { - PyErr_SetNone(PyExc_NotImplementedError); - return NULL; - } /* Queue up the object. */ int err = queue_put(&_globals.queues, qid, obj, fmt); @@ -1467,7 +1408,8 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) int64_t qid = qidarg.id; PyObject *obj = NULL; - int err = queue_get(&_globals.queues, qid, &obj); + int fmt; + int err = queue_get(&_globals.queues, qid, &obj, &fmt); if (err == ERR_QUEUE_EMPTY && dflt != NULL) { assert(obj == NULL); obj = Py_NewRef(dflt); @@ -1475,7 +1417,10 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) else if (handle_queue_error(err, self, qid)) { return NULL; } - return obj; + + PyObject *res = Py_BuildValue("Oi", obj, fmt); + Py_DECREF(obj); + return res; } PyDoc_STRVAR(queuesmod_get_doc, From 2b891dfdf04c9919f9447e33f3afb1210f0de36c Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 14:21:23 -0700 Subject: [PATCH 10/19] Add support for pickling. --- Lib/test/support/interpreters/queues.py | 24 ++++- Lib/test/test_interpreters/test_queues.py | 119 +++++++++++++++------- 2 files changed, 104 insertions(+), 39 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 9b807a87ed9108..810cd8354d17a7 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -1,5 +1,6 @@ """Cross-interpreter Queues High Level Module.""" +import pickle import queue import time import weakref @@ -47,6 +48,7 @@ def list_all(): _SHARED_ONLY = 0 +_PICKLED = 1 _known_queues = weakref.WeakValueDictionary() @@ -106,18 +108,25 @@ def qsize(self): return _queues.get_count(self._id) def put(self, obj, timeout=None, *, + sharedonly=False, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. This blocks while the queue is full. + + If "sharedonly" is true then the object must be "shareable". + It will be passed through the queue efficiently. Otherwise + all objects are supported, at the expense of worse performance. """ - fmt = _SHARED_ONLY + fmt = _SHARED_ONLY if sharedonly else _PICKLED if timeout is not None: timeout = int(timeout) if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout + if fmt is _PICKLED: + obj = pickle.dumps(obj) while True: try: _queues.put(self._id, obj, fmt) @@ -129,10 +138,12 @@ def put(self, obj, timeout=None, *, else: break - def put_nowait(self, obj): - fmt = _SHARED_ONLY + def put_nowait(self, obj, *, sharedonly=False): + fmt = _SHARED_ONLY if sharedonly else _PICKLED + if fmt is _PICKLED: + obj = pickle.dumps(obj) try: - return _queues.put(self._id, obj, fmt) + _queues.put(self._id, obj, fmt) except _queues.QueueFull as exc: exc.__class__ = QueueFull raise # re-raise @@ -159,7 +170,10 @@ def get(self, timeout=None, *, time.sleep(_delay) else: break - assert fmt == _SHARED_ONLY + if fmt == _PICKLED: + obj = pickle.loads(obj) + else: + assert fmt == _SHARED_ONLY return obj def get_nowait(self): diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 2a8ca99c1f6e3f..7da786b5243407 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -58,13 +58,13 @@ def test_shareable(self): with self.subTest('same interpreter'): queue2 = queues.create() - queue1.put(queue2) + queue1.put(queue2, sharedonly=True) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): queue4 = queues.create() - queue1.put(queue4) + queue1.put(queue4, sharedonly=True) out = _run_output(interp, dedent(""" queue4 = queue1.get() print(queue4.id) @@ -75,7 +75,7 @@ def test_shareable(self): with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" queue5 = queues.create() - queue1.put(queue5) + queue1.put(queue5, sharedonly=True) print(queue5.id) """)) qid = int(out) @@ -118,7 +118,7 @@ class TestQueueOps(TestBase): def test_empty(self): queue = queues.create() before = queue.empty() - queue.put(None) + queue.put(None, sharedonly=True) during = queue.empty() queue.get() after = queue.empty() @@ -133,7 +133,7 @@ def test_full(self): queue = queues.create(3) for _ in range(3): actual.append(queue.full()) - queue.put(None) + queue.put(None, sharedonly=True) actual.append(queue.full()) for _ in range(3): queue.get() @@ -147,16 +147,16 @@ def test_qsize(self): queue = queues.create() for _ in range(3): actual.append(queue.qsize()) - queue.put(None) + queue.put(None, sharedonly=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) - queue.put(None) + queue.put(None, sharedonly=True) actual.append(queue.qsize()) for _ in range(3): queue.get() actual.append(queue.qsize()) - queue.put(None) + queue.put(None, sharedonly=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) @@ -165,30 +165,81 @@ def test_qsize(self): def test_put_get_main(self): expected = list(range(20)) - queue = queues.create() - for i in range(20): - queue.put(i) - actual = [queue.get() for _ in range(20)] + for sharedonly in (True, False): + kwds = dict(sharedonly=sharedonly) + with self.subTest(f'sharedonly={sharedonly}'): + queue = queues.create() + for i in range(20): + queue.put(i, **kwds) + actual = [queue.get() for _ in range(20)] - self.assertEqual(actual, expected) + self.assertEqual(actual, expected) def test_put_timeout(self): - queue = queues.create(2) - queue.put(None) - queue.put(None) - with self.assertRaises(queues.QueueFull): - queue.put(None, timeout=0.1) - queue.get() - queue.put(None) + for sharedonly in (True, False): + kwds = dict(sharedonly=sharedonly) + with self.subTest(f'sharedonly={sharedonly}'): + queue = queues.create(2) + queue.put(None, **kwds) + queue.put(None, **kwds) + with self.assertRaises(queues.QueueFull): + queue.put(None, timeout=0.1, **kwds) + queue.get() + queue.put(None, **kwds) def test_put_nowait(self): - queue = queues.create(2) - queue.put_nowait(None) - queue.put_nowait(None) - with self.assertRaises(queues.QueueFull): - queue.put_nowait(None) - queue.get() - queue.put_nowait(None) + for sharedonly in (True, False): + kwds = dict(sharedonly=sharedonly) + with self.subTest(f'sharedonly={sharedonly}'): + queue = queues.create(2) + queue.put_nowait(None, **kwds) + queue.put_nowait(None, **kwds) + with self.assertRaises(queues.QueueFull): + queue.put_nowait(None, **kwds) + queue.get() + queue.put_nowait(None, **kwds) + + def test_put_sharedonly(self): + for obj in [ + None, + True, + 10, + 'spam', + b'spam', + (0, 'a'), + ]: + with self.subTest(repr(obj)): + queue = queues.create() + queue.put(obj, sharedonly=True) + obj2 = queue.get() + self.assertEqual(obj2, obj) + + for obj in [ + [1, 2, 3], + {'a': 13, 'b': 17}, + ]: + with self.subTest(repr(obj)): + queue = queues.create() + with self.assertRaises(interpreters.NotShareableError): + queue.put(obj, sharedonly=True) + + def test_put_not_sharedonly(self): + for obj in [ + None, + True, + 10, + 'spam', + b'spam', + (0, 'a'), + # not shareable + [1, 2, 3], + {'a': 13, 'b': 17}, + ]: + with self.subTest(repr(obj)): + queue = queues.create() + queue.put(obj, sharedonly=False) + obj2 = queue.get() + self.assertEqual(obj2, obj) def test_get_timeout(self): queue = queues.create() @@ -206,7 +257,7 @@ def test_put_get_same_interpreter(self): from test.support.interpreters import queues queue = queues.create() orig = b'spam' - queue.put(orig) + queue.put(orig, sharedonly=True) obj = queue.get() assert obj == orig, 'expected: obj == orig' assert obj is not orig, 'expected: obj is not orig' @@ -219,7 +270,7 @@ def test_put_get_different_interpreters(self): self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' - queue1.put(obj1) + queue1.put(obj1, sharedonly=True) out = _run_output( interp, @@ -236,7 +287,7 @@ def test_put_get_different_interpreters(self): obj2 = b'eggs' print(id(obj2)) assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0' - queue2.put(obj2) + queue2.put(obj2, sharedonly=True) assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1' """)) self.assertEqual(len(queues.list_all()), 2) @@ -258,8 +309,8 @@ def test_put_cleared_with_subinterpreter(self): queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' - queue.put(obj1) - queue.put(obj2) + queue.put(obj1, sharedonly=True) + queue.put(obj2, sharedonly=True) """)) self.assertEqual(queue.qsize(), 2) @@ -281,12 +332,12 @@ def f(): break except queues.QueueEmpty: continue - queue2.put(obj) + queue2.put(obj, sharedonly=True) t = threading.Thread(target=f) t.start() orig = b'spam' - queue1.put(orig) + queue1.put(orig, sharedonly=True) obj = queue2.get() t.join() From 1e6b6d2e851469c695b96845335ddafb684aac3b Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 15:27:38 -0700 Subject: [PATCH 11/19] Set a default fmt on Queue objects. --- Lib/test/support/interpreters/queues.py | 40 ++++++++---- Lib/test/test_interpreters/test_queues.py | 28 +++++++++ Modules/_xxinterpqueuesmodule.c | 74 +++++++++++++++++------ 3 files changed, 112 insertions(+), 30 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 810cd8354d17a7..c746378fdb13df 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -32,40 +32,46 @@ class QueueFull(_queues.QueueFull, queue.Full): """ -def create(maxsize=0): +_SHARED_ONLY = 0 +_PICKLED = 1 + +def create(maxsize=0, *, sharedonly=False): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. + + "sharedonly" sets the default for Queue.put() and Queue.put_nowait(). """ - qid = _queues.create(maxsize) - return Queue(qid) + fmt = _SHARED_ONLY if sharedonly else _PICKLED + qid = _queues.create(maxsize, fmt) + return Queue(qid, _fmt=fmt) def list_all(): """Return a list of all open queues.""" - return [Queue(qid) - for qid in _queues.list_all()] - + return [Queue(qid, _fmt=fmt) + for qid, fmt in _queues.list_all()] -_SHARED_ONLY = 0 -_PICKLED = 1 _known_queues = weakref.WeakValueDictionary() class Queue: """A cross-interpreter queue.""" - def __new__(cls, id, /): + def __new__(cls, id, /, *, _fmt=None): # There is only one instance for any given ID. if isinstance(id, int): id = int(id) else: raise TypeError(f'id must be an int, got {id!r}') + if _fmt is None: + _fmt = _queues.get_default_fmt(id) try: self = _known_queues[id] except KeyError: self = super().__new__(cls) self._id = id + self._fmt = _fmt _known_queues[id] = self _queues.bind(id) return self @@ -108,7 +114,7 @@ def qsize(self): return _queues.get_count(self._id) def put(self, obj, timeout=None, *, - sharedonly=False, + sharedonly=None, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. @@ -116,10 +122,14 @@ def put(self, obj, timeout=None, *, This blocks while the queue is full. If "sharedonly" is true then the object must be "shareable". - It will be passed through the queue efficiently. Otherwise + It will be passed through the queue efficiently. If false then all objects are supported, at the expense of worse performance. + If None (the default) then it uses the queue's default. """ - fmt = _SHARED_ONLY if sharedonly else _PICKLED + if sharedonly is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if sharedonly else _PICKLED if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -138,7 +148,11 @@ def put(self, obj, timeout=None, *, else: break - def put_nowait(self, obj, *, sharedonly=False): + def put_nowait(self, obj, *, sharedonly=None): + if sharedonly is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if sharedonly else _PICKLED fmt = _SHARED_ONLY if sharedonly else _PICKLED if fmt is _PICKLED: obj = pickle.dumps(obj) diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 7da786b5243407..ea4a4da0f41c50 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -251,6 +251,34 @@ def test_get_nowait(self): with self.assertRaises(queues.QueueEmpty): queue.get_nowait() + def test_put_get_default_sharedonly(self): + expected = list(range(20)) + queue = queues.create(sharedonly=True) + for i in range(20): + queue.put(i) + actual = [queue.get() for _ in range(20)] + + self.assertEqual(actual, expected) + + obj = [1, 2, 3] # lists are not shareable + with self.assertRaises(interpreters.NotShareableError): + queue.put(obj) + + def test_put_get_default_not_sharedonly(self): + expected = list(range(20)) + queue = queues.create(sharedonly=False) + for i in range(20): + queue.put(i) + actual = [queue.get() for _ in range(20)] + + self.assertEqual(actual, expected) + + obj = [1, 2, 3] # lists are not shareable + queue.put(obj) + obj2 = queue.get() + self.assertEqual(obj, obj2) + self.assertIsNot(obj, obj2) + def test_put_get_same_interpreter(self): interp = interpreters.create() interp.exec_sync(dedent(""" diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index 7f2e9fbf2d199b..715bb766cac624 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -400,10 +400,11 @@ typedef struct _queue { _queueitem *first; _queueitem *last; } items; + int fmt; } _queue; static int -_queue_init(_queue *queue, Py_ssize_t maxsize) +_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { @@ -415,6 +416,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize) .items = { .maxsize = maxsize, }, + .fmt = fmt, }; return 0; } @@ -851,18 +853,26 @@ _queues_decref(_queues *queues, int64_t qid) PyThread_release_lock(queues->mutex); } -static int64_t * +struct queue_id_and_fmt { + int64_t id; + int fmt; +}; + +static struct queue_id_and_fmt * _queues_list_all(_queues *queues, int64_t *count) { - int64_t *qids = NULL; + struct queue_id_and_fmt *qids = NULL; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); - int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(queues->count)); + struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt, + (Py_ssize_t)(queues->count)); if (ids == NULL) { goto done; } _queueref *ref = queues->head; for (int64_t i=0; ref != NULL; ref = ref->next, i++) { - ids[i] = ref->qid; + ids[i].id = ref->qid; + assert(ref->queue != NULL); + ids[i].fmt = ref->queue->fmt; } *count = queues->count; @@ -898,13 +908,13 @@ _queue_free(_queue *queue) // Create a new queue. static int64_t -queue_create(_queues *queues, Py_ssize_t maxsize) +queue_create(_queues *queues, Py_ssize_t maxsize, int fmt) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } - int err = _queue_init(queue, maxsize); + int err = _queue_init(queue, maxsize, fmt); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; @@ -1275,14 +1285,15 @@ qidarg_converter(PyObject *arg, void *ptr) static PyObject * queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"maxsize", NULL}; - Py_ssize_t maxsize = -1; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n:create", kwlist, - &maxsize)) { + static char *kwlist[] = {"maxsize", "fmt", NULL}; + Py_ssize_t maxsize; + int fmt; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist, + &maxsize, &fmt)) { return NULL; } - int64_t qid = queue_create(&_globals.queues, maxsize); + int64_t qid = queue_create(&_globals.queues, maxsize, fmt); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; @@ -1337,7 +1348,7 @@ static PyObject * queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; - int64_t *qids = _queues_list_all(&_globals.queues, &count); + struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count); if (qids == NULL) { if (count == 0) { return PyList_New(0); @@ -1348,14 +1359,14 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) if (ids == NULL) { goto finally; } - int64_t *cur = qids; + struct queue_id_and_fmt *cur = qids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *qidobj = PyLong_FromLongLong(*cur); - if (qidobj == NULL) { + PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt); + if (item == NULL) { Py_SETREF(ids, NULL); break; } - PyList_SET_ITEM(ids, (Py_ssize_t)i, qidobj); + PyList_SET_ITEM(ids, (Py_ssize_t)i, item); } finally: @@ -1512,6 +1523,33 @@ PyDoc_STRVAR(queuesmod_get_maxsize_doc, \n\ Return the maximum number of items in the queue."); +static PyObject * +queuesmod_get_default_fmt(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"qid", NULL}; + qidarg_converter_data qidarg; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&:get_default_fmt", kwlist, + qidarg_converter, &qidarg)) { + return NULL; + } + int64_t qid = qidarg.id; + + _queue *queue = NULL; + int err = _queues_lookup(&_globals.queues, qid, &queue); + if (handle_queue_error(err, self, qid)) { + return NULL; + } + int fmt = queue->fmt; + _queue_unmark_waiter(queue, _globals.queues.mutex); + return PyLong_FromLong(fmt); +} + +PyDoc_STRVAR(queuesmod_get_default_fmt_doc, +"get_default_fmt(qid)\n\ +\n\ +Return the default format to use for the queue."); + static PyObject * queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds) { @@ -1606,6 +1644,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc}, {"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize), METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc}, + {"get_default_fmt", _PyCFunction_CAST(queuesmod_get_default_fmt), + METH_VARARGS | METH_KEYWORDS, queuesmod_get_default_fmt_doc}, {"is_full", _PyCFunction_CAST(queuesmod_is_full), METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc}, {"get_count", _PyCFunction_CAST(queuesmod_get_count), From 52cfcfb2217f7b2138ef59cb9f1a8e748f50ae06 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Feb 2024 15:55:34 -0700 Subject: [PATCH 12/19] Interpreter.exec_sync() -> exec(). --- Lib/test/support/interpreters/__init__.py | 4 +- Lib/test/test_interpreters/test_api.py | 46 ++++++++++---------- Lib/test/test_interpreters/test_channels.py | 4 +- Lib/test/test_interpreters/test_lifecycle.py | 2 +- Lib/test/test_interpreters/test_queues.py | 4 +- Lib/test/test_interpreters/utils.py | 4 +- 6 files changed, 32 insertions(+), 32 deletions(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index 2134211a255bf5..c95b6f6f770330 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -158,7 +158,7 @@ def prepare_main(self, ns=None, /, **kwargs): ns = dict(ns, **kwargs) if ns is not None else kwargs _interpreters.set___main___attrs(self._id, ns) - def exec_sync(self, code, /): + def exec(self, code, /): """Run the given source code in the interpreter. This is essentially the same as calling the builtin "exec" @@ -182,7 +182,7 @@ def exec_sync(self, code, /): def run(self, code, /): def task(): - self.exec_sync(code) + self.exec(code) t = threading.Thread(target=task) t.start() return t diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index aefd326977095f..dbf941914a5fb7 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -280,7 +280,7 @@ def test_subinterpreter(self): def test_finished(self): r, w = self.pipe() interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os os.write({w}, b'x') """) @@ -312,7 +312,7 @@ def test_with_only_background_threads(self): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading @@ -326,7 +326,7 @@ def task(): self.assertFalse(interp.is_running()) os.write(w_thread, DONE) - interp.exec_sync('t.join()') + interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), FINISHED) @@ -393,7 +393,7 @@ def test_from_sibling(self): interp2 = interpreters.create() self.assertEqual(set(interpreters.list_all()), {main, interp1, interp2}) - interp1.exec_sync(dedent(f""" + interp1.exec(dedent(f""" from test.support import interpreters interp2 = interpreters.Interpreter({interp2.id}) interp2.close() @@ -427,7 +427,7 @@ def test_subthreads_still_running(self): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading import time @@ -504,9 +504,9 @@ def test_not_shareable(self): # Make sure neither was actually bound. with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('print(foo)') + interp.exec('print(foo)') with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('print(spam)') + interp.exec('print(spam)') class TestInterpreterExecSync(TestBase): @@ -515,7 +515,7 @@ def test_success(self): interp = interpreters.create() script, file = _captured_script('print("it worked!", end="")') with file: - interp.exec_sync(script) + interp.exec(script) out = file.read() self.assertEqual(out, 'it worked!') @@ -523,7 +523,7 @@ def test_success(self): def test_failure(self): interp = interpreters.create() with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('raise Exception') + interp.exec('raise Exception') def test_display_preserved_exception(self): tempdir = self.temp_dir() @@ -542,18 +542,18 @@ def script(): spam.eggs() interp = interpreters.create() - interp.exec_sync(script) + interp.exec(script) """) stdout, stderr = self.assert_python_failure(scriptfile) self.maxDiff = None - interpmod_line, = (l for l in stderr.splitlines() if ' exec_sync' in l) - # File "{interpreters.__file__}", line 179, in exec_sync + interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l) + # File "{interpreters.__file__}", line 179, in exec self.assertEqual(stderr, dedent(f"""\ Traceback (most recent call last): File "{scriptfile}", line 9, in - interp.exec_sync(script) - ~~~~~~~~~~~~~~~~^^^^^^^^ + interp.exec(script) + ~~~~~~~~~~~^^^^^^^^ {interpmod_line.strip()} raise ExecFailure(excinfo) test.support.interpreters.ExecFailure: RuntimeError: uh-oh! @@ -578,7 +578,7 @@ def test_in_thread(self): script, file = _captured_script('print("it worked!", end="")') with file: def f(): - interp.exec_sync(script) + interp.exec(script) t = threading.Thread(target=f) t.start() @@ -604,7 +604,7 @@ def test_fork(self): with open('{file.name}', 'w', encoding='utf-8') as out: out.write('{expected}') """) - interp.exec_sync(script) + interp.exec(script) file.seek(0) content = file.read() @@ -615,17 +615,17 @@ def test_already_running(self): interp = interpreters.create() with _running(interp): with self.assertRaises(RuntimeError): - interp.exec_sync('print("spam")') + interp.exec('print("spam")') def test_bad_script(self): interp = interpreters.create() with self.assertRaises(TypeError): - interp.exec_sync(10) + interp.exec(10) def test_bytes_for_script(self): interp = interpreters.create() with self.assertRaises(TypeError): - interp.exec_sync(b'print("spam")') + interp.exec(b'print("spam")') def test_with_background_threads_still_running(self): r_interp, w_interp = self.pipe() @@ -636,7 +636,7 @@ def test_with_background_threads_still_running(self): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading @@ -648,18 +648,18 @@ def task(): t.start() os.write({w_interp}, {RAN!r}) """) - interp.exec_sync(f"""if True: + interp.exec(f"""if True: os.write({w_interp}, {RAN!r}) """) os.write(w_thread, DONE) - interp.exec_sync('t.join()') + interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), FINISHED) # test_xxsubinterpreters covers the remaining - # Interpreter.exec_sync() behavior. + # Interpreter.exec() behavior. class TestInterpreterRun(TestBase): diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py index 3c3e18832d4168..07e503837bcf75 100644 --- a/Lib/test/test_interpreters/test_channels.py +++ b/Lib/test/test_interpreters/test_channels.py @@ -120,7 +120,7 @@ def test_send_recv_main(self): def test_send_recv_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import channels r, s = channels.create() orig = b'spam' @@ -193,7 +193,7 @@ def test_send_recv_nowait_main_with_default(self): def test_send_recv_nowait_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import channels r, s = channels.create() orig = b'spam' diff --git a/Lib/test/test_interpreters/test_lifecycle.py b/Lib/test/test_interpreters/test_lifecycle.py index c2917d839904f9..67b6f439c3191f 100644 --- a/Lib/test/test_interpreters/test_lifecycle.py +++ b/Lib/test/test_interpreters/test_lifecycle.py @@ -124,7 +124,7 @@ def test_sys_path_0(self): orig = sys.path[0] interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import json import sys print(json.dumps({{ diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index ea4a4da0f41c50..4a8ea7d4de72a5 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -51,7 +51,7 @@ def test_shareable(self): queue1 = queues.create() interp = interpreters.create() - interp.exec_sync(dedent(f""" + interp.exec(dedent(f""" from test.support.interpreters import queues queue1 = queues.Queue({queue1.id}) """)); @@ -281,7 +281,7 @@ def test_put_get_default_not_sharedonly(self): def test_put_get_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import queues queue = queues.create() orig = b'spam' diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py index 3a37ed09dd8943..5a628b71a45756 100644 --- a/Lib/test/test_interpreters/utils.py +++ b/Lib/test/test_interpreters/utils.py @@ -41,7 +41,7 @@ def _run_output(interp, request, init=None): with rpipe: if init: interp.prepare_main(init) - interp.exec_sync(script) + interp.exec(script) return rpipe.read() @@ -49,7 +49,7 @@ def _run_output(interp, request, init=None): def _running(interp): r, w = os.pipe() def run(): - interp.exec_sync(dedent(f""" + interp.exec(dedent(f""" # wait for "signal" with open({r}) as rpipe: rpipe.read() From 3bd01f2cf75636d6d0e19796b76c24458d06f679 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 14 Feb 2024 11:04:32 -0700 Subject: [PATCH 13/19] Add Interpreter.call(). --- Lib/test/support/interpreters/__init__.py | 55 ++++- Lib/test/test_interpreters/test_api.py | 243 +++++++++++++++++++--- 2 files changed, 270 insertions(+), 28 deletions(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index c95b6f6f770330..ba0c7f353024dd 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -180,9 +180,60 @@ def exec(self, code, /): if excinfo is not None: raise ExecFailure(excinfo) - def run(self, code, /): + def call(self, callable, /, args=None, kwargs=None): + """Call the object in the interpreter with given args/kwargs. + + Return the function's return value. If it raises an exception, + raise it in the calling interpreter. This contrasts with + Interpreter.exec(), which discards the return value and only + propagates the exception as ExecFailure. + + Unlike Interpreter.exec() and prepare_main(), all objects are + supported, at the expense of some performance. + """ + pickled_callable = pickle.dumps(callable) + pickled_args = pickle.dumps(args) + pickled_kwargs = pickle.dumps(kwargs) + + results = create_queue(sharedonly=False) + self.prepare_main(_call_results=results) + self.exec(f""" + def _call_impl(): + try: + import pickle + callable = pickle.loads({pickled_callable!r}) + if {pickled_args!r} is None: + args = () + else: + args = pickle.loads({pickled_args!r}) + if {pickled_kwargs!r} is None: + kwargs = {} + else: + kwargs = pickle.loads({pickled_kwargs!r}) + + res = callable(*args, **kwargs) + except Exception as exc: + res = pickle.dumps((None, exc)) + else: + res = pickle.dumps((res, None)) + _call_results.put(res) + _call_impl() + del _call_impl + del _call_results + """) + res, exc = results.get() + if exc is None: + raise exc + else: + return res + + def call_in_thread(self, callable, /, args=None, kwargs=None): + """Return a new thread that calls the object in the interpreter. + + The return value and any raised exception are discarded. + """ def task(): - self.exec(code) + self.call(callable, args, kwargs) t = threading.Thread(target=task) t.start() return t diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index dbf941914a5fb7..9f0f535407587c 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -509,7 +509,7 @@ def test_not_shareable(self): interp.exec('print(spam)') -class TestInterpreterExecSync(TestBase): +class TestInterpreterExec(TestBase): def test_success(self): interp = interpreters.create() @@ -662,32 +662,223 @@ def task(): # Interpreter.exec() behavior. -class TestInterpreterRun(TestBase): - - def test_success(self): - interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: - t = interp.run(script) - t.join() - out = file.read() - - self.assertEqual(out, 'it worked!') - - def test_failure(self): - caught = False - def excepthook(args): - nonlocal caught - caught = True - threading.excepthook = excepthook - try: - interp = interpreters.create() - t = interp.run('raise Exception') - t.join() +def call_func_noop(): + pass + + +def call_func_return_shareable(): + return (1, None) + + +def call_func_return_not_shareable(): + return [1, 2, 3] + + +def call_func_failure(): + raise Exception('spam!') + + +def call_func_ident(value): + return value + + +def get_call_func_closure(value): + def call_func_closure(): + return value + return call_func_closure + + +class Spam: + + @staticmethod + def noop(): + pass + + @classmethod + def from_values(cls, *values): + return cls(values) + + def __init__(self, value): + self.value = value + + def __call__(self, *args, **kwargs): + return (self.value, args, kwargs) + + def __eq__(self, other): + if not isinstance(other, Spam): + return NotImplemented + return self.value == other.value + + def run(self, *args, **kwargs): + return (self.value, args, kwargs) + + +def call_func_complex(op, /, value=None, *args, exc=None, **kwargs): + if exc is not None: + raise exc + if op == '': + raise ValueError('missing op') + elif op == 'ident': + if args or kwargs: + raise Exception((args, kwargs)) + return value + elif op == 'full-ident': + return (value, args, kwargs) + elif op == 'globals': + if value is not None or args or kwargs: + raise Exception((value, args, kwargs)) + return __name__ + elif op == 'interpid': + if value is not None or args or kwargs: + raise Exception((value, args, kwargs)) + return interpreters.get_current().id + elif op == 'closure': + if args or kwargs: + raise Exception((args, kwargs)) + return get_call_func_closure(value) + elif op == 'custom': + if args or kwargs: + raise Exception((args, kwargs)) + return Spam(value) + elif op == 'custom-inner': + if args or kwargs: + raise Exception((args, kwargs)) + class Eggs(Spam): + pass + return Eggs(value) + else if not isinstance(op, str): + raise TypeError(op) + else: + raise NotImplementedError(op) + + +class TestInterpreterCall(TestBase): + + # signature + # - blank + # - args + # - kwargs + # - args, kwargs + # return + # - nothing (None) + # - simple + # - closure + # - custom + # ops: + # - do nothing + # - fail + # - echo + # - do complex, relative to interpreter + # scope + # - global func + # - local closure + # - returned closure + # - callable type instance + # - type + # - classmethod + # - staticmethod + # - instance method + # exception + # - builtin + # - custom + # - preserves info (e.g. SyntaxError) + # - matching error display + + def test_call(self): + interp = interpreters.create() + + for i, ((callable, args, kwargs), expected) in enumerate([ + ((call_func_noop, (), {}), + None), + ((call_func_return_shareable, (), {}), + (1, None)), + ((call_func_return_not_shareable, (), {}), + [1, 2, 3]), + ((call_func_ident, ('spamspamspam',), {}), + 'spamspamspam'), + ((get_call_func_closure, (42,), {}), + ...), + ((get_call_func_closure(42), (), {}), + 42), + ((Spam.noop, (), {}), + None), + ((Spam.from_values, (), {}), + None), + ((Spam.from_values, (1, 2, 3), {}), + Spam((1, 2, 3)), + ((Spam, ('???'), {}), + Spam('???')), + ((Spam(101), (), {}), + 101), + ((Spam(10101).run, (), {}), + 10101), + ((call_func_complex, ('ident', 'spam'), {}), + 'spam'), + ((call_func_complex, ('full-ident', 'spam'), {}), + ('spam', (), {})), + ((call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), + ('spam', ('ham',), {'eggs': '!!!'})), + ((call_func_complex, ('globals',), {}), + 'test.test_interpreters.test_api'), + ((call_func_complex, ('interpid',), {}), + interp.id), + ((call_func_complex, ('closure',), {'value': '~~~'}), + '~~~'), + ((call_func_complex, ('custom', 'spam!'), {}), + Spam('spam!')), + ((call_func_complex, ('custom-inner', 'eggs!'), {}), + ...), + ]): + with self.subTest(f'success case #{i+1}'): + res = interp.call(callable, args, kwargs) + self.assertEqual(res, expected) + + for i, ((callable, args, kwargs), expected) in enumerate([ + ((call_func_failure, (), {}), + Exception), + ((call_func_complex, ('???',), {exc=ValueError('spam')}), + ValueError), + ]): + with self.subTest(f'failure case #{i+1}'): + with self.assertRaises(expected): + interp.call(callable, args, kwargs) + + def test_call_in_thread(self): + interp = interpreters.create() + + for i, (callable, args, kwargs) in enumerate([ + (call_func_noop, (), {}), + (call_func_return_shareable, (), {}), + (call_func_return_not_shareable, (), {}), + (call_func_ident, ('spamspamspam',), {}), + (get_call_func_closure, (42,), {}), + (get_call_func_closure(42), (), {}), + (Spam.noop, (), {}), + (Spam.from_values, (), {}), + (Spam.from_values, (1, 2, 3), {}), + (Spam, ('???'), {}), + (Spam(101), (), {}), + (Spam(10101).run, (), {}), + (call_func_complex, ('ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), + (call_func_complex, ('globals',), {}), + (call_func_complex, ('interpid',), {}), + (call_func_complex, ('closure',), {'value': '~~~'}), + (call_func_complex, ('custom', 'spam!'), {}), + (call_func_complex, ('custom-inner', 'eggs!'), {}), + ]): + with self.subTest(f'success case #{i+1}'): + t = interp.call_in_thread(callable, args, kwargs) + t.join() - self.assertTrue(caught) - except BaseException: - threading.excepthook = threading.__excepthook__ + for i, (callable, args, kwargs) in enumerate([ + (call_func_failure, (), {}), + (call_func_complex, ('???',), {exc=ValueError('spam')}), + ]): + with self.subTest(f'failure case #{i+1}'): + t = interp.call_in_thread(callable, args, kwargs) + t.join() class TestIsShareable(TestBase): From ca1dadf91b5fed2cd4016ea031e6b87e84a3da09 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 14 Feb 2024 17:35:00 -0700 Subject: [PATCH 14/19] Simplify Interpreter.call() for now. --- Lib/test/support/interpreters/__init__.py | 74 +++++-------- Lib/test/test_interpreters/test_api.py | 122 ++++++++++------------ Lib/test/test_interpreters/utils.py | 15 ++- Modules/_xxsubinterpretersmodule.c | 52 +++++++++ 4 files changed, 151 insertions(+), 112 deletions(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index ba0c7f353024dd..33de07834a0b5f 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -14,7 +14,8 @@ __all__ = [ 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', 'Interpreter', - 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', + 'InterpreterError', 'InterpreterNotFoundError', + 'ExecFailure', 'CallFailure', 'NotShareableError', 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', ] @@ -43,7 +44,7 @@ def __getattr__(name): {formatted} """.strip() -class ExecFailure(RuntimeError): +class _ExecFailure(RuntimeError): def __init__(self, excinfo): msg = excinfo.formatted @@ -67,6 +68,14 @@ def __str__(self): ) +class ExecFailure(_ExecFailure): + """Raised from Interpreter.exec() for unhandled exceptions.""" + + +class CallFailure(_ExecFailure): + """Raised from Interpreter.call() for unhandled exceptions.""" + + def create(): """Return a new (idle) Python interpreter.""" id = _interpreters.create(isolated=True) @@ -180,60 +189,33 @@ def exec(self, code, /): if excinfo is not None: raise ExecFailure(excinfo) - def call(self, callable, /, args=None, kwargs=None): + def call(self, callable, /): """Call the object in the interpreter with given args/kwargs. - Return the function's return value. If it raises an exception, - raise it in the calling interpreter. This contrasts with - Interpreter.exec(), which discards the return value and only - propagates the exception as ExecFailure. + Only functions that take no arguments and have no closure + are supported. - Unlike Interpreter.exec() and prepare_main(), all objects are - supported, at the expense of some performance. + The return value is discarded. + + If the callable raises an exception then the error display + (including full traceback) is send back between the interpreters + and a CallFailedError is raised, much like what happens with + Interpreter.exec(). """ - pickled_callable = pickle.dumps(callable) - pickled_args = pickle.dumps(args) - pickled_kwargs = pickle.dumps(kwargs) - - results = create_queue(sharedonly=False) - self.prepare_main(_call_results=results) - self.exec(f""" - def _call_impl(): - try: - import pickle - callable = pickle.loads({pickled_callable!r}) - if {pickled_args!r} is None: - args = () - else: - args = pickle.loads({pickled_args!r}) - if {pickled_kwargs!r} is None: - kwargs = {} - else: - kwargs = pickle.loads({pickled_kwargs!r}) - - res = callable(*args, **kwargs) - except Exception as exc: - res = pickle.dumps((None, exc)) - else: - res = pickle.dumps((res, None)) - _call_results.put(res) - _call_impl() - del _call_impl - del _call_results - """) - res, exc = results.get() - if exc is None: - raise exc - else: - return res + # XXX Support args and kwargs. + # XXX Support arbitrary callables. + # XXX Support returning the return value (e.g. via pickle). + excinfo = _interpreters.call(self._id, callable) + if excinfo is not None: + raise CallFailure(excinfo) - def call_in_thread(self, callable, /, args=None, kwargs=None): + def call_in_thread(self, callable, /): """Return a new thread that calls the object in the interpreter. The return value and any raised exception are discarded. """ def task(): - self.call(callable, args, kwargs) + self.call(callable) t = threading.Thread(target=task) t.start() return t diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index 9f0f535407587c..cad37512d55ac0 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -746,7 +746,7 @@ def call_func_complex(op, /, value=None, *args, exc=None, **kwargs): class Eggs(Spam): pass return Eggs(value) - else if not isinstance(op, str): + elif not isinstance(op, str): raise TypeError(op) else: raise NotImplementedError(op) @@ -787,61 +787,43 @@ class TestInterpreterCall(TestBase): def test_call(self): interp = interpreters.create() - for i, ((callable, args, kwargs), expected) in enumerate([ - ((call_func_noop, (), {}), - None), - ((call_func_return_shareable, (), {}), - (1, None)), - ((call_func_return_not_shareable, (), {}), - [1, 2, 3]), - ((call_func_ident, ('spamspamspam',), {}), - 'spamspamspam'), - ((get_call_func_closure, (42,), {}), - ...), - ((get_call_func_closure(42), (), {}), - 42), - ((Spam.noop, (), {}), - None), - ((Spam.from_values, (), {}), - None), - ((Spam.from_values, (1, 2, 3), {}), - Spam((1, 2, 3)), - ((Spam, ('???'), {}), - Spam('???')), - ((Spam(101), (), {}), - 101), - ((Spam(10101).run, (), {}), - 10101), - ((call_func_complex, ('ident', 'spam'), {}), - 'spam'), - ((call_func_complex, ('full-ident', 'spam'), {}), - ('spam', (), {})), - ((call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), - ('spam', ('ham',), {'eggs': '!!!'})), - ((call_func_complex, ('globals',), {}), - 'test.test_interpreters.test_api'), - ((call_func_complex, ('interpid',), {}), - interp.id), - ((call_func_complex, ('closure',), {'value': '~~~'}), - '~~~'), - ((call_func_complex, ('custom', 'spam!'), {}), - Spam('spam!')), - ((call_func_complex, ('custom-inner', 'eggs!'), {}), - ...), + for i, (callable, args, kwargs) in enumerate([ + (call_func_noop, (), {}), + (call_func_return_shareable, (), {}), + (call_func_return_not_shareable, (), {}), + (Spam.noop, (), {}), ]): with self.subTest(f'success case #{i+1}'): - res = interp.call(callable, args, kwargs) - self.assertEqual(res, expected) - - for i, ((callable, args, kwargs), expected) in enumerate([ - ((call_func_failure, (), {}), - Exception), - ((call_func_complex, ('???',), {exc=ValueError('spam')}), - ValueError), + res = interp.call(callable) + self.assertIs(res, None) + + for i, (callable, args, kwargs) in enumerate([ + (call_func_ident, ('spamspamspam',), {}), + (get_call_func_closure, (42,), {}), + (get_call_func_closure(42), (), {}), + (Spam.from_values, (), {}), + (Spam.from_values, (1, 2, 3), {}), + (Spam, ('???'), {}), + (Spam(101), (), {}), + (Spam(10101).run, (), {}), + (call_func_complex, ('ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), + (call_func_complex, ('globals',), {}), + (call_func_complex, ('interpid',), {}), + (call_func_complex, ('closure',), {'value': '~~~'}), + (call_func_complex, ('custom', 'spam!'), {}), + (call_func_complex, ('custom-inner', 'eggs!'), {}), + (call_func_complex, ('???',), {'exc': ValueError('spam')}), ]): - with self.subTest(f'failure case #{i+1}'): - with self.assertRaises(expected): - interp.call(callable, args, kwargs) + with self.subTest(f'invalid case #{i+1}'): + with self.assertRaises(Exception): + if args or kwargs: + raise Exception((args, kwargs)) + interp.call(callable) + + with self.assertRaises(interpreters.CallFailure): + interp.call(call_func_failure) def test_call_in_thread(self): interp = interpreters.create() @@ -850,10 +832,18 @@ def test_call_in_thread(self): (call_func_noop, (), {}), (call_func_return_shareable, (), {}), (call_func_return_not_shareable, (), {}), + (Spam.noop, (), {}), + ]): + with self.subTest(f'success case #{i+1}'): + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(callable) + t.join() + self.assertIsNone(ctx.caught) + + for i, (callable, args, kwargs) in enumerate([ (call_func_ident, ('spamspamspam',), {}), (get_call_func_closure, (42,), {}), (get_call_func_closure(42), (), {}), - (Spam.noop, (), {}), (Spam.from_values, (), {}), (Spam.from_values, (1, 2, 3), {}), (Spam, ('???'), {}), @@ -867,18 +857,20 @@ def test_call_in_thread(self): (call_func_complex, ('closure',), {'value': '~~~'}), (call_func_complex, ('custom', 'spam!'), {}), (call_func_complex, ('custom-inner', 'eggs!'), {}), + (call_func_complex, ('???',), {'exc': ValueError('spam')}), ]): - with self.subTest(f'success case #{i+1}'): - t = interp.call_in_thread(callable, args, kwargs) - t.join() - - for i, (callable, args, kwargs) in enumerate([ - (call_func_failure, (), {}), - (call_func_complex, ('???',), {exc=ValueError('spam')}), - ]): - with self.subTest(f'failure case #{i+1}'): - t = interp.call_in_thread(callable, args, kwargs) - t.join() + with self.subTest(f'invalid case #{i+1}'): + if args or kwargs: + continue + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(callable) + t.join() + self.assertIsNotNone(ctx.caught) + + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(call_func_failure) + t.join() + self.assertIsNotNone(ctx.caught) class TestIsShareable(TestBase): diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py index 5a628b71a45756..973d05d4f96dcb 100644 --- a/Lib/test/test_interpreters/utils.py +++ b/Lib/test/test_interpreters/utils.py @@ -4,8 +4,9 @@ import subprocess import sys import tempfile -import threading from textwrap import dedent +import threading +import types import unittest from test import support @@ -84,6 +85,18 @@ def temp_dir(self): self.addCleanup(lambda: os_helper.rmtree(tempdir)) return tempdir + @contextlib.contextmanager + def captured_thread_exception(self): + ctx = types.SimpleNamespace(caught=None) + def excepthook(args): + ctx.caught = args + orig_excepthook = threading.excepthook + threading.excepthook = excepthook + try: + yield ctx + finally: + threading.excepthook = orig_excepthook + def make_script(self, filename, dirname=None, text=None): if text: text = dedent(text) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index ade37d7cc9421c..28c2f9c08bc0da 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -902,6 +902,56 @@ The code/function must not take any arguments or be a closure\n\ If a function is provided, its code object is used and all its state\n\ is ignored, including its __globals__ dict."); +static PyObject * +interp_call(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"id", "callable", "args", "kwargs", NULL}; + PyObject *id, *callable; + PyObject *args_obj = NULL; + PyObject *kwargs_obj = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "OO|OO:" MODULE_NAME_STR ".call", kwlist, + &id, &callable, &args_obj, &kwargs_obj)) { + return NULL; + } + + if (args_obj != NULL) { + PyErr_SetString(PyExc_ValueError, "got unexpected args"); + return NULL; + } + if (kwargs_obj != NULL) { + PyErr_SetString(PyExc_ValueError, "got unexpected kwargs"); + return NULL; + } + + PyObject *code = (PyObject *)convert_code_arg(callable, MODULE_NAME_STR ".call", + "argument 2", "a function"); + if (code == NULL) { + return NULL; + } + + PyObject *excinfo = NULL; + int res = _interp_exec(self, id, code, NULL, &excinfo); + Py_DECREF(code); + if (res < 0) { + assert((excinfo == NULL) != (PyErr_Occurred() == NULL)); + return excinfo; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(call_doc, +"call(id, callable, args=None, kwargs=None)\n\ +\n\ +Call the provided object in the identified interpreter.\n\ +Pass the given args and kwargs, if possible.\n\ +\n\ +\"callable\" may be a plain function with no free vars that takes\n\ +no arguments.\n\ +\n\ +The function's code object is used and all its state\n\ +is ignored, including its __globals__ dict."); + static PyObject * interp_run_string(PyObject *self, PyObject *args, PyObject *kwds) { @@ -1085,6 +1135,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, is_running_doc}, {"exec", _PyCFunction_CAST(interp_exec), METH_VARARGS | METH_KEYWORDS, exec_doc}, + {"call", _PyCFunction_CAST(interp_call), + METH_VARARGS | METH_KEYWORDS, call_doc}, {"run_string", _PyCFunction_CAST(interp_run_string), METH_VARARGS | METH_KEYWORDS, run_string_doc}, {"run_func", _PyCFunction_CAST(interp_run_func), From ba6bc8521c9048617be609fdfb1bb6502842b48a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 15 Feb 2024 17:45:51 -0700 Subject: [PATCH 15/19] ExecFailure -> ExecutionFailed --- Lib/test/support/interpreters/__init__.py | 33 ++++++++++------------- Lib/test/test_interpreters/test_api.py | 12 ++++----- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index 33de07834a0b5f..d02ffbae1113c0 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -14,8 +14,7 @@ __all__ = [ 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', 'Interpreter', - 'InterpreterError', 'InterpreterNotFoundError', - 'ExecFailure', 'CallFailure', + 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed', 'NotShareableError', 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', ] @@ -44,7 +43,11 @@ def __getattr__(name): {formatted} """.strip() -class _ExecFailure(RuntimeError): +class ExecutionFailed(RuntimeError): + """An unhandled exception happened during execution. + + This is raised from Interpreter.exec() and Interpreter.call(). + """ def __init__(self, excinfo): msg = excinfo.formatted @@ -68,14 +71,6 @@ def __str__(self): ) -class ExecFailure(_ExecFailure): - """Raised from Interpreter.exec() for unhandled exceptions.""" - - -class CallFailure(_ExecFailure): - """Raised from Interpreter.call() for unhandled exceptions.""" - - def create(): """Return a new (idle) Python interpreter.""" id = _interpreters.create(isolated=True) @@ -176,10 +171,10 @@ def exec(self, code, /): There is no return value. - If the code raises an unhandled exception then an ExecFailure - is raised, which summarizes the unhandled exception. The actual - exception is discarded because objects cannot be shared between - interpreters. + If the code raises an unhandled exception then an ExecutionFailed + exception is raised, which summarizes the unhandled exception. + The actual exception is discarded because objects cannot be + shared between interpreters. This blocks the current Python thread until done. During that time, the previous interpreter is allowed to run @@ -187,7 +182,7 @@ def exec(self, code, /): """ excinfo = _interpreters.exec(self._id, code) if excinfo is not None: - raise ExecFailure(excinfo) + raise ExecutionFailed(excinfo) def call(self, callable, /): """Call the object in the interpreter with given args/kwargs. @@ -199,15 +194,15 @@ def call(self, callable, /): If the callable raises an exception then the error display (including full traceback) is send back between the interpreters - and a CallFailedError is raised, much like what happens with - Interpreter.exec(). + and an ExecutionFailed exception is raised, much like what + happens with Interpreter.exec(). """ # XXX Support args and kwargs. # XXX Support arbitrary callables. # XXX Support returning the return value (e.g. via pickle). excinfo = _interpreters.call(self._id, callable) if excinfo is not None: - raise CallFailure(excinfo) + raise ExecutionFailed(excinfo) def call_in_thread(self, callable, /): """Return a new thread that calls the object in the interpreter. diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index cad37512d55ac0..363143fa810f35 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -503,9 +503,9 @@ def test_not_shareable(self): interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'}) # Make sure neither was actually bound. - with self.assertRaises(interpreters.ExecFailure): + with self.assertRaises(interpreters.ExecutionFailed): interp.exec('print(foo)') - with self.assertRaises(interpreters.ExecFailure): + with self.assertRaises(interpreters.ExecutionFailed): interp.exec('print(spam)') @@ -522,7 +522,7 @@ def test_success(self): def test_failure(self): interp = interpreters.create() - with self.assertRaises(interpreters.ExecFailure): + with self.assertRaises(interpreters.ExecutionFailed): interp.exec('raise Exception') def test_display_preserved_exception(self): @@ -555,8 +555,8 @@ def script(): interp.exec(script) ~~~~~~~~~~~^^^^^^^^ {interpmod_line.strip()} - raise ExecFailure(excinfo) - test.support.interpreters.ExecFailure: RuntimeError: uh-oh! + raise ExecutionFailed(excinfo) + test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh! Uncaught in the interpreter: @@ -822,7 +822,7 @@ def test_call(self): raise Exception((args, kwargs)) interp.call(callable) - with self.assertRaises(interpreters.CallFailure): + with self.assertRaises(interpreters.ExecutionFailed): interp.call(call_func_failure) def test_call_in_thread(self): From 8c25b2ff21be0868e4c0993259728bca74db7322 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 15 Feb 2024 18:32:57 -0700 Subject: [PATCH 16/19] Drop an errant line. --- Lib/test/support/interpreters/queues.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index c746378fdb13df..266fbee05ec8e8 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -153,7 +153,6 @@ def put_nowait(self, obj, *, sharedonly=None): fmt = self._fmt else: fmt = _SHARED_ONLY if sharedonly else _PICKLED - fmt = _SHARED_ONLY if sharedonly else _PICKLED if fmt is _PICKLED: obj = pickle.dumps(obj) try: From ebf0a5a34595158ac8032029188a9fd5f2ac1550 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 15 Feb 2024 18:32:06 -0700 Subject: [PATCH 17/19] sharedonly -> strictequiv --- Lib/test/support/interpreters/queues.py | 46 +++++++++++----- Lib/test/test_interpreters/test_queues.py | 66 +++++++++++------------ 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 266fbee05ec8e8..1583be638e26aa 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -35,14 +35,15 @@ class QueueFull(_queues.QueueFull, queue.Full): _SHARED_ONLY = 0 _PICKLED = 1 -def create(maxsize=0, *, sharedonly=False): +def create(maxsize=0, *, strictequiv=False): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. - "sharedonly" sets the default for Queue.put() and Queue.put_nowait(). + "strictequiv" sets the default for Queue.put() + and Queue.put_nowait(). """ - fmt = _SHARED_ONLY if sharedonly else _PICKLED + fmt = _SHARED_ONLY if strictequiv else _PICKLED qid = _queues.create(maxsize, fmt) return Queue(qid, _fmt=fmt) @@ -114,22 +115,41 @@ def qsize(self): return _queues.get_count(self._id) def put(self, obj, timeout=None, *, - sharedonly=None, + strictequiv=None, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. This blocks while the queue is full. - If "sharedonly" is true then the object must be "shareable". - It will be passed through the queue efficiently. If false then - all objects are supported, at the expense of worse performance. - If None (the default) then it uses the queue's default. + If "strictequiv" is None (the default) then it uses the + queue's default, set with create_queue().. + + If "strictequiv" is false then all objects are supported, + at the expense of worse performance. + + If "strictequiv" is true then the corresponding object returned + from Queue.get() will be strictly equivalent to the given obj. + In other words, the two objects will be indistinguishable from + each other, even if the object is mutable. The received object + may actually be the same object, or a copy (immutable values + only), or a proxy. + + Regardless, the received object should be treated as though + the original has been shared directly, whether or not it + actually is. That’s a slightly different and stronger promise + than just equality. + + This stricter guarantee requires that the provided object + must be "shareable". Examples of "shareable" types include + the builtin singletons, str, and memoryview. An additional + benefit is that such objects will be passed through the queue + efficiently. """ - if sharedonly is None: + if strictequiv is None: fmt = self._fmt else: - fmt = _SHARED_ONLY if sharedonly else _PICKLED + fmt = _SHARED_ONLY if strictequiv else _PICKLED if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -148,11 +168,11 @@ def put(self, obj, timeout=None, *, else: break - def put_nowait(self, obj, *, sharedonly=None): - if sharedonly is None: + def put_nowait(self, obj, *, strictequiv=None): + if strictequiv is None: fmt = self._fmt else: - fmt = _SHARED_ONLY if sharedonly else _PICKLED + fmt = _SHARED_ONLY if strictequiv else _PICKLED if fmt is _PICKLED: obj = pickle.dumps(obj) try: diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 4a8ea7d4de72a5..c5c44991187733 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -58,13 +58,13 @@ def test_shareable(self): with self.subTest('same interpreter'): queue2 = queues.create() - queue1.put(queue2, sharedonly=True) + queue1.put(queue2, strictequiv=True) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): queue4 = queues.create() - queue1.put(queue4, sharedonly=True) + queue1.put(queue4, strictequiv=True) out = _run_output(interp, dedent(""" queue4 = queue1.get() print(queue4.id) @@ -75,7 +75,7 @@ def test_shareable(self): with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" queue5 = queues.create() - queue1.put(queue5, sharedonly=True) + queue1.put(queue5, strictequiv=True) print(queue5.id) """)) qid = int(out) @@ -118,7 +118,7 @@ class TestQueueOps(TestBase): def test_empty(self): queue = queues.create() before = queue.empty() - queue.put(None, sharedonly=True) + queue.put(None, strictequiv=True) during = queue.empty() queue.get() after = queue.empty() @@ -133,7 +133,7 @@ def test_full(self): queue = queues.create(3) for _ in range(3): actual.append(queue.full()) - queue.put(None, sharedonly=True) + queue.put(None, strictequiv=True) actual.append(queue.full()) for _ in range(3): queue.get() @@ -147,16 +147,16 @@ def test_qsize(self): queue = queues.create() for _ in range(3): actual.append(queue.qsize()) - queue.put(None, sharedonly=True) + queue.put(None, strictequiv=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) - queue.put(None, sharedonly=True) + queue.put(None, strictequiv=True) actual.append(queue.qsize()) for _ in range(3): queue.get() actual.append(queue.qsize()) - queue.put(None, sharedonly=True) + queue.put(None, strictequiv=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) @@ -165,9 +165,9 @@ def test_qsize(self): def test_put_get_main(self): expected = list(range(20)) - for sharedonly in (True, False): - kwds = dict(sharedonly=sharedonly) - with self.subTest(f'sharedonly={sharedonly}'): + for strictequiv in (True, False): + kwds = dict(strictequiv=strictequiv) + with self.subTest(f'strictequiv={strictequiv}'): queue = queues.create() for i in range(20): queue.put(i, **kwds) @@ -176,9 +176,9 @@ def test_put_get_main(self): self.assertEqual(actual, expected) def test_put_timeout(self): - for sharedonly in (True, False): - kwds = dict(sharedonly=sharedonly) - with self.subTest(f'sharedonly={sharedonly}'): + for strictequiv in (True, False): + kwds = dict(strictequiv=strictequiv) + with self.subTest(f'strictequiv={strictequiv}'): queue = queues.create(2) queue.put(None, **kwds) queue.put(None, **kwds) @@ -188,9 +188,9 @@ def test_put_timeout(self): queue.put(None, **kwds) def test_put_nowait(self): - for sharedonly in (True, False): - kwds = dict(sharedonly=sharedonly) - with self.subTest(f'sharedonly={sharedonly}'): + for strictequiv in (True, False): + kwds = dict(strictequiv=strictequiv) + with self.subTest(f'strictequiv={strictequiv}'): queue = queues.create(2) queue.put_nowait(None, **kwds) queue.put_nowait(None, **kwds) @@ -199,7 +199,7 @@ def test_put_nowait(self): queue.get() queue.put_nowait(None, **kwds) - def test_put_sharedonly(self): + def test_put_strictequiv(self): for obj in [ None, True, @@ -210,7 +210,7 @@ def test_put_sharedonly(self): ]: with self.subTest(repr(obj)): queue = queues.create() - queue.put(obj, sharedonly=True) + queue.put(obj, strictequiv=True) obj2 = queue.get() self.assertEqual(obj2, obj) @@ -221,9 +221,9 @@ def test_put_sharedonly(self): with self.subTest(repr(obj)): queue = queues.create() with self.assertRaises(interpreters.NotShareableError): - queue.put(obj, sharedonly=True) + queue.put(obj, strictequiv=True) - def test_put_not_sharedonly(self): + def test_put_not_strictequiv(self): for obj in [ None, True, @@ -237,7 +237,7 @@ def test_put_not_sharedonly(self): ]: with self.subTest(repr(obj)): queue = queues.create() - queue.put(obj, sharedonly=False) + queue.put(obj, strictequiv=False) obj2 = queue.get() self.assertEqual(obj2, obj) @@ -251,9 +251,9 @@ def test_get_nowait(self): with self.assertRaises(queues.QueueEmpty): queue.get_nowait() - def test_put_get_default_sharedonly(self): + def test_put_get_default_strictequiv(self): expected = list(range(20)) - queue = queues.create(sharedonly=True) + queue = queues.create(strictequiv=True) for i in range(20): queue.put(i) actual = [queue.get() for _ in range(20)] @@ -264,9 +264,9 @@ def test_put_get_default_sharedonly(self): with self.assertRaises(interpreters.NotShareableError): queue.put(obj) - def test_put_get_default_not_sharedonly(self): + def test_put_get_default_not_strictequiv(self): expected = list(range(20)) - queue = queues.create(sharedonly=False) + queue = queues.create(strictequiv=False) for i in range(20): queue.put(i) actual = [queue.get() for _ in range(20)] @@ -285,7 +285,7 @@ def test_put_get_same_interpreter(self): from test.support.interpreters import queues queue = queues.create() orig = b'spam' - queue.put(orig, sharedonly=True) + queue.put(orig, strictequiv=True) obj = queue.get() assert obj == orig, 'expected: obj == orig' assert obj is not orig, 'expected: obj is not orig' @@ -298,7 +298,7 @@ def test_put_get_different_interpreters(self): self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' - queue1.put(obj1, sharedonly=True) + queue1.put(obj1, strictequiv=True) out = _run_output( interp, @@ -315,7 +315,7 @@ def test_put_get_different_interpreters(self): obj2 = b'eggs' print(id(obj2)) assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0' - queue2.put(obj2, sharedonly=True) + queue2.put(obj2, strictequiv=True) assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1' """)) self.assertEqual(len(queues.list_all()), 2) @@ -337,8 +337,8 @@ def test_put_cleared_with_subinterpreter(self): queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' - queue.put(obj1, sharedonly=True) - queue.put(obj2, sharedonly=True) + queue.put(obj1, strictequiv=True) + queue.put(obj2, strictequiv=True) """)) self.assertEqual(queue.qsize(), 2) @@ -360,12 +360,12 @@ def f(): break except queues.QueueEmpty: continue - queue2.put(obj, sharedonly=True) + queue2.put(obj, strictequiv=True) t = threading.Thread(target=f) t.start() orig = b'spam' - queue1.put(orig, sharedonly=True) + queue1.put(orig, strictequiv=True) obj = queue2.get() t.join() From 38f0754050e5a04618a0cdfcdbde519d137cbd95 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 15 Feb 2024 18:35:51 -0700 Subject: [PATCH 18/19] strictequiv -> syncobj --- Lib/test/support/interpreters/queues.py | 51 +++++++++--------- Lib/test/test_interpreters/test_queues.py | 66 +++++++++++------------ 2 files changed, 58 insertions(+), 59 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 1583be638e26aa..2cc616be337a50 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -35,15 +35,15 @@ class QueueFull(_queues.QueueFull, queue.Full): _SHARED_ONLY = 0 _PICKLED = 1 -def create(maxsize=0, *, strictequiv=False): +def create(maxsize=0, *, syncobj=False): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. - "strictequiv" sets the default for Queue.put() + "syncobj" sets the default for Queue.put() and Queue.put_nowait(). """ - fmt = _SHARED_ONLY if strictequiv else _PICKLED + fmt = _SHARED_ONLY if syncobj else _PICKLED qid = _queues.create(maxsize, fmt) return Queue(qid, _fmt=fmt) @@ -115,41 +115,40 @@ def qsize(self): return _queues.get_count(self._id) def put(self, obj, timeout=None, *, - strictequiv=None, + syncobj=None, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. This blocks while the queue is full. - If "strictequiv" is None (the default) then it uses the + If "syncobj" is None (the default) then it uses the queue's default, set with create_queue().. - If "strictequiv" is false then all objects are supported, + If "syncobj" is false then all objects are supported, at the expense of worse performance. - If "strictequiv" is true then the corresponding object returned - from Queue.get() will be strictly equivalent to the given obj. - In other words, the two objects will be indistinguishable from - each other, even if the object is mutable. The received object - may actually be the same object, or a copy (immutable values - only), or a proxy. - + If "syncobj" is true then the object must be "shareable". + Examples of "shareable" objects include the builtin singletons, + str, and memoryview. One benefit is that such objects are + passed through the queue efficiently. + + The key difference, though, is conceptual: the corresponding + object returned from Queue.get() will be strictly equivalent + to the given obj. In other words, the two objects will be + effectively indistinguishable from each other, even if the + object is mutable. The received object may actually be the + same object, or a copy (immutable values only), or a proxy. Regardless, the received object should be treated as though the original has been shared directly, whether or not it - actually is. That’s a slightly different and stronger promise - than just equality. - - This stricter guarantee requires that the provided object - must be "shareable". Examples of "shareable" types include - the builtin singletons, str, and memoryview. An additional - benefit is that such objects will be passed through the queue - efficiently. + actually is. That's a slightly different and stronger promise + than just (initial) equality, which is all "syncobj=False" + can promise. """ - if strictequiv is None: + if syncobj is None: fmt = self._fmt else: - fmt = _SHARED_ONLY if strictequiv else _PICKLED + fmt = _SHARED_ONLY if syncobj else _PICKLED if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -168,11 +167,11 @@ def put(self, obj, timeout=None, *, else: break - def put_nowait(self, obj, *, strictequiv=None): - if strictequiv is None: + def put_nowait(self, obj, *, syncobj=None): + if syncobj is None: fmt = self._fmt else: - fmt = _SHARED_ONLY if strictequiv else _PICKLED + fmt = _SHARED_ONLY if syncobj else _PICKLED if fmt is _PICKLED: obj = pickle.dumps(obj) try: diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index c5c44991187733..65b5435fb00b04 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -58,13 +58,13 @@ def test_shareable(self): with self.subTest('same interpreter'): queue2 = queues.create() - queue1.put(queue2, strictequiv=True) + queue1.put(queue2, syncobj=True) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): queue4 = queues.create() - queue1.put(queue4, strictequiv=True) + queue1.put(queue4, syncobj=True) out = _run_output(interp, dedent(""" queue4 = queue1.get() print(queue4.id) @@ -75,7 +75,7 @@ def test_shareable(self): with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" queue5 = queues.create() - queue1.put(queue5, strictequiv=True) + queue1.put(queue5, syncobj=True) print(queue5.id) """)) qid = int(out) @@ -118,7 +118,7 @@ class TestQueueOps(TestBase): def test_empty(self): queue = queues.create() before = queue.empty() - queue.put(None, strictequiv=True) + queue.put(None, syncobj=True) during = queue.empty() queue.get() after = queue.empty() @@ -133,7 +133,7 @@ def test_full(self): queue = queues.create(3) for _ in range(3): actual.append(queue.full()) - queue.put(None, strictequiv=True) + queue.put(None, syncobj=True) actual.append(queue.full()) for _ in range(3): queue.get() @@ -147,16 +147,16 @@ def test_qsize(self): queue = queues.create() for _ in range(3): actual.append(queue.qsize()) - queue.put(None, strictequiv=True) + queue.put(None, syncobj=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) - queue.put(None, strictequiv=True) + queue.put(None, syncobj=True) actual.append(queue.qsize()) for _ in range(3): queue.get() actual.append(queue.qsize()) - queue.put(None, strictequiv=True) + queue.put(None, syncobj=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) @@ -165,9 +165,9 @@ def test_qsize(self): def test_put_get_main(self): expected = list(range(20)) - for strictequiv in (True, False): - kwds = dict(strictequiv=strictequiv) - with self.subTest(f'strictequiv={strictequiv}'): + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): queue = queues.create() for i in range(20): queue.put(i, **kwds) @@ -176,9 +176,9 @@ def test_put_get_main(self): self.assertEqual(actual, expected) def test_put_timeout(self): - for strictequiv in (True, False): - kwds = dict(strictequiv=strictequiv) - with self.subTest(f'strictequiv={strictequiv}'): + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): queue = queues.create(2) queue.put(None, **kwds) queue.put(None, **kwds) @@ -188,9 +188,9 @@ def test_put_timeout(self): queue.put(None, **kwds) def test_put_nowait(self): - for strictequiv in (True, False): - kwds = dict(strictequiv=strictequiv) - with self.subTest(f'strictequiv={strictequiv}'): + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): queue = queues.create(2) queue.put_nowait(None, **kwds) queue.put_nowait(None, **kwds) @@ -199,7 +199,7 @@ def test_put_nowait(self): queue.get() queue.put_nowait(None, **kwds) - def test_put_strictequiv(self): + def test_put_syncobj(self): for obj in [ None, True, @@ -210,7 +210,7 @@ def test_put_strictequiv(self): ]: with self.subTest(repr(obj)): queue = queues.create() - queue.put(obj, strictequiv=True) + queue.put(obj, syncobj=True) obj2 = queue.get() self.assertEqual(obj2, obj) @@ -221,9 +221,9 @@ def test_put_strictequiv(self): with self.subTest(repr(obj)): queue = queues.create() with self.assertRaises(interpreters.NotShareableError): - queue.put(obj, strictequiv=True) + queue.put(obj, syncobj=True) - def test_put_not_strictequiv(self): + def test_put_not_syncobj(self): for obj in [ None, True, @@ -237,7 +237,7 @@ def test_put_not_strictequiv(self): ]: with self.subTest(repr(obj)): queue = queues.create() - queue.put(obj, strictequiv=False) + queue.put(obj, syncobj=False) obj2 = queue.get() self.assertEqual(obj2, obj) @@ -251,9 +251,9 @@ def test_get_nowait(self): with self.assertRaises(queues.QueueEmpty): queue.get_nowait() - def test_put_get_default_strictequiv(self): + def test_put_get_default_syncobj(self): expected = list(range(20)) - queue = queues.create(strictequiv=True) + queue = queues.create(syncobj=True) for i in range(20): queue.put(i) actual = [queue.get() for _ in range(20)] @@ -264,9 +264,9 @@ def test_put_get_default_strictequiv(self): with self.assertRaises(interpreters.NotShareableError): queue.put(obj) - def test_put_get_default_not_strictequiv(self): + def test_put_get_default_not_syncobj(self): expected = list(range(20)) - queue = queues.create(strictequiv=False) + queue = queues.create(syncobj=False) for i in range(20): queue.put(i) actual = [queue.get() for _ in range(20)] @@ -285,7 +285,7 @@ def test_put_get_same_interpreter(self): from test.support.interpreters import queues queue = queues.create() orig = b'spam' - queue.put(orig, strictequiv=True) + queue.put(orig, syncobj=True) obj = queue.get() assert obj == orig, 'expected: obj == orig' assert obj is not orig, 'expected: obj is not orig' @@ -298,7 +298,7 @@ def test_put_get_different_interpreters(self): self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' - queue1.put(obj1, strictequiv=True) + queue1.put(obj1, syncobj=True) out = _run_output( interp, @@ -315,7 +315,7 @@ def test_put_get_different_interpreters(self): obj2 = b'eggs' print(id(obj2)) assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0' - queue2.put(obj2, strictequiv=True) + queue2.put(obj2, syncobj=True) assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1' """)) self.assertEqual(len(queues.list_all()), 2) @@ -337,8 +337,8 @@ def test_put_cleared_with_subinterpreter(self): queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' - queue.put(obj1, strictequiv=True) - queue.put(obj2, strictequiv=True) + queue.put(obj1, syncobj=True) + queue.put(obj2, syncobj=True) """)) self.assertEqual(queue.qsize(), 2) @@ -360,12 +360,12 @@ def f(): break except queues.QueueEmpty: continue - queue2.put(obj, strictequiv=True) + queue2.put(obj, syncobj=True) t = threading.Thread(target=f) t.start() orig = b'spam' - queue1.put(orig, strictequiv=True) + queue1.put(orig, syncobj=True) obj = queue2.get() t.join() From bd06ba09fd154d6b4312e14e01ce44981fbda424 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 28 Feb 2024 09:19:01 -0700 Subject: [PATCH 19/19] Fix some tests. --- Lib/test/test_sys.py | 4 ++-- Lib/test/test_threading.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 71671a5a984256..38dcabd84d8170 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -729,7 +729,7 @@ def test_subinterp_intern_dynamically_allocated(self): self.assertIs(t, s) interp = interpreters.create() - interp.exec_sync(textwrap.dedent(f''' + interp.exec(textwrap.dedent(f''' import sys t = sys.intern({s!r}) assert id(t) != {id(s)}, (id(t), {id(s)}) @@ -744,7 +744,7 @@ def test_subinterp_intern_statically_allocated(self): t = sys.intern(s) interp = interpreters.create() - interp.exec_sync(textwrap.dedent(f''' + interp.exec(textwrap.dedent(f''' import sys t = sys.intern({s!r}) assert id(t) == {id(t)}, (id(t), {id(t)}) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 1ab223b81e939e..3b5c37c948c8c3 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1478,7 +1478,7 @@ def test_threads_join_with_no_main(self): DONE = b'D' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading import time