Locking issue with asyncio StreamWriter.drain() with SSL Transport #102792

Closed
vEpiphyte opened this issue Mar 17, 2023 · 5 comments
Labels: stdlib (Python modules in the Lib dir), topic-asyncio, topic-SSL, type-bug (An unexpected behavior, bug, or error)

Comments

@vEpiphyte

Bug report

Hello,

I'm writing about a change in behavior for StreamReader/StreamWriter objects with SSL transports in Python 3.11.x. While assessing our application's support for Python 3.11 ( vertexproject/synapse#3025 ), I ran across an issue with some of our application tests locking up. This occurred in tests which rely on creating an asyncio task to read data from a socket and write that data back to a StreamWriter. This relay task was blocking on a drain() call.

We wrap the StreamReader/StreamWriter objects in our own Link class, which provides some common configuration. In it, we do two notable things: we set the StreamWriter transport's write_buffer_limits high-water mark to 0, and we ensure that every writer.write() call has a corresponding drain() call. We do this to ensure that we've written the application buffer into the kernel socket buffer, as we've had issues in the past with the Python application buffer failing to actually flush data to the OS socket after drain() calls have returned ( this is described in https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#bug-3-closing-time ).
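As a minimal sketch of that write/drain pattern ( a standalone illustration I've put together for this report, not our actual Link class; the echo handler here is only scaffolding ):

```python
import asyncio

async def main() -> bytes:
    async def handle(reader, writer):
        # Scaffolding: echo one chunk back to the client.
        data = await reader.read(1024)
        writer.write(data)
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)

    # With a zero high-water mark, drain() should not return until the
    # transport buffer has been flushed down to the kernel socket buffer.
    writer.transport.set_write_buffer_limits(high=0)

    writer.write(b'hello')
    await writer.drain()

    echoed = await reader.read(1024)
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return echoed

if __name__ == '__main__':
    print(asyncio.run(main()))
```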

The application's main use of this code is to relay data between the main process and a subprocess via a socket ( since a socket file descriptor can be shared between the two processes ). I've removed this multiprocess aspect from the reproduction and am still seeing the relay task blocking. I have only been able to reproduce this with a server-side task which uses an SSL transport.
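To illustrate just the fd-sharing idea ( a hypothetical standalone sketch, not the application's actual code; multiprocessing knows how to hand a socket to a child process, and the 'fork' start method used here is Unix-only ):

```python
import multiprocessing
import socket

def child(sock: socket.socket) -> None:
    # The subprocess reads from the shared socket and echoes it back upper-cased.
    data = sock.recv(1024)
    sock.sendall(data.upper())
    sock.close()

def main() -> bytes:
    # socketpair() yields two connected sockets; one end's file descriptor
    # is inherited by the forked subprocess.
    ctx = multiprocessing.get_context('fork')
    parent_sock, child_sock = socket.socketpair()
    proc = ctx.Process(target=child, args=(child_sock,))
    proc.start()
    child_sock.close()  # the subprocess now owns its copy of the fd

    parent_sock.sendall(b'hello')
    reply = parent_sock.recv(1024)
    proc.join()
    parent_sock.close()
    return reply

if __name__ == '__main__':
    print(main())
```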

I've attached two files that can be used to recreate this issue:

  1. make_certs.sh - a bash script that uses openssl to create a CA certificate, a server certificate, and a client certificate.
  2. ssllink_repro.py - this file contains the client and server code. The server is a simple echo server that uses a relay task, as described earlier, to route the echoed data. The client sends N bytes through a link and tries to read N bytes back.

To reproduce this, do the following:

  1. Run ./make_certs.sh to make some SSL certificates.
  2. Using Python 3.11.2, run python ssllink_repro.py server to start the server.
  3. Using Python 3.11.2, run python ssllink_repro.py client to run the client. The client accepts a -n argument to control the number of bytes it sends; it defaults to 1024 bytes ( which always works ). Providing values > 1024 usually fails, as the relay locks up. This behavior is non-deterministic - sometimes the client runs, but the majority of the time it fails.

This locking behavior seems to happen only when the server code is running on Python 3.11. When using Python 3.10, the locking is not observed.

Client    Server    May Lock
3.10.10   3.10.10   No
3.10.10   3.11.2    Yes
3.11.2    3.10.10   No
3.11.2    3.11.2    Yes

Things I have experimented with that seem to unblock the relay loop:

  1. Removing the calls to drain().

That seems dangerous and against the recommendation for using StreamWriter.write().

  2. Removing the call to set_write_buffer_limits(high=0).

This removes the blocking behavior in the example code, but re-introduces the problem of drain() leaving data in the Python application buffer, which doesn't always end up in the OS socket buffer. That leads to the race-condition behavior that our unit tests expose.

There is also a --no-ssl option on both the client and the server, which runs the code without an SSL link.

Reproduction Code

make_certs.sh

#!/bin/bash
set -e
set -x

rm -rf ./certs
mkdir ./certs

# Make and self sign our CA
openssl genrsa -out certs/rootCAKey.pem 4096
openssl req -x509 -sha256 -new -nodes -key ./certs/rootCAKey.pem -days 365 -out ./certs/rootCACert.pem

# Make a server certificate for localhost signed by the CA
openssl genrsa -out certs/server.pem 4096
openssl req -new -key ./certs/server.pem -out ./certs/server.csr -sha256 -subj '/CN=localhost'
openssl x509 -req -days 365 -in ./certs/server.csr -sha256 -CA ./certs/rootCACert.pem -CAkey ./certs/rootCAKey.pem -CAcreateserial -out ./certs/server.crt -extensions server

# Make a user certificate signed by the CA
openssl genrsa -out certs/user.pem 4096
openssl req -new -key ./certs/user.pem -out ./certs/user.csr -sha256 -subj '/CN=user'
openssl x509 -req -days 365 -in ./certs/user.csr -sha256 -CA ./certs/rootCACert.pem -CAkey ./certs/rootCAKey.pem -CAcreateserial -out ./certs/user.crt

ssllink_repro.py

import os
import ssl
import sys
import signal
import socket
import asyncio
import inspect
import binascii
import argparse

# Assorted constants
DEFPORT = 12345

CERT_DIRECTORY = './certs'

CA_CERT = CERT_DIRECTORY + '/rootCACert.pem'
SERVER_KEY = CERT_DIRECTORY + '/server.pem'
SERVER_CERT = CERT_DIRECTORY + '/server.crt'
USER_KEY = CERT_DIRECTORY + '/user.pem'
USER_CERT = CERT_DIRECTORY + '/user.crt'

def getServerSSLContext() -> ssl.SSLContext:
    """
    Get a server SSLContext object.
    """
    sslctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    sslctx.minimum_version = ssl.TLSVersion.TLSv1_2
    sslctx.options |= getattr(ssl, "OP_NO_RENEGOTIATION", 0)
    sslctx.load_cert_chain(certfile=SERVER_CERT, keyfile=SERVER_KEY)
    return sslctx

def getClientSSLContext() -> ssl.SSLContext:
    '''
    Get a client SSLContext object
    '''
    sslctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    sslctx.minimum_version = ssl.TLSVersion.TLSv1_2
    sslctx.load_verify_locations(CA_CERT)
    sslctx.check_hostname = False  # Disable cert / hostname checking
    return sslctx

def guid():
    return binascii.hexlify(os.urandom(16)).decode('utf8')

class Base:
    '''
    A generic container helper class for managing free-running coroutines,
    resource shutdown, and main() functionality
    '''
    def __init__(self):
        self._active_tasks = set()
        self.isfini = False
        self._fini_funcs = []
        self.finievt = None

    def onfini(self, func):
        self._fini_funcs.append(func)

    async def fini(self):

        if self.isfini:
            return

        self.isfini = True

        await self._kill_active_tasks()

        for func in self._fini_funcs:
            try:
                valu = func()
                if inspect.iscoroutine(valu):
                    await valu
            except asyncio.CancelledError:  # pragma: no cover  TODO:  remove once >= py 3.8 only
                raise
            except Exception:
                print(f'{self} - fini function failed: {func}')

        self._fini_funcs.clear()

        fevt = self.finievt

        if fevt is not None:
            fevt.set()

        return 0

    async def _kill_active_tasks(self):

        if not self._active_tasks:
            return

        for task in self._active_tasks.copy():

            task.cancel()
            try:
                await task
            except (asyncio.CancelledError, Exception):
                # The taskDone callback will emit the exception.  No need to repeat
                pass

    def schedCoro(self, coro):
        '''
        Schedules a free-running coroutine to run on this base's event loop.  Kills the coroutine if Base is fini'd.
        '''

        task = asyncio.create_task(coro)

        def taskDone(task):
            self._active_tasks.remove(task)
            try:
                # This runs as a done callback, so the task is already done;
                # calling result() surfaces any exception it raised.
                task.result()
            except asyncio.CancelledError:  # pragma: no cover  TODO:  remove once >= py 3.8 only
                pass
            except Exception as e:
                print(f'Task {task} scheduled through Base.schedCoro raised exception {e}')

        self._active_tasks.add(task)
        task.add_done_callback(taskDone)

        return task

    async def waitfini(self, timeout=None):
        '''
        Wait for the base to fini()
        '''

        if self.isfini:
            return True

        if self.finievt is None:
            self.finievt = asyncio.Event()

        if timeout is None:
            await self.finievt.wait()
            return True

        try:
            await asyncio.wait_for(self.finievt.wait(), timeout)
        except asyncio.TimeoutError:
            return False
        return True

    async def addSignalHandlers(self):
        '''
        Register SIGTERM/SIGINT signal handlers with the ioloop to fini this object.
        '''

        def sigterm():
            print('Caught SIGTERM, shutting down.')
            asyncio.create_task(self.fini())

        def sigint():
            print('Caught SIGINT, shutting down.')
            asyncio.create_task(self.fini())

        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGINT, sigint)
        loop.add_signal_handler(signal.SIGTERM, sigterm)

    async def main(self):
        '''
        Helper function to setup signal handlers for this base as the main object.
        '''
        await self.addSignalHandlers()
        return await self.waitfini()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc, cls, tb):
        await self.fini()

class Link(Base):
    '''
    Helper class to wrap a StreamReader / StreamWriter pair.
    '''
    def __init__(self,
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter,
                 info: dict = None,
                 ):
        super().__init__()

        if info is None:
            info = {}

        self.iden = guid()

        # https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#bug-3-closing-time
        # Set this to ensure that any link object's send() has written the
        # Python userspace buffer entirely to the kernel before a call
        # to drain() returns.
        writer.transport.set_write_buffer_limits(high=0)

        self.reader = reader
        self.writer = writer
        self.sock = self.writer.get_extra_info('socket')

        if not info.get('unix'):
            # disable nagle ( to minimize latency for small xmit ), enable TCP keepalives, keepidle
            self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            if hasattr(socket, 'TCP_KEEPIDLE'):
                self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 3)
                self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 3)
                self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)

        self.info = info

        async def fini():
            self.writer.close()
            try:
                await self.writer.wait_closed()
            except (BrokenPipeError, ConnectionResetError) as e:
                print(f'Link error waiting on close: {e}')

        self.onfini(fini)

    async def send(self, byts):
        self.writer.write(byts)
        await self.writer.drain()

    async def recv(self, size):
        byts = await self.reader.read(size)
        return byts

    async def getSpawnInfo(self):
        '''
        Called on a server side link.

        This creates an asyncio Task which relays data from a local socket
        to the writer stream. The normal use case for this is passing the
        socket to a subprocess since that is a pickleable object. That allows
        a subprocess to write data back up to the parent stream object.
        '''
        info = {'unix': True}
        link0, sock = await linksock()
        link0.onfini(sock.close)

        async def relay(link):
            async with link:
                while True:
                    byts = await link.recv(1024)
                    print(f'Relay: read: {len(byts)} bytes')
                    if not byts:
                        break
                    await self.send(byts)  # This tends to block indefinitely....
                    print(f'Relay: sent: {len(byts)} bytes')

        self.schedCoro(relay(link0))

        return {
            'info': info,
            'sock': sock,
        }

# Helper functions for listening
async def listen(host, port, onlink, ssl=None) -> asyncio.AbstractServer:
    '''
    Async listener. This creates a server-side Link for each incoming
    connection and calls the onlink() callback with it.
    '''
    async def onconn(reader, writer):
        info = {'tls': bool(ssl)}  # Mark the server side link as tls link when ssl context is provided
        link = Link(reader, writer, info=info)
        link.schedCoro(onlink(link))

    server = await asyncio.start_server(onconn, host=host, port=port, ssl=ssl)
    return server

async def linksock():
    '''
    Connect a Link, socket pair.
    '''
    sock0, sock1 = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=sock0)
    link0 = Link(reader, writer, info={'unix': True})
    return link0, sock1

async def fromspawn(spawninfo):
    '''
    Create a Link from spawninfo.
    '''
    sock = spawninfo.get('sock')
    info = spawninfo.get('info', {})
    info['spawn'] = True
    reader, writer = await asyncio.open_connection(sock=sock)
    return Link(reader, writer, info=info)

# Simple Echoserver implementation
class Server(Base):
    def __init__(self):
        super().__init__()
        self.server = None  # type: asyncio.AbstractServer

    async def listen(self, port: int, ssl: bool):
        srv_sslctx = None
        if ssl:
            srv_sslctx = getServerSSLContext()
        self.server = await listen('127.0.0.1', port, onlink=self.echoServer, ssl=srv_sslctx)
        _, lport = self.server.sockets[0].getsockname()
        print(f'Server listening on port: {lport}')

    async def echoServer(self, link: Link):
        '''
        A simple echo server callback.
        The data is read from the original StreamReader and written
        back to the spawned link ( which pumps the data through
        a local socket pair ), which then sends the data to the StreamWriter.
        '''
        spawn_info = await link.getSpawnInfo()
        spawn_link = await fromspawn(spawn_info)
        while not link.isfini:
            print('echoServer: recv()')
            data = await link.recv(1024)
            if data:
                print(f'echoServer: Read {len(data)} byts')
            else:
                print(f'echoServer: Read got {data!r}')
                break
            await spawn_link.send(data)
            await link.waitfini(timeout=0.1)
        print('echoServer: left loop ')
        await spawn_link.fini()
        print('echoServer: done!')

async def main_server(opts: argparse.Namespace):
    print('Starting server')
    async with Server() as server:
        await server.listen(opts.port, ssl=opts.ssl)
        await server.main()
    print('Server done')
    return 0

async def main_client(opts: argparse.Namespace):

    sslctx = None
    if opts.ssl:
        sslctx = getClientSSLContext()

    reader, writer = await asyncio.open_connection('127.0.0.1', opts.port, ssl=sslctx)
    link = Link(reader, writer, info={'ssl': sslctx})

    print(f'Connected link {link=} {link.iden}')

    N = opts.num_bytes

    print(f'Sending {N=} bytes')
    await link.send(b'V' * N)

    buf = b''
    while len(buf) < N:
        print(f'Read: {len(buf)=}')
        dsize = 128
        rem = N - len(buf)
        if rem < dsize:
            dsize = rem
        print(f'Reading {dsize} bytes..')
        rx = await link.recv(dsize)
        if not rx:
            print(f'Got {rx!r}, breaking loop')
            break
        buf = buf + rx

    print(f'total len: {len(buf)}, expected {N}')

    assert buf == b'V' * N, "Buffer mismatch :("

    await link.fini()

    return 0

def getPars() -> argparse.ArgumentParser:
    pars = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    pars.add_argument('-p', '--port', type=int, help='port to listen / connect to', default=DEFPORT)
    pars.add_argument('--no-ssl', default=True, dest='ssl', help='disable ssl', action='store_false')
    subp = pars.add_subparsers(title='commands', dest='command')
    do_server = subp.add_parser('server', help='run the echo server code.')
    do_server.set_defaults(func=main_server)
    do_client = subp.add_parser('client', help='run the client code.')
    do_client.set_defaults(func=main_client)
    do_client.add_argument('-n', '--num-bytes', help='Number of bytes to send to the echo server', type=int,
                           default=1024)
    return pars

async def main(argv):
    pars = getPars()
    opts = pars.parse_args(argv)
    return await opts.func(opts)

if __name__ == '__main__':
    asyncio.run(main(sys.argv[1:]))

Expected output for a successful run

Server:

$ ~/.pyenv/versions/3.11.2/bin/python ssllink_repro.py server
Starting server
Server listening on port: 12345
echoServer: recv()
echoServer: Read 1024 byts
Relay: read: 1024 bytes
Relay: sent: 1024 bytes
echoServer: recv()
echoServer: Read got b''
echoServer: left loop 
echoServer: done!
Relay: read: 0 bytes

Client:

$ ~/.pyenv/versions/3.11.2/bin/python ssllink_repro.py client -n 1024
Connected link link=<__main__.Link object at 0x7f9d7e7f34d0> 145e05778a609e09b67899d043054572
Sending N=1024 bytes
Read: len(buf)=0
Reading 128 bytes..
Read: len(buf)=128
Reading 128 bytes..
Read: len(buf)=256
Reading 128 bytes..
Read: len(buf)=384
Reading 128 bytes..
Read: len(buf)=512
Reading 128 bytes..
Read: len(buf)=640
Reading 128 bytes..
Read: len(buf)=768
Reading 128 bytes..
Read: len(buf)=896
Reading 128 bytes..
total len: 1024, expected 1024

Expected failure case

Server:

$ ~/.pyenv/versions/3.11.2/bin/python ssllink_repro.py server
Starting server
Server listening on port: 12345
echoServer: recv()
echoServer: Read 1024 byts
Relay: read: 1024 bytes
echoServer: recv()
echoServer: Read 1 byts
echoServer: recv()  # This is where the hang may happen

Client:

$ ~/.pyenv/versions/3.11.2/bin/python ssllink_repro.py client -n 1025
Connected link link=<__main__.Link object at 0x7f4c231f3910> de2f3f6ec13b910ac4f5624df2e80687
Sending N=1025 bytes
Read: len(buf)=0
Reading 128 bytes..
Read: len(buf)=128
Reading 128 bytes..
Read: len(buf)=256
Reading 128 bytes..
Read: len(buf)=384
Reading 128 bytes..
Read: len(buf)=512
Reading 128 bytes..
Read: len(buf)=640
Reading 128 bytes..
Read: len(buf)=768
Reading 128 bytes..
Read: len(buf)=896
Reading 128 bytes..
Read: len(buf)=1024
Reading 1 bytes..
^CTraceback ( ...

Your environment

  • CPython versions tested on: 3.10.6, 3.10.10, 3.11.1, 3.11.2
  • Operating system and architecture: Ubuntu 22.04, Debian Bookworm, x86_64
@vEpiphyte vEpiphyte added the type-bug (An unexpected behavior, bug, or error) label Mar 17, 2023
@vEpiphyte
Author

This may be related, since it's a pretty big change to the SSL support: #88177

@arhadthedev arhadthedev added the stdlib (Python modules in the Lib dir) and topic-SSL labels Mar 22, 2023
@vEpiphyte
Author

@arhadthedev maybe expert-asyncio would apply here too?

@kumaraditya303
Contributor

You should try to reduce the code; the current reproducer is very large, and I won't be debugging that much code.

@gvanrossum
Member

@vEpiphyte I agree with Kumar that the reproducer is too large to work with. Large reproducers are sometimes a sign that there is a subtle bug in the user code.

@kumaraditya303
Contributor

Please create a new issue with a minimal reproducer; the code in its current form is impossible to debug, and the bug is likely in the user code.

@kumaraditya303 kumaraditya303 closed this as not planned (won't fix, can't repro, duplicate, stale) Aug 2, 2023
@github-project-automation github-project-automation bot moved this from Todo to Done in asyncio Aug 2, 2023