Skip to content

Commit f7bbf58

Browse files
authored
bpo-38880: List interpreters associated with a channel end (GH-17323)
This PR adds the functionality requested by ericsnowcurrently/multi-core-python#52. Automerge-Triggered-By: @ericsnowcurrently
1 parent 49f70db commit f7bbf58

File tree

4 files changed

+282
-6
lines changed

4 files changed

+282
-6
lines changed

Lib/test/test__xxsubinterpreters.py

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,185 @@ def test_ids_global(self):
12071207

12081208
self.assertEqual(cid2, int(cid1) + 1)
12091209

1210+
def test_channel_list_interpreters_none(self):
1211+
"""Test listing interpreters for a channel with no associations."""
1212+
# Test for channel with no associated interpreters.
1213+
cid = interpreters.channel_create()
1214+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1215+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1216+
self.assertEqual(send_interps, [])
1217+
self.assertEqual(recv_interps, [])
1218+
1219+
def test_channel_list_interpreters_basic(self):
1220+
"""Test basic listing channel interpreters."""
1221+
interp0 = interpreters.get_main()
1222+
cid = interpreters.channel_create()
1223+
interpreters.channel_send(cid, "send")
1224+
# Test for a channel that has one end associated to an interpreter.
1225+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1226+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1227+
self.assertEqual(send_interps, [interp0])
1228+
self.assertEqual(recv_interps, [])
1229+
1230+
interp1 = interpreters.create()
1231+
_run_output(interp1, dedent(f"""
1232+
import _xxsubinterpreters as _interpreters
1233+
obj = _interpreters.channel_recv({cid})
1234+
"""))
1235+
# Test for channel that has boths ends associated to an interpreter.
1236+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1237+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1238+
self.assertEqual(send_interps, [interp0])
1239+
self.assertEqual(recv_interps, [interp1])
1240+
1241+
def test_channel_list_interpreters_multiple(self):
1242+
"""Test listing interpreters for a channel with many associations."""
1243+
interp0 = interpreters.get_main()
1244+
interp1 = interpreters.create()
1245+
interp2 = interpreters.create()
1246+
interp3 = interpreters.create()
1247+
cid = interpreters.channel_create()
1248+
1249+
interpreters.channel_send(cid, "send")
1250+
_run_output(interp1, dedent(f"""
1251+
import _xxsubinterpreters as _interpreters
1252+
_interpreters.channel_send({cid}, "send")
1253+
"""))
1254+
_run_output(interp2, dedent(f"""
1255+
import _xxsubinterpreters as _interpreters
1256+
obj = _interpreters.channel_recv({cid})
1257+
"""))
1258+
_run_output(interp3, dedent(f"""
1259+
import _xxsubinterpreters as _interpreters
1260+
obj = _interpreters.channel_recv({cid})
1261+
"""))
1262+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1263+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1264+
self.assertEqual(set(send_interps), {interp0, interp1})
1265+
self.assertEqual(set(recv_interps), {interp2, interp3})
1266+
1267+
def test_channel_list_interpreters_destroyed(self):
1268+
"""Test listing channel interpreters with a destroyed interpreter."""
1269+
interp0 = interpreters.get_main()
1270+
interp1 = interpreters.create()
1271+
cid = interpreters.channel_create()
1272+
interpreters.channel_send(cid, "send")
1273+
_run_output(interp1, dedent(f"""
1274+
import _xxsubinterpreters as _interpreters
1275+
obj = _interpreters.channel_recv({cid})
1276+
"""))
1277+
# Should be one interpreter associated with each end.
1278+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1279+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1280+
self.assertEqual(send_interps, [interp0])
1281+
self.assertEqual(recv_interps, [interp1])
1282+
1283+
interpreters.destroy(interp1)
1284+
# Destroyed interpreter should not be listed.
1285+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1286+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1287+
self.assertEqual(send_interps, [interp0])
1288+
self.assertEqual(recv_interps, [])
1289+
1290+
def test_channel_list_interpreters_released(self):
1291+
"""Test listing channel interpreters with a released channel."""
1292+
# Set up one channel with main interpreter on the send end and two
1293+
# subinterpreters on the receive end.
1294+
interp0 = interpreters.get_main()
1295+
interp1 = interpreters.create()
1296+
interp2 = interpreters.create()
1297+
cid = interpreters.channel_create()
1298+
interpreters.channel_send(cid, "data")
1299+
_run_output(interp1, dedent(f"""
1300+
import _xxsubinterpreters as _interpreters
1301+
obj = _interpreters.channel_recv({cid})
1302+
"""))
1303+
interpreters.channel_send(cid, "data")
1304+
_run_output(interp2, dedent(f"""
1305+
import _xxsubinterpreters as _interpreters
1306+
obj = _interpreters.channel_recv({cid})
1307+
"""))
1308+
# Check the setup.
1309+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1310+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1311+
self.assertEqual(len(send_interps), 1)
1312+
self.assertEqual(len(recv_interps), 2)
1313+
1314+
# Release the main interpreter from the send end.
1315+
interpreters.channel_release(cid, send=True)
1316+
# Send end should have no associated interpreters.
1317+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1318+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1319+
self.assertEqual(len(send_interps), 0)
1320+
self.assertEqual(len(recv_interps), 2)
1321+
1322+
# Release one of the subinterpreters from the receive end.
1323+
_run_output(interp2, dedent(f"""
1324+
import _xxsubinterpreters as _interpreters
1325+
_interpreters.channel_release({cid})
1326+
"""))
1327+
# Receive end should have the released interpreter removed.
1328+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1329+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1330+
self.assertEqual(len(send_interps), 0)
1331+
self.assertEqual(recv_interps, [interp1])
1332+
1333+
def test_channel_list_interpreters_closed(self):
1334+
"""Test listing channel interpreters with a closed channel."""
1335+
interp0 = interpreters.get_main()
1336+
interp1 = interpreters.create()
1337+
cid = interpreters.channel_create()
1338+
# Put something in the channel so that it's not empty.
1339+
interpreters.channel_send(cid, "send")
1340+
1341+
# Check initial state.
1342+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1343+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1344+
self.assertEqual(len(send_interps), 1)
1345+
self.assertEqual(len(recv_interps), 0)
1346+
1347+
# Force close the channel.
1348+
interpreters.channel_close(cid, force=True)
1349+
# Both ends should raise an error.
1350+
with self.assertRaises(interpreters.ChannelClosedError):
1351+
interpreters.channel_list_interpreters(cid, send=True)
1352+
with self.assertRaises(interpreters.ChannelClosedError):
1353+
interpreters.channel_list_interpreters(cid, send=False)
1354+
1355+
def test_channel_list_interpreters_closed_send_end(self):
1356+
"""Test listing channel interpreters with a channel's send end closed."""
1357+
interp0 = interpreters.get_main()
1358+
interp1 = interpreters.create()
1359+
cid = interpreters.channel_create()
1360+
# Put something in the channel so that it's not empty.
1361+
interpreters.channel_send(cid, "send")
1362+
1363+
# Check initial state.
1364+
send_interps = interpreters.channel_list_interpreters(cid, send=True)
1365+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1366+
self.assertEqual(len(send_interps), 1)
1367+
self.assertEqual(len(recv_interps), 0)
1368+
1369+
# Close the send end of the channel.
1370+
interpreters.channel_close(cid, send=True)
1371+
# Send end should raise an error.
1372+
with self.assertRaises(interpreters.ChannelClosedError):
1373+
interpreters.channel_list_interpreters(cid, send=True)
1374+
# Receive end should not be closed (since channel is not empty).
1375+
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1376+
self.assertEqual(len(recv_interps), 0)
1377+
1378+
# Close the receive end of the channel from a subinterpreter.
1379+
_run_output(interp1, dedent(f"""
1380+
import _xxsubinterpreters as _interpreters
1381+
_interpreters.channel_close({cid}, force=True)
1382+
"""))
1383+
# Both ends should raise an error.
1384+
with self.assertRaises(interpreters.ChannelClosedError):
1385+
interpreters.channel_list_interpreters(cid, send=True)
1386+
with self.assertRaises(interpreters.ChannelClosedError):
1387+
interpreters.channel_list_interpreters(cid, send=False)
1388+
12101389
####################
12111390

