Skip to content

bpo-38880: List interpreters associated with a channel end #17323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ec5aa60
Attempt at implementing channel_list_interpreters()
LewisGaul Nov 12, 2019
ee78133
Fix some error cases
LewisGaul Nov 12, 2019
433a5b0
Fix issue with decreasing list items' references
LewisGaul Nov 12, 2019
64c1661
Fix issue with reference to interpreter
LewisGaul Nov 20, 2019
2c5c6df
Add ability to list interpreters on send or receive channels
LewisGaul Nov 20, 2019
7f439fa
Fix arg parsing in C code
LewisGaul Nov 20, 2019
d86ae97
Minor changes
LewisGaul Nov 20, 2019
c529c88
Add channel_list_interpreters() tests
Nov 21, 2019
168ebf3
Fix docstring
LewisGaul Nov 21, 2019
2a7b508
Improve naming in tests
LewisGaul Nov 21, 2019
4d6ac6e
Add macros for int64_t max etc
LewisGaul Nov 21, 2019
a44c2d2
Fix error handling
LewisGaul Nov 21, 2019
215300b
Expose _PyInterpreterState_LookUpID
LewisGaul Nov 21, 2019
9b124db
Fix bad git rebase (use _xxsubinterpreters module name)
LewisGaul Nov 21, 2019
76621c7
Remove decref of NULL
LewisGaul Nov 21, 2019
d4c51bf
Fix leftover broken code from nested loop
LewisGaul Nov 21, 2019
f3c2b34
Remove whitespace from tests
LewisGaul Nov 21, 2019
af10d1f
Add to Misc/ACKS
LewisGaul Nov 21, 2019
ac09b6c
Remove PY_INT64_T_MAX etc. and use stdint.h
LewisGaul Nov 22, 2019
8e69864
Remove whitespace
LewisGaul Nov 22, 2019
0546702
📜🤖 Added by blurb_it.
blurb-it[bot] Nov 22, 2019
ad511ef
Update news entry to remove mention of PEP 554
LewisGaul Nov 23, 2019
a1b7c3a
Remove unnecessary asserts
LewisGaul Nov 23, 2019
d9b3e27
Use single 'send' argument in channel_list_interpreters() API
LewisGaul Nov 23, 2019
f4990b0
Tidy up _channelends_list_interpreters() function
LewisGaul Nov 23, 2019
c7dbb04
Move variable declarations inline
LewisGaul Nov 23, 2019
6202ecd
Use _PyInterpreterID_New() instead of getting existing objects
LewisGaul Nov 23, 2019
a7cf61b
Merge branch 'list-channel-interps' of github.com:LewisGaul/cpython i…
LewisGaul Nov 23, 2019
357101f
Markups - remove check for number of open channels and improve test s…
LewisGaul Dec 14, 2019
faca1df
Add more listing subinterpreter tests
LewisGaul Jan 21, 2020
75e791f
Implementation rewrite upon Eric's suggestion. Just one testcase now …
LewisGaul Apr 18, 2020
79f5d35
Fix issue with ChannelClosedError not being raised when 'send' end of…
LewisGaul Apr 28, 2020
5481e82
Add testcase for listing associated interpreters with basic closed ch…
LewisGaul Apr 28, 2020
79be8a0
Fix a refleak.
ericsnowcurrently Apr 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions Lib/test/test__xxsubinterpreters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,185 @@ def test_ids_global(self):

self.assertEqual(cid2, int(cid1) + 1)

def test_channel_list_interpreters_none(self):
"""Test listing interpreters for a channel with no associations."""
# Test for channel with no associated interpreters.
cid = interpreters.channel_create()
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [])
self.assertEqual(recv_interps, [])

def test_channel_list_interpreters_basic(self):
"""Test basic listing channel interpreters."""
interp0 = interpreters.get_main()
cid = interpreters.channel_create()
interpreters.channel_send(cid, "send")
# Test for a channel that has one end associated to an interpreter.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [])

interp1 = interpreters.create()
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
# Test for channel that has boths ends associated to an interpreter.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [interp1])

def test_channel_list_interpreters_multiple(self):
"""Test listing interpreters for a channel with many associations."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
interp2 = interpreters.create()
interp3 = interpreters.create()
cid = interpreters.channel_create()

interpreters.channel_send(cid, "send")
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.channel_send({cid}, "send")
"""))
_run_output(interp2, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
_run_output(interp3, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(set(send_interps), {interp0, interp1})
self.assertEqual(set(recv_interps), {interp2, interp3})

def test_channel_list_interpreters_destroyed(self):
"""Test listing channel interpreters with a destroyed interpreter."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
cid = interpreters.channel_create()
interpreters.channel_send(cid, "send")
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
# Should be one interpreter associated with each end.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [interp1])

interpreters.destroy(interp1)
# Destroyed interpreter should not be listed.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [])

def test_channel_list_interpreters_released(self):
"""Test listing channel interpreters with a released channel."""
# Set up one channel with main interpreter on the send end and two
# subinterpreters on the receive end.
interp0 = interpreters.get_main()
interp1 = interpreters.create()
interp2 = interpreters.create()
cid = interpreters.channel_create()
interpreters.channel_send(cid, "data")
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
interpreters.channel_send(cid, "data")
_run_output(interp2, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
# Check the setup.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 2)

# Release the main interpreter from the send end.
interpreters.channel_release(cid, send=True)
# Send end should have no associated interpreters.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 0)
self.assertEqual(len(recv_interps), 2)

# Release one of the subinterpreters from the receive end.
_run_output(interp2, dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.channel_release({cid})
"""))
# Receive end should have the released interpreter removed.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 0)
self.assertEqual(recv_interps, [interp1])

