From 3031b6c8707695af784fd5e075862622d430e294 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 21 Sep 2023 14:06:45 -0600 Subject: [PATCH 1/6] Add _channels.get_info(). --- Lib/test/support/interpreters.py | 4 + Modules/_xxinterpchannelsmodule.c | 155 ++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index f8f42c0e02479c..83523e18c8c2e2 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -161,6 +161,10 @@ def __eq__(self, other): def id(self): return self._id + @property + def _info(self): + return _channels.get_info(self._id) + _NOT_SET = object() diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index b618592bf00279..7f823d2276153d 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -402,6 +402,7 @@ typedef struct { PyTypeObject *recv_channel_type; /* heap types */ + PyTypeObject *ChannelInfoType; PyTypeObject *ChannelIDType; PyTypeObject *XIBufferViewType; @@ -441,6 +442,7 @@ static int traverse_module_state(module_state *state, visitproc visit, void *arg) { /* heap types */ + Py_VISIT(state->ChannelInfoType); Py_VISIT(state->ChannelIDType); Py_VISIT(state->XIBufferViewType); @@ -457,10 +459,12 @@ traverse_module_state(module_state *state, visitproc visit, void *arg) static int clear_module_state(module_state *state) { + /* external types */ Py_CLEAR(state->send_channel_type); Py_CLEAR(state->recv_channel_type); /* heap types */ + Py_CLEAR(state->ChannelInfoType); if (state->ChannelIDType != NULL) { (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType); } @@ -2088,6 +2092,117 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid, } +/* channel info */ + +struct channel_info { + struct { + // 1: closed; -1: closing + int closed; + } status; + Py_ssize_t count; +}; + +static int +_channel_get_info(_channels *channels, int64_t cid, struct channel_info *info) +{ + int err = 0; + *info = (struct channel_info){0}; + + // Hold the global lock until we're done. + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + + // Find the channel. + _channelref *ref = _channelref_find(channels->head, cid, NULL); + if (ref == NULL) { + err = ERR_CHANNEL_NOT_FOUND; + goto finally; + } + _channel_state *chan = ref->chan; + + // Check if open. + if (chan == NULL) { + info->status.closed = 1; + goto finally; + } + if (!chan->open) { + assert(chan->queue->count == 0); + info->status.closed = 1; + goto finally; + } + if (chan->closing != NULL) { + assert(chan->queue->count > 0); + info->status.closed = -1; + } + else { + info->status.closed = 0; + } + + // Get the number of queued objects. + info->count = chan->queue->count; + +finally: + PyThread_release_lock(channels->mutex); + return err; +} + +PyDoc_STRVAR(channel_info_doc, +"ChannelInfo\n\ +\n\ +A named tuple of a channel's state."); + +static PyStructSequence_Field channel_info_fields[] = { + {"open", "both ends are open"}, + {"closing", "send is closed, recv is non-empty"}, + {"closed", "both ends are closed"}, + {"count", "queued objects"}, + {0} +}; + +static PyStructSequence_Desc channel_info_desc = { + .name = "ChannelInfo", + .doc = channel_info_doc, + .fields = channel_info_fields, + .n_in_sequence = 4, +}; + +static PyObject * +new_channel_info(PyObject *mod, struct channel_info *info) +{ + module_state *state = get_module_state(mod); + if (state == NULL) { + return NULL; + } + + assert(state->ChannelInfoType != NULL); + PyObject *self = PyStructSequence_New(state->ChannelInfoType); + if (self == NULL) { + return NULL; + } + + int pos = 0; +#define SET_BOOL(val) \ + PyStructSequence_SET_ITEM(self, pos++, \ + Py_NewRef(val ? Py_True : Py_False)) +#define SET_COUNT(val) \ + do { \ + PyObject *obj = PyLong_FromLongLong(val); \ + if (obj == NULL) { \ + Py_CLEAR(info); \ + return NULL; \ + } \ + PyStructSequence_SET_ITEM(self, pos++, obj); \ + } while(0) + SET_BOOL(info->status.closed == 0); + SET_BOOL(info->status.closed == -1); + SET_BOOL(info->status.closed == 1); + SET_COUNT(info->count); +#undef SET_COUNT +#undef SET_BOOL + assert(!PyErr_Occurred()); + return self; +} + + /* ChannelID class */ typedef struct channelid { @@ -3079,6 +3194,33 @@ Close the channel for the current interpreter. 'send' and 'recv'\n\ (bool) may be used to indicate the ends to close. By default both\n\ ends are closed. Closing an already closed end is a noop."); +static PyObject * +channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", NULL}; + struct channel_id_converter_data cid_data = { + .module = self, + }; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&:_get_info", kwlist, + channel_id_converter, &cid_data)) { + return NULL; + } + int64_t cid = cid_data.cid; + + struct channel_info info; + int err = _channel_get_info(&_globals.channels, cid, &info); + if (handle_channel_error(err, self, cid)) { + return NULL; + } + return new_channel_info(self, &info); +} + +PyDoc_STRVAR(channelsmod_get_info_doc, +"get_info(cid)\n\ +\n\ +Return details about the channel."); + static PyObject * channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds) { @@ -3143,6 +3285,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc}, {"release", _PyCFunction_CAST(channelsmod_release), METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc}, + {"get_info", _PyCFunction_CAST(channelsmod_get_info), + METH_VARARGS | METH_KEYWORDS, channelsmod_get_info_doc}, {"_channel_id", _PyCFunction_CAST(channelsmod__channel_id), METH_VARARGS | METH_KEYWORDS, NULL}, {"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types), @@ -3179,6 +3323,15 @@ module_exec(PyObject *mod) /* Add other types */ + // ChannelInfo + state->ChannelInfoType = PyStructSequence_NewType(&channel_info_desc); + if (state->ChannelInfoType == NULL) { + goto error; + } + if (PyModule_AddType(mod, state->ChannelInfoType) < 0) { + goto error; + } + // ChannelID state->ChannelIDType = add_new_type( mod, &channelid_typespec, _channelid_shared, xid_classes); @@ -3186,12 +3339,14 @@ module_exec(PyObject *mod) goto error; } + // XIBufferView state->XIBufferViewType = add_new_type(mod, &XIBufferViewType_spec, NULL, xid_classes); if (state->XIBufferViewType == NULL) { goto error; } + // Register external types. if (register_builtin_xid_types(xid_classes) < 0) { goto error; } From 7932bf8aff85471cc9426c619a2fffc3c29d4c66 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 21 Sep 2023 14:57:43 -0600 Subject: [PATCH 2/6] Add current interpreter info. --- Modules/_xxinterpchannelsmodule.c | 38 ++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index 7f823d2276153d..4eb4a75051663c 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -2098,6 +2098,11 @@ struct channel_info { struct { // 1: closed; -1: closing int closed; + struct { + // 1: associated; -1: released + int send; + int recv; + } cur; } status; Py_ssize_t count; }; @@ -2108,6 +2113,13 @@ _channel_get_info(_channels *channels, int64_t cid, struct channel_info *info) int err = 0; *info = (struct channel_info){0}; + // Get the current interpreter. + PyInterpreterState *interp = _get_current_interp(); + if (interp == NULL) { + return -1; + } + Py_ssize_t interpid = PyInterpreterState_GetID(interp); + // Hold the global lock until we're done. PyThread_acquire_lock(channels->mutex, WAIT_LOCK); @@ -2140,6 +2152,22 @@ _channel_get_info(_channels *channels, int64_t cid, struct channel_info *info) // Get the number of queued objects. info->count = chan->queue->count; + // Get the current ends status. + _channelend *send = _channelend_find(chan->ends->send, interpid, NULL); + if (send == NULL) { + info->status.cur.send = 0; + } + else { + info->status.cur.send = send->open ? 1 : -1; + } + _channelend *recv = _channelend_find(chan->ends->recv, interpid, NULL); + if (recv == NULL) { + info->status.cur.recv = 0; + } + else { + info->status.cur.recv = recv->open ? 1 : -1; + } + finally: PyThread_release_lock(channels->mutex); return err; @@ -2155,6 +2183,10 @@ static PyStructSequence_Field channel_info_fields[] = { {"closing", "send is closed, recv is non-empty"}, {"closed", "both ends are closed"}, {"count", "queued objects"}, + {"send_associated", "current interpreter is bound to the send end"}, + {"send_released", "current interpreter *was* bound to the send end"}, + {"recv_associated", "current interpreter is bound to the recv end"}, + {"recv_released", "current interpreter *was* bound to the recv end"}, {0} }; @@ -2162,7 +2194,7 @@ static PyStructSequence_Desc channel_info_desc = { .name = "ChannelInfo", .doc = channel_info_doc, .fields = channel_info_fields, - .n_in_sequence = 4, + .n_in_sequence = 8, }; static PyObject * @@ -2196,6 +2228,10 @@ new_channel_info(PyObject *mod, struct channel_info *info) SET_BOOL(info->status.closed == -1); SET_BOOL(info->status.closed == 1); SET_COUNT(info->count); + SET_BOOL(info->status.cur.send == 1); + SET_BOOL(info->status.cur.send == -1); + SET_BOOL(info->status.cur.recv == 1); + SET_BOOL(info->status.cur.recv == -1); #undef SET_COUNT #undef SET_BOOL assert(!PyErr_Occurred()); From 06de92bfcad17288685dc645590f370f7d821b53 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 21 Sep 2023 15:27:15 -0600 Subject: [PATCH 3/6] Add the counts for all ends. --- Modules/_xxinterpchannelsmodule.c | 109 ++++++++++++++++++++++++++---- 1 file changed, 96 insertions(+), 13 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index 4eb4a75051663c..3b8dd427958526 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -2098,6 +2098,16 @@ struct channel_info { struct { // 1: closed; -1: closing int closed; + struct { + Py_ssize_t nsend_only; // not released + Py_ssize_t nsend_only_released; + Py_ssize_t nrecv_only; // not released + Py_ssize_t nrecv_only_released; + Py_ssize_t nboth; // not released + Py_ssize_t nboth_released; + Py_ssize_t nboth_send_released; + Py_ssize_t nboth_recv_released; + } all; struct { // 1: associated; -1: released int send; @@ -2152,20 +2162,68 @@ _channel_get_info(_channels *channels, int64_t cid, struct channel_info *info) // Get the number of queued objects. info->count = chan->queue->count; - // Get the current ends status. - _channelend *send = _channelend_find(chan->ends->send, interpid, NULL); - if (send == NULL) { - info->status.cur.send = 0; - } - else { - info->status.cur.send = send->open ? 1 : -1; - } - _channelend *recv = _channelend_find(chan->ends->recv, interpid, NULL); - if (recv == NULL) { - info->status.cur.recv = 0; + // Get the ends statuses. + assert(info->status.cur.send == 0); + assert(info->status.cur.recv == 0); + _channelend *send = chan->ends->send; + while (send != NULL) { + if (send->interpid == interpid) { + info->status.cur.send = send->open ? 1 : -1; + } + + if (send->open) { + info->status.all.nsend_only += 1; + } + else { + info->status.all.nsend_only_released += 1; + } + send = send->next; } - else { - info->status.cur.recv = recv->open ? 1 : -1; + _channelend *recv = chan->ends->recv; + while (recv != NULL) { + if (recv->interpid == interpid) { + info->status.cur.recv = recv->open ? 1 : -1; + } + + // XXX This is O(n*n). Why do we have 2 linked lists? + _channelend *send = chan->ends->send; + while (send != NULL) { + if (send->interpid == recv->interpid) { + break; + } + send = send->next; + } + if (send == NULL) { + if (recv->open) { + info->status.all.nrecv_only += 1; + } + else { + info->status.all.nrecv_only_released += 1; + } + } + else { + if (recv->open) { + if (send->open) { + info->status.all.nboth += 1; + info->status.all.nsend_only -= 1; + } + else { + info->status.all.nboth_recv_released += 1; + info->status.all.nsend_only_released -= 1; + } + } + else { + if (send->open) { + info->status.all.nboth_send_released += 1; + info->status.all.nsend_only -= 1; + } + else { + info->status.all.nboth_released += 1; + info->status.all.nsend_only_released -= 1; + } + } + } + recv = recv->next; } finally: @@ -2183,6 +2241,23 @@ static PyStructSequence_Field channel_info_fields[] = { {"closing", "send is closed, recv is non-empty"}, {"closed", "both ends are closed"}, {"count", "queued objects"}, + + {"num_interp_send", "interpreters bound to the send end"}, + {"num_interp_send_released", + "interpreters bound to the send end and released"}, + + {"num_interp_recv", "interpreters bound to the send end"}, + {"num_interp_recv_released", + "interpreters bound to the send end and released"}, + + {"num_interp_both", "interpreters bound to both ends"}, + {"num_interp_both_released", + "interpreters bound to both ends and released_from_both"}, + {"num_interp_both_send_released", + "interpreters bound to both ends and released_from_the send end"}, + {"num_interp_both_recv_released", + "interpreters bound to both ends and released_from_the recv end"}, + {"send_associated", "current interpreter is bound to the send end"}, {"send_released", "current interpreter *was* bound to the send end"}, {"recv_associated", "current interpreter is bound to the recv end"}, @@ -2228,6 +2303,14 @@ new_channel_info(PyObject *mod, struct channel_info *info) SET_BOOL(info->status.closed == -1); SET_BOOL(info->status.closed == 1); SET_COUNT(info->count); + SET_COUNT(info->status.all.nsend_only); + SET_COUNT(info->status.all.nsend_only_released); + SET_COUNT(info->status.all.nrecv_only); + SET_COUNT(info->status.all.nrecv_only_released); + SET_COUNT(info->status.all.nboth); + SET_COUNT(info->status.all.nboth_released); + SET_COUNT(info->status.all.nboth_send_released); + SET_COUNT(info->status.all.nboth_recv_released); SET_BOOL(info->status.cur.send == 1); SET_BOOL(info->status.cur.send == -1); SET_BOOL(info->status.cur.recv == 1); From a70509a9250ed295792987551268f593e6795bc0 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 21 Sep 2023 15:34:29 -0600 Subject: [PATCH 4/6] Add *Channel.is_closed(). --- Lib/test/support/interpreters.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 83523e18c8c2e2..860b2bb927f61c 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -165,6 +165,10 @@ def id(self): def _info(self): return _channels.get_info(self._id) + @property + def is_closed(self): + return self._info.closed + _NOT_SET = object() @@ -217,6 +221,11 @@ class SendChannel(_ChannelEnd): _end = 'send' + @property + def is_closed(self): + info = self._info + return info.closed or info.closing + def send(self, obj, timeout=None): """Send the object (i.e. its data) to the channel's receiving end. From 09b7812df2bd38067511f2204050e574025fe4b2 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 21 Sep 2023 16:09:02 -0600 Subject: [PATCH 5/6] Make sure __module__ gets set. --- Modules/_xxinterpchannelsmodule.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index 3b8dd427958526..8dd8050752a717 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -223,8 +223,8 @@ static PyTypeObject * add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared, struct xid_class_registry *classes) { - PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass( - NULL, mod, spec, NULL); + PyTypeObject *cls = (PyTypeObject *)PyType_FromModuleAndSpec( + mod, spec, NULL); if (cls == NULL) { return NULL; } @@ -2266,7 +2266,7 @@ static PyStructSequence_Field channel_info_fields[] = { }; static PyStructSequence_Desc channel_info_desc = { - .name = "ChannelInfo", + .name = MODULE_NAME ".ChannelInfo", .doc = channel_info_doc, .fields = channel_info_fields, .n_in_sequence = 8, From c3555304e11d9b1ecff93648c58fc3df6f9ec15f Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 17 Oct 2023 17:02:29 -0600 Subject: [PATCH 6/6] Add a test. --- Lib/test/test_interpreters.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py index d2d52ec9a7808f..63d41cb264d966 100644 --- a/Lib/test/test_interpreters.py +++ b/Lib/test/test_interpreters.py @@ -850,6 +850,19 @@ def test_shareable(self): self.assertEqual(rch2, rch) self.assertEqual(sch2, sch) + def test_is_closed(self): + rch, sch = interpreters.create_channel() + rbefore = rch.is_closed + sbefore = sch.is_closed + rch.close() + rafter = rch.is_closed + safter = sch.is_closed + + self.assertFalse(rbefore) + self.assertFalse(sbefore) + self.assertTrue(rafter) + self.assertTrue(safter) + class TestRecvChannelAttrs(TestBase):