12121391
def test_send_recv_main(self):
@@ -1540,6 +1719,23 @@ def test_close_used_multiple_times_by_single_user(self):
15401719
with self.assertRaises(interpreters.ChannelClosedError):
15411720
interpreters.channel_recv(cid)
15421721

1722+
def test_channel_list_interpreters_invalid_channel(self):
1723+
cid = interpreters.channel_create()
1724+
# Test for invalid channel ID.
1725+
with self.assertRaises(interpreters.ChannelNotFoundError):
1726+
interpreters.channel_list_interpreters(1000, send=True)
1727+
1728+
interpreters.channel_close(cid)
1729+
# Test for a channel that has been closed.
1730+
with self.assertRaises(interpreters.ChannelClosedError):
1731+
interpreters.channel_list_interpreters(cid, send=True)
1732+
1733+
def test_channel_list_interpreters_invalid_args(self):
1734+
# Tests for invalid arguments passed to the API.
1735+
cid = interpreters.channel_create()
1736+
with self.assertRaises(TypeError):
1737+
interpreters.channel_list_interpreters(cid)
1738+
15431739

15441740
class ChannelReleaseTests(TestBase):
15451741

Misc/ACKS

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ Rodolpho Eckhardt
456456
Ulrich Eckhardt
457457
David Edelsohn
458458
John Edmonds
459+
Benjamin Edwards
459460
Grant Edwards
460461
Zvi Effron
461462
John Ehresman
@@ -570,6 +571,7 @@ Jake Garver
570571
Dan Gass
571572
Tim Gates
572573
Andrew Gaul
574+
Lewis Gaul
573575
Matthieu Gautier
574576
Stephen M. Gava
575577
Xavier de Gaye
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added the ability to list interpreters associated with channel ends in the internal subinterpreters module.