def test_channel_list_interpreters_closed(self):
"""Test listing channel interpreters with a closed channel."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
cid = interpreters.channel_create()
# Put something in the channel so that it's not empty.
interpreters.channel_send(cid, "send")

# Check initial state.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 0)

# Force close the channel.
interpreters.channel_close(cid, force=True)
# Both ends should raise an error.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=False)

def test_channel_list_interpreters_closed_send_end(self):
"""Test listing channel interpreters with a channel's send end closed."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
cid = interpreters.channel_create()
# Put something in the channel so that it's not empty.
interpreters.channel_send(cid, "send")

# Check initial state.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 0)

# Close the send end of the channel.
interpreters.channel_close(cid, send=True)
# Send end should raise an error.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)
# Receive end should not be closed (since channel is not empty).
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(recv_interps), 0)

# Close the receive end of the channel from a subinterpreter.
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.channel_close({cid}, force=True)
"""))
# Both ends should raise an error.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=False)

####################

def test_send_recv_main(self):
Expand Down Expand Up @@ -1515,6 +1694,23 @@ def test_close_used_multiple_times_by_single_user(self):
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_recv(cid)

def test_channel_list_interpreters_invalid_channel(self):
cid = interpreters.channel_create()
# Test for invalid channel ID.
with self.assertRaises(interpreters.ChannelNotFoundError):
interpreters.channel_list_interpreters(1000, send=True)

interpreters.channel_close(cid)
# Test for a channel that has been closed.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)

def test_channel_list_interpreters_invalid_args(self):
# Tests for invalid arguments passed to the API.
cid = interpreters.channel_create()
with self.assertRaises(TypeError):
interpreters.channel_list_interpreters(cid)


class ChannelReleaseTests(TestBase):

Expand Down
2 changes: 2 additions & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ Rodolpho Eckhardt
Ulrich Eckhardt
David Edelsohn
John Edmonds
Benjamin Edwards
Grant Edwards
Zvi Effron
John Ehresman
Expand Down Expand Up @@ -557,6 +558,7 @@ Lars Marius Garshol
Jake Garver
Dan Gass
Andrew Gaul
Lewis Gaul
Matthieu Gautier
Stephen M. Gava
Xavier de Gaye
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the ability to list interpreters associated with channel ends in the internal subinterpreters module.
89 changes: 83 additions & 6 deletions Modules/_xxsubinterpretersmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)

typedef struct _channelassociations {
// Note that the list entries are never removed for interpreter
// for which the channel is closed. This should be a problem in
// for which the channel is closed. This should not be a problem in
// practice. Also, a channel isn't automatically closed when an
// interpreter is destroyed.
int64_t numsendopen;
Expand Down Expand Up @@ -1177,11 +1177,6 @@ _channels_list_all(_channels *channels, int64_t *count)
{
int64_t *cids = NULL;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
int64_t numopen = channels->numopen;
if (numopen >= PY_SSIZE_T_MAX) {
PyErr_SetString(PyExc_RuntimeError, "too many channels open");
goto done;
}
int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
if (ids == NULL) {
goto done;
Expand Down Expand Up @@ -1393,6 +1388,24 @@ _channel_close(_channels *channels, int64_t id, int end, int force)
return _channels_close(channels, id, NULL, end, force);
}

static int
_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
int send)
{
_PyChannelState *chan = _channels_lookup(channels, cid, NULL);
if (chan == NULL) {
return -1;
} else if (send && chan->closing != NULL) {
PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
return -1;
}

_channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
interp, NULL);

return (end != NULL && end->open);
}

/* ChannelID class */

static PyTypeObject ChannelIDtype;
Expand Down Expand Up @@ -2323,6 +2336,68 @@ PyDoc_STRVAR(channel_list_all_doc,
\n\
Return the list of all IDs for active channels.");

static PyObject *
channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "send", NULL};
int64_t cid; /* Channel ID */
int send = 0; /* Send or receive end? */
int64_t id;
PyObject *ids, *id_obj;
PyInterpreterState *interp;

if (!PyArg_ParseTupleAndKeywords(
args, kwds, "O&$p:channel_list_interpreters",
kwlist, channel_id_converter, &cid, &send)) {
return NULL;
}

ids = PyList_New(0);
if (ids == NULL) {
goto except;
}

interp = PyInterpreterState_Head();
while (interp != NULL) {
id = PyInterpreterState_GetID(interp);
assert(id >= 0);
int res = _channel_is_associated(&_globals.channels, cid, id, send);
if (res < 0) {
goto except;
}
if (res) {
id_obj = _PyInterpreterState_GetIDObject(interp);
if (id_obj == NULL) {
goto except;
}
res = PyList_Insert(ids, 0, id_obj);
Py_DECREF(id_obj);
if (res < 0) {
goto except;
}
}
interp = PyInterpreterState_Next(interp);
}

goto finally;

except:
Py_XDECREF(ids);
ids = NULL;

finally:
return ids;
}

PyDoc_STRVAR(channel_list_interpreters_doc,
"channel_list_interpreters(cid, *, send) -> [id]\n\
\n\
Return the list of all interpreter IDs associated with an end of the channel.\n\
\n\
The 'send' argument should be a boolean indicating whether to use the send or\n\
receive end.");


static PyObject *
channel_send(PyObject *self, PyObject *args, PyObject *kwds)
{
Expand Down Expand Up @@ -2476,6 +2551,8 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
{"channel_list_all", channel_list_all,
METH_NOARGS, channel_list_all_doc},
{"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters,
METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
{"channel_send", (PyCFunction)(void(*)(void))channel_send,
METH_VARARGS | METH_KEYWORDS, channel_send_doc},
{"channel_recv", (PyCFunction)(void(*)(void))channel_recv,
Expand Down