Skip to content

gh-76785: Expand How Interpreter Channels Handle Interpreter Finalization #121805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions Lib/test/support/interpreters/_crossinterp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Common code between queues and channels."""


class ItemInterpreterDestroyed(Exception):
"""Raised when trying to get an item whose interpreter was destroyed."""


class classonly:
"""A non-data descriptor that makes a value only visible on the class.

This is like the "classmethod" builtin, but does not show up on
instances of the class. It may be used as a decorator.
"""

def __init__(self, value):
self.value = value
self.getter = classmethod(value).__get__
self.name = None

def __set_name__(self, cls, name):
if self.name is not None:
raise TypeError('already used')
self.name = name

def __get__(self, obj, cls):
if obj is not None:
raise AttributeError(self.name)
# called on the class
return self.getter(None, cls)


class UnboundItem:
"""Represents a cross-interpreter item no longer bound to an interpreter.

An item is unbound when the interpreter that added it to the
cross-interpreter container is destroyed.
"""

__slots__ = ()

@classonly
def singleton(cls, kind, module, name='UNBOUND'):
doc = cls.__doc__.replace('cross-interpreter container', kind)
doc = doc.replace('cross-interpreter', kind)
subclass = type(
f'Unbound{kind.capitalize()}Item',
(cls,),
dict(
_MODULE=module,
_NAME=name,
__doc__=doc,
),
)
return object.__new__(subclass)

_MODULE = __name__
_NAME = 'UNBOUND'

def __new__(cls):
raise Exception(f'use {cls._MODULE}.{cls._NAME}')

def __repr__(self):
return f'{self._MODULE}.{self._NAME}'
# return f'interpreters.queues.UNBOUND'


UNBOUND = object.__new__(UnboundItem)
UNBOUND_ERROR = object()
UNBOUND_REMOVE = object()

_UNBOUND_CONSTANT_TO_FLAG = {
UNBOUND_REMOVE: 1,
UNBOUND_ERROR: 2,
UNBOUND: 3,
}
_UNBOUND_FLAG_TO_CONSTANT = {v: k
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}


def serialize_unbound(unbound):
op = unbound
try:
flag = _UNBOUND_CONSTANT_TO_FLAG[op]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
return flag,


def resolve_unbound(flag, exctype_destroyed):
try:
op = _UNBOUND_FLAG_TO_CONSTANT[flag]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
if op is UNBOUND_REMOVE:
# "remove" not possible here
raise NotImplementedError
elif op is UNBOUND_ERROR:
raise exctype_destroyed("item's original interpreter destroyed")
elif op is UNBOUND:
return UNBOUND
else:
raise NotImplementedError(repr(op))
110 changes: 93 additions & 17 deletions Lib/test/support/interpreters/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,68 @@

import time
import _interpchannels as _channels
from . import _crossinterp

# aliases:
from _interpchannels import (
ChannelError, ChannelNotFoundError, ChannelClosedError,
ChannelEmptyError, ChannelNotEmptyError,
)
from ._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
)


__all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
'create', 'list_all',
'SendChannel', 'RecvChannel',
'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError',
'ItemInterpreterDestroyed',
]


def create():
class ItemInterpreterDestroyed(ChannelError,
_crossinterp.ItemInterpreterDestroyed):
"""Raised from get() and get_nowait()."""


UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)


def _serialize_unbound(unbound):
if unbound is UNBOUND:
unbound = _crossinterp.UNBOUND
return _crossinterp.serialize_unbound(unbound)


def _resolve_unbound(flag):
resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
if resolved is _crossinterp.UNBOUND:
resolved = UNBOUND
return resolved


def create(*, unbounditems=UNBOUND):
"""Return (recv, send) for a new cross-interpreter channel.

The channel may be used to pass data safely between interpreters.

"unbounditems" sets the default for the send end of the channel.
See SendChannel.send() for supported values. The default value
is UNBOUND, which replaces the unbound item when received.
"""
cid = _channels.create()
recv, send = RecvChannel(cid), SendChannel(cid)
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
cid = _channels.create(unboundop)
recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
return recv, send


def list_all():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid))
for cid in _channels.list_all()]
return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
for cid, unbound in _channels.list_all()]