Modules/_xxsubinterpretersmodule.c

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
538538

539539
typedef struct _channelassociations {
540540
// Note that the list entries are never removed for interpreter
541-
// for which the channel is closed. This should be a problem in
541+
// for which the channel is closed. This should not be a problem in
542542
// practice. Also, a channel isn't automatically closed when an
543543
// interpreter is destroyed.
544544
int64_t numsendopen;
@@ -1179,11 +1179,6 @@ _channels_list_all(_channels *channels, int64_t *count)
11791179
{
11801180
int64_t *cids = NULL;
11811181
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1182-
int64_t numopen = channels->numopen;
1183-
if (numopen >= PY_SSIZE_T_MAX) {
1184-
PyErr_SetString(PyExc_RuntimeError, "too many channels open");
1185-
goto done;
1186-
}
11871182
int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
11881183
if (ids == NULL) {
11891184
goto done;
@@ -1392,6 +1387,24 @@ _channel_close(_channels *channels, int64_t id, int end, int force)
13921387
return _channels_close(channels, id, NULL, end, force);
13931388
}
13941389

1390+
static int
1391+
_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
1392+
int send)
1393+
{
1394+
_PyChannelState *chan = _channels_lookup(channels, cid, NULL);
1395+
if (chan == NULL) {
1396+
return -1;
1397+
} else if (send && chan->closing != NULL) {
1398+
PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1399+
return -1;
1400+
}
1401+
1402+
_channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
1403+
interp, NULL);
1404+
1405+
return (end != NULL && end->open);
1406+
}
1407+
13951408
/* ChannelID class */
13961409

13971410
static PyTypeObject ChannelIDtype;
@@ -2323,6 +2336,68 @@ PyDoc_STRVAR(channel_list_all_doc,
23232336
\n\
23242337
Return the list of all IDs for active channels.");
23252338

2339+
static PyObject *
2340+
channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
2341+
{
2342+
static char *kwlist[] = {"cid", "send", NULL};
2343+
int64_t cid; /* Channel ID */
2344+
int send = 0; /* Send or receive end? */
2345+
int64_t id;
2346+
PyObject *ids, *id_obj;
2347+
PyInterpreterState *interp;
2348+
2349+
if (!PyArg_ParseTupleAndKeywords(
2350+
args, kwds, "O&$p:channel_list_interpreters",
2351+
kwlist, channel_id_converter, &cid, &send)) {
2352+
return NULL;
2353+
}
2354+
2355+
ids = PyList_New(0);
2356+
if (ids == NULL) {
2357+
goto except;
2358+
}
2359+
2360+
interp = PyInterpreterState_Head();
2361+
while (interp != NULL) {
2362+
id = PyInterpreterState_GetID(interp);
2363+
assert(id >= 0);
2364+
int res = _channel_is_associated(&_globals.channels, cid, id, send);
2365+
if (res < 0) {
2366+
goto except;
2367+
}
2368+
if (res) {
2369+
id_obj = _PyInterpreterState_GetIDObject(interp);
2370+
if (id_obj == NULL) {
2371+
goto except;
2372+
}
2373+
res = PyList_Insert(ids, 0, id_obj);
2374+
Py_DECREF(id_obj);
2375+
if (res < 0) {
2376+
goto except;
2377+
}
2378+
}
2379+
interp = PyInterpreterState_Next(interp);
2380+
}
2381+
2382+
goto finally;
2383+
2384+
except:
2385+
Py_XDECREF(ids);
2386+
ids = NULL;
2387+
2388+
finally:
2389+
return ids;
2390+
}
2391+
2392+
PyDoc_STRVAR(channel_list_interpreters_doc,
2393+
"channel_list_interpreters(cid, *, send) -> [id]\n\
2394+
\n\
2395+
Return the list of all interpreter IDs associated with an end of the channel.\n\
2396+
\n\
2397+
The 'send' argument should be a boolean indicating whether to use the send or\n\
2398+
receive end.");
2399+
2400+
23262401
static PyObject *
23272402
channel_send(PyObject *self, PyObject *args, PyObject *kwds)
23282403
{
@@ -2493,6 +2568,8 @@ static PyMethodDef module_functions[] = {
24932568
METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
24942569
{"channel_list_all", channel_list_all,
24952570
METH_NOARGS, channel_list_all_doc},
2571+
{"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters,
2572+
METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
24962573
{"channel_send", (PyCFunction)(void(*)(void))channel_send,
24972574
METH_VARARGS | METH_KEYWORDS, channel_send_doc},
24982575
{"channel_recv", (PyCFunction)(void(*)(void))channel_recv,

0 commit comments

Comments
 (0)