class _ChannelEnd:
Expand Down Expand Up @@ -106,12 +139,15 @@ def recv(self, timeout=None, *,
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
obj = _channels.recv(self._id, _sentinel)
obj, unboundop = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
if timeout is not None and time.time() >= end:
raise TimeoutError
obj = _channels.recv(self._id, _sentinel)
obj, unboundop = _channels.recv(self._id, _sentinel)
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
return obj

def recv_nowait(self, default=_NOT_SET):
Expand All @@ -122,9 +158,13 @@ def recv_nowait(self, default=_NOT_SET):
is the same as recv().
"""
if default is _NOT_SET:
return _channels.recv(self._id)
obj, unboundop = _channels.recv(self._id)
else:
return _channels.recv(self._id, default)
obj, unboundop = _channels.recv(self._id, default)
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
return obj

def close(self):
_channels.close(self._id, recv=True)
Expand All @@ -135,43 +175,79 @@ class SendChannel(_ChannelEnd):

_end = 'send'

def __new__(cls, cid, *, _unbound=None):
if _unbound is None:
try:
op = _channels.get_channel_defaults(cid)
_unbound = (op,)
except ChannelNotFoundError:
_unbound = _serialize_unbound(UNBOUND)
self = super().__new__(cls, cid)
self._unbound = _unbound
return self

@property
def is_closed(self):
info = self._info
return info.closed or info.closing

def send(self, obj, timeout=None):
def send(self, obj, timeout=None, *,
unbound=None,
):
"""Send the object (i.e. its data) to the channel's receiving end.

This blocks until the object is received.
"""
_channels.send(self._id, obj, timeout=timeout, blocking=True)
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
_channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)

def send_nowait(self, obj):
def send_nowait(self, obj, *,
unbound=None,
):
"""Send the object to the channel's receiving end.

If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj, blocking=False)
return _channels.send(self._id, obj, unboundop, blocking=False)

def send_buffer(self, obj, timeout=None):
def send_buffer(self, obj, timeout=None, *,
unbound=None,
):
"""Send the object's buffer to the channel's receiving end.

This blocks until the object is received.
"""
_channels.send_buffer(self._id, obj, timeout=timeout, blocking=True)
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
_channels.send_buffer(self._id, obj, unboundop,
timeout=timeout, blocking=True)

def send_buffer_nowait(self, obj):
def send_buffer_nowait(self, obj, *,
unbound=None,
):
"""Send the object's buffer to the channel's receiving end.

If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
return _channels.send_buffer(self._id, obj, blocking=False)
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
return _channels.send_buffer(self._id, obj, unboundop, blocking=False)

def close(self):
_channels.close(self._id, send=True)
Expand Down
60 changes: 14 additions & 46 deletions Lib/test/support/interpreters/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
import time
import weakref
import _interpqueues as _queues
from . import _crossinterp

# aliases:
from _interpqueues import (
QueueError, QueueNotFoundError,
)
from ._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
)

__all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
Expand All @@ -34,65 +38,29 @@ class QueueFull(QueueError, queue.Full):
"""


class ItemInterpreterDestroyed(QueueError):
class ItemInterpreterDestroyed(QueueError,
_crossinterp.ItemInterpreterDestroyed):
"""Raised from get() and get_nowait()."""


_SHARED_ONLY = 0
_PICKLED = 1


class UnboundItem:
"""Represents a Queue item no longer bound to an interpreter.

An item is unbound when the interpreter that added it to the queue
is destroyed.
"""

__slots__ = ()

def __new__(cls):
return UNBOUND

def __repr__(self):
return f'interpreters.queues.UNBOUND'


UNBOUND = object.__new__(UnboundItem)
UNBOUND_ERROR = object()
UNBOUND_REMOVE = object()
UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)

_UNBOUND_CONSTANT_TO_FLAG = {
UNBOUND_REMOVE: 1,
UNBOUND_ERROR: 2,
UNBOUND: 3,
}
_UNBOUND_FLAG_TO_CONSTANT = {v: k
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}

def _serialize_unbound(unbound):
op = unbound
try:
flag = _UNBOUND_CONSTANT_TO_FLAG[op]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
return flag,
if unbound is UNBOUND:
unbound = _crossinterp.UNBOUND
return _crossinterp.serialize_unbound(unbound)


def _resolve_unbound(flag):
try:
op = _UNBOUND_FLAG_TO_CONSTANT[flag]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
if op is UNBOUND_REMOVE:
# "remove" not possible here
raise NotImplementedError
elif op is UNBOUND_ERROR:
raise ItemInterpreterDestroyed("item's original interpreter destroyed")
elif op is UNBOUND:
return UNBOUND
else:
raise NotImplementedError(repr(op))
resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
if resolved is _crossinterp.UNBOUND:
resolved = UNBOUND
return resolved


def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
Expand Down
Loading
Loading