diff --git a/Doc/library/asyncio-tutorial/async-functions.rst b/Doc/library/asyncio-tutorial/async-functions.rst new file mode 100644 index 00000000000000..259de714e7174e --- /dev/null +++ b/Doc/library/asyncio-tutorial/async-functions.rst @@ -0,0 +1,485 @@ +Async Functions, And Other Syntax Features +========================================== + +Regular Python functions are created with the keyword ``def``, +and look like this: + +.. code-block:: python3 + + def f(x, y): + print(x + y) + + # Evaluate the function + f(1, 2) + +Async functions are different in two respects: + +.. code-block:: python3 + + import asyncio + + async def f(x, y): + print(x + y) + + # Execute the *async* function above + asyncio.run(f(1, 2)) + +The first difference is that the function declaration is spelled +``async def``. The second difference is that async functions cannot be +executed by simply evaluating them. Instead, we use the ``run()`` function +from the ``asyncio`` module. + +Executing Async Functions +------------------------- + +The ``run`` function is only good for executing an async function +from "synchronous" code; and this is usually only used to execute +a "main" async function, from which others can be called in a simpler +way. You will not see ``asyncio.run()`` being called from inside an +``async def`` function. + +That means the following: + +.. code-block:: python3 + + import asyncio + + async def f(x, y): + print(x + y) + + async def main(): + await f(1, 2) + + asyncio.run(main()) + + +The execution of the the async function ``main()`` is performed +with ``run()``, but once you're inside an ``async def`` function, then +calling another async function is done with the ``await`` keyword. + +Ordinary (Sync) Functions Cannot Await Async Functions +------------------------------------------------------ + +If you're starting inside a normal function, you cannot call an +async function using the ``await`` keyword. Doing so will produce +a syntax error: + +.. code-block:: python3 + + >>> async def f(x, y): + ... pass + ... + >>> def g(): + ... await f(1, 2) + ... + File "", line 2 + SyntaxError: 'await' outside async function + +To fix this, either ``asyncio.run()`` must be used, or the function +``g`` must be changed to be an ``async def`` function. The ``run()`` +function is tricky: you can't call it in a nested way, because +internally it creates an event loop and you cannot have two event +loops running at the same time (in the same thread). So the following +is also illegal: + +.. code-block:: python3 + + >>> import asyncio + >>> async def f(x, y): + ... pass + ... + >>> def g(): + ... asyncio.run(f(1, 2)) + ... + >>> async def main(): + ... g() + ... + >>> asyncio.run(main()) + Traceback (most recent call last): + + + + File "G:\Programs\Python37\lib\asyncio\runners.py", line 34, in run + "asyncio.run() cannot be called from a running event loop") + RuntimeError: asyncio.run() cannot be called from a running event loop + +So ``asyncio.run()`` is really intended only for launching your *first* +async function; after that, every other async function should be +executed using the ``await`` keyword, and the task-based strategies which +we've not yet discussed. + +Async Functions Can Call Sync Functions +--------------------------------------- + +The inverse works perfectly fine: calling ordinary Python functions +from inside ``async def`` functions. Here's an example: + +.. code-block:: python3 + + >>> import asyncio + >>> import time + >>> async def f(): + ... print(time.ctime()) + ... + >>> asyncio.run(f()) + Sun Nov 4 15:04:45 2018 + +One of the benefits of ``asyncio`` is that you can see at a glance +which code inside a function is subject to a context switch. In the +following code example, we have two kinds of ``sleep()``: a blocking +version from the ``time`` module, and an async version from ``asyncio``: + +.. code-block:: python3 + + >>> import time, asyncio + >>> def func1(): + ... time.sleep(0) + ... + >>> async def func2(): + ... await asyncio.sleep(0) + ... + >>> async def main(): + ... await func2() # (1) + ... func1() + ... func1() + ... func1() + ... func1() + ... func1() + ... func1() + ... await func2() # (2) + ... + >>> asyncio.run(main()) + +At (1), the underlying event loop is given the opportunity to switch from +``main()`` to any other tasks that are waiting to run, and after line (1) +returns, a series of calls to the sync function ``func1()`` occurs before +the next allowable context switch on the event loop at (2). While the +series of sync calls are running, *no other code* will execute in the +current thread, until you get to the next ``await``. This guarantee applies +a dramatic simplifying effect on your code, because now you can modify +data shared between multiple async tasks without fear of introducing +a race condition. + +.. note:: In programs using ``asyncio``, you should never use ``time.sleep()``. + The correct way to "sleep" is with ``await asyncio.sleep()``. This is + because ``time.sleep()`` is a *blocking* call that will prevent the + ``asyncio`` event loop from processing events. The only safe way to + use ``time.sleep()`` is within a thread, or a subprocess, or with a + value of zero! + +Accurate Terminology For Async Functions +---------------------------------------- + +So far in this tutorial we've been intentionally sloppy with how +we refer to things like *async functions* or *async def* functions, +and *normal Python functions* and so on. It's time to get more +specific about what to call each of these things. It's important +because we need to be able to understand the difference between +a **coroutine** and a **coroutine function**, and a few other things +still to be introduced. + +So let's do that now, using the ``inspect`` module. First let's look +at the two kinds of functions: + +.. code-block:: python3 + + >>> import inspect + >>> def f1(): + ... pass + ... + >>> inspect.isfunction(f1) + True + >>> inspect.iscoroutinefunction(f1) + False + +This is an ordinary Python function, and the ``inspect`` module +confirms that, but we've included another test to see if the function +is a *coroutine function*, which is ``False`` as expected. Let's do +the same on an ``async def`` function: + +.. code-block:: python3 + + >>> async def f2(): + ... pass + ... + >>> inspect.isfunction(f2) + True + >>> inspect.iscoroutinefunction(f2) + True + +According to Python, ``f2`` is also considered to be a function, but +more specifically, it is a *coroutine function*, and this is the +specific name we will be using for *async def* functions. + +Why does it matter? Well, when you evaluate a coroutine function, it'll +return something: + +.. code-block:: python3 + + >>> async def f2(): + ... pass + ... + >>> result = f2() + >>> type(result) + + >>> inspect.iscoroutine(result) + True + +The point we're trying to make here is that an *async def* function +is not yet a coroutine, but rather only a *coroutine function*; only +when you *evaluate* the coroutine function, will a coroutine +object be returned. The ``await`` keyword, which we showed in +previous examples, is acting on *coroutine* objects, not +the coroutine functions that create them. + +This can be made clear in the following example: + +.. code-block:: python3 + + >>> async def f3(): + ... return 123 + ... + >>> async def main(): + ... obj = f3() + ... result = await obj + ... print(result) + ... + >>> asyncio.run(main()) + 123 + +In the code above, the value of ``obj`` is *not* ``123`` when +coroutine function ``f3`` is evaluated. Instead, ``obj`` is a +*coroutine* object, and it will only get executed when the +``await`` keyword is used. Of course, you don't have to write +code like this where you first get the coroutine and then +use ``await`` on the object; simply evaluate the +coroutine function and use ``await`` all in the same line. + +An Aside: Similarity To Generator Functions +------------------------------------------- + +This has nothing to do with asyncio, but you might be interested +to see how this difference between a function and a +coroutine function is quite similar to the difference between +functions and generator functions: + +.. code-block:: python3 + + >>> def g(): + ... yield 123 + ... + >>> inspect.isfunction(g) + True + >>> inspect.isgeneratorfunction(g) + True + +If a function uses the ``yield`` keyword anywhere inside the function +body, that function becomes a *generator function*, very similar to +how a function declared with ``async def`` becomes a +*coroutine function*. And, completing the comparison, if you +evaluate a generator function, a *generator* object is returned, similar +to how a coroutine function, when evaluated, returns a coroutine +object: + +.. code-block:: python3 + + >>> def g(): + ... yield 123 + ... + >>> obj = g() + >>> type(obj) + + >>> inspect.isgenerator(obj) + True + +Again, this doesn't have anything to do with asyncio, but +the loose similarity between generator functions and +coroutine functions might give you a useful framework for understanding +the new coroutine functions. + +Terminology For Async Generators +-------------------------------- + +The previous section was useful for giving you a basic framework +for understanding how coroutines and generator have similar +characteristics. Here, we show how we can also make asynchronous +generator functions! It sounds much more complicated than it +really is, so let's jump directly to some examples: + +.. code-block:: python3 + + >>> import asyncio + >>> async def ag(): + ... yield 1 + ... yield 2 + ... yield 3 + ... + >>> async def main(): + ... async for value in ag(): + ... print(value) + ... + >>> asyncio.run(main()) + 1 + 2 + 3 + +If you pretend for a second that the word "async" is temporarily +removed from the code above, the behaviour of the generator +should look very familiar to you (assuming you already know how +Python's generators work). The generator function yields out +values and these values are obtained by iterating over the +generator. + +The difference now is of course the presence of those "async" +words. The code sample doesn't show a good reason *why* an async +generator is being used here: that comes a bit further down. +All we want to discuss here is what these kinds of +functions and objects should be called. + +Let's have a close look at the function `ag`: + +.. code-block:: python3 + + >>> async def ag(): + ... yield 1 + ... + >>> inspect.isfunction(ag) + True + + # Ok, so it's a function... + + >>> inspect.iscoroutinefunction(ag) + False + + # ...but it's not a coroutine function, despite "async def" + + >>> inspect.isasyncgenfunction(ag) + True + + # Aha, so this is an "async generator function"... + + >>> inspect.isasyncgen(ag()) + True + + # ...and when evaluated, it returns an "async generator" + +Hopefully you're comfortable now with how async generators look. Let's +briefly discuss why you might want to use them. In the examples given +above, there was no good reason to make our generator an ``async def`` +function; an ordinary generator function would have been fine. Async +generators are useful when you need to ``await`` on another coroutine +either before, or after, each ``yield``. + +One example might be receiving network data from a ``StreamReader`` +instance: + +.. code-block:: python3 + + async def new_messages(reader: StreamReader): + while True: + data = await reader.read(1024) + yield data + +This pattern makes for a very clean consumer of the received data: + +.. code-block:: python3 + + async def get_data(): + reader, writer = await asyncio.open_connection(...) + async for data in new_messages(reader): + do_something_with(data) + +Async generators allow you to improve your abstractions: for +example, you can go one level higher and handle reconnection +while still propagating received data out to a consumer: + +.. code-block:: python3 + + async def new_messages(reader: StreamReader): + while True: + data = await reader.read(1024) + yield data # (1) + + async def get_data(host, port): + while True: + try: + reader, writer = await asyncio.open_connection(host, port) + async for data in new_messages(reader): + if not data: + continue + yield data # (2) + except OSError: + continue + except asyncio.CancelledError: + return + + async def main(host, port): + async for data in get_data(host, port): + do_something_with(data) # (3) + + if __name__ == '__main__': + asyncio.run(main(host, port)) + +The async generator at ``(1)`` provides results back to an intermediate +async generator at ``(2)``, which does *the same thing* but also handles +reconnection events in its local scope. Finally, at ``(3)``, The async +iterator elegantly produces the received data, and internal reconnection +events (and any other lower level state management) are hidden from the +high-level logic of the application. + +Async Context Managers +---------------------- + +In the previous section we showed how async generators can be driven +with the new ``async for`` syntax. There is also a version of +a *context manager* that can be used with ``asyncio``. + +.. note:: There is a common misconception that one **must** use + async context managers in ``asyncio`` applications. This is not the + case. Async context managers are needed only if you need to ``await`` + a coroutine in the *enter* or *exit* parts of the context manager. + You are *not* required to use an async context manager if there are ``await`` + statements inside only the *body* of the context manager. + +Just as the ``contextlib`` library provides the ``@contextmanager`` +decorator to let us easily make context managers, so does the +``@asynccontextmanager`` let us do that for async context managers. + +Imagine a very simple example where we might want to have a +connection closed during cancellation, and how about adding some +logging around the connection lifecycle events: + +.. code-block:: python3 + + import asyncio + import logging + from contextlib import asynccontextmanager + + @asynccontextmanager + async def open_conn_logged(*args, **kwargs): + logging.info('Opening connection...') + reader, writer = await asyncio.open_connection(*args, **kwargs) + logging.info('Connection opened.') + try: + yield reader, writer # (1) + finally: + logging.info('Cleaning up connection...') + if not writer.is_closing(): + await writer.close() + logging.info('Connection closed.') + + async def echo(): + async with open_conn_logged('localhost', 8000) as (reader, writer): + data = await reader.read(1024) + await writer.write(data) + + if __name__ == '__main__': + asyncio.run(echo()) + +At line marked ``(1)``, data is provided to the context inside the ``echo()`` +function. You can see how the ``async with`` keywords are required to +work with the async context manager. + +Async context managers are likely to appear in projects using +``asyncio`` because the need to safely close or dispose of resources is +very common in network programming. diff --git a/Doc/library/asyncio-tutorial/asyncio-cookbook.rst b/Doc/library/asyncio-tutorial/asyncio-cookbook.rst new file mode 100644 index 00000000000000..935f672d34edf7 --- /dev/null +++ b/Doc/library/asyncio-tutorial/asyncio-cookbook.rst @@ -0,0 +1,258 @@ +Asyncio Cookbook +================ + +Let's look at a few common situations that will come up in your +``asyncio`` programs, and how best to tackle them. + +[There's a lot more we can do if we're able to refer to +3rd party packages here. We could show a websockets example, +and other things.] + +Using A Queue To Move Data Between Long-Lived Tasks +--------------------------------------------------- + +TODO + +Using A Queue To Control A Pool of Resources +-------------------------------------------- + +- show example with a pool of workers +- show example with a connection pool + +Best Practices For Timeouts +--------------------------- + +- start with ``asyncio.wait_for()`` +- also look at ``asyncio.wait()``, and what to do if not all tasks + are finished when the timeout happens. Also look at the different + termination conditions of ``asyncio.wait()`` + +How To Handle Cancellation +-------------------------- + +- app shutdown +- how to handle CancelledError and then close sockets +- also, when waiting in a loop on ``await queue.get()`` is it better to + handle CancelledError, or use the idiom of putting ``None`` on the + queue? (``None`` would be better because it ensures the contents of the + queue get processed first, but I don't think we can prevent + CancelledError from getting raised so it must be handled anyway. I + can make an example to explain better.) + +Keeping Track Of Many Connections +--------------------------------- + +- example using a global dict +- show how a weakref container can simplify cleanup +- show how to access connection info e.g. ``get_extra_info()`` +- this kind of thing: + +.. code-block:: python3 + + import asyncio + from weakref import WeakValueDictionary + + CONNECTIONS = WeakValueDictionary() + + async def client_connected_cb(reader, writer): + + addr = writer.get_extra_info('peername') + print(f'New connection from {addr}') + + # Every new connection gets added to the global dict. + # Actually, *writer* objects get added. This makes + # it easy to look up a connection and immediately + # send data to it from other async functions. + CONNECTIONS[addr] = writer + ... + + async def main(): + server = await asyncio.start_server( + client_connected_cb=client_connected_db, + host='localhost', + port='9011', + ) + async with server: + await server.serve_forever() + + if __name__ == '__main__': + asyncio.run(main()) + +Handling Reconnection +--------------------- + +- Example is a client app that needs to reconnect to a server + if the server goes down, restarts, or there is a network partition + or other general kind of error + +Async File I/O +-------------- + +- mention that disk I/O is still IO +- Python file operations like ``open()``, etc. are blocking +- I think all we can do here is refer to the 3rd party *aiofiles* + package? +- I suppose we could show how to do file IO in thread, driven + by ``run_in_executor()``... + +Wait For Async Results In Parallel +---------------------------------- + +TODO + +- show an example with gather +- show another example with wait +- maybe throw in an example with gather that also uses + "wait_for" for timeout +- either include "return_exceptions" here or in a different question + +.. code-block:: python3 + + import asyncio + + async def slow_sum(x, y): + result = x + y + await asyncio.sleep(result) + return result + + async def main(): + results = await asyncio.gather( + slow_sum(1, 1), + slow_sum(2, 2), + ) + print(results) # "[2, 4]" + + if __name__ == '__main__': + asyncio.run(main()) + +- we should also include a brief discussion of "when to use asyncio.gather and + when to use asyncio.wait" + +Secure Client-Server Networking +------------------------------- + +- built-in support for secure sockets +- you have to make your own secret key, and server certificate + +.. code-block:: bash + :caption: Create a new private key and certificate + + $ openssl req -newkey rsa:2048 -nodes -keyout chat.key \ + -x509 -days 365 -out chat.crt + +This creates ``chat.key`` and ``chat.crt`` in the current dir. + +.. code-block:: python3 + :caption: Secure server + + import asyncio + import ssl + + async def main(): + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ctx.check_hostname = False + + # These must have been created earlier with openssl + ctx.load_cert_chain('chat.crt', 'chat.key') + + server = await asyncio.start_server( + client_connected_cb=client_connected_cb, + host='localhost', + port=9011, + ssl=ctx, + ) + async with server: + await server.serve_forever() + + async def client_connected_cb(reader, writer): + print('Client connected') + received = await reader.read(1024) + while received: + print(f'received: {received}') + received = await reader.read(1024) + + if __name__ == '__main__': + asyncio.run(main()) + + +.. code-block:: python3 + :caption: Secure client + + import asyncio + import ssl + + async def main(): + print('Connecting...') + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) + ctx.check_hostname = False + + # The client must only have access to the cert *not* the key + ctx.load_verify_locations('chat.crt') + reader, writer = await asyncio.open_connection( + host='localhost', + port=9011, + ssl=ctx + ) + + writer.write(b'blah blah blah') + await writer.drain() + writer.close() + await writer.wait_closed() + + if __name__ == '__main__': + asyncio.run(main()) + +Correctly Closing Connections +----------------------------- + +- from the client side +- from the server side +- Yury I need your help here. What is the actual "correct" way + to do this? Streams API preferable, if possible. + +Handling Typical Socket Errors +------------------------------ + +- Maybe describe the situations in which they can occur? Not sure. + +- ``ConnectionError`` +- ``ConnectionResetError`` +- ``ConnectionAbortedError`` +- ``ConnectionRefusedError`` + +Might also want to show some examples of ``asyncio.IncompleteReadError``. + +Also link/refer to the socket programming HOWTO in the docs. + +Graceful Shutdown on Windows +---------------------------- + +TODO + + +Run A Blocking Call In An Executor +---------------------------------- + +- show example with default executor +- show example with a custom executor (thread-based) +- show example with a custom executor (process-based) + + +Adding Asyncio To An Existing Sync (Threaded) Application +--------------------------------------------------------- + +- Imagine an existing app that uses threading for concurrency, + but we want to make use of asyncio only for, say, a large + number of concurrent GET requests, but leave the rest of the + app unchanged. +- Plan would be to run the asyncio loop in another thread +- Can show how to safely communicate between that thread and + the main thread (or others). + + + +Notes: + +- My thinking here was a Q&A style, and then each section has + a code snippet demonstrating the answer. + diff --git a/Doc/library/asyncio-tutorial/case-study-chat-client-cli.rst b/Doc/library/asyncio-tutorial/case-study-chat-client-cli.rst new file mode 100644 index 00000000000000..37c1ae9ef6eb0c --- /dev/null +++ b/Doc/library/asyncio-tutorial/case-study-chat-client-cli.rst @@ -0,0 +1,19 @@ +Asyncio Case Study: Chat Application (Client) +============================================= + +WIP + +.. literalinclude:: client05.py + :caption: client.py + :language: python3 + + +TODO + +Notes: + +- using the streams API +- first show the client. +- will have to explain a message protocol. (But that's easy) +- then show the server +- spend some time on clean shutdown. diff --git a/Doc/library/asyncio-tutorial/case-study-chat-server.rst b/Doc/library/asyncio-tutorial/case-study-chat-server.rst new file mode 100644 index 00000000000000..38f4155106b8f9 --- /dev/null +++ b/Doc/library/asyncio-tutorial/case-study-chat-server.rst @@ -0,0 +1,372 @@ +Asyncio Case Study: Chat Application (Server) +============================================= + +We're going to build a chat application. Users will be able to +connect to a central server and send messages to "chat rooms", which +all the other users in those rooms will be able to see. This case study +gives us an opportunity to show how to use the various features of +``asyncio`` in a "real world" application. + +This will be a client-server application. The server application +will run on a central host, and the chat application will run on each +user's computer or device. The server is the easier of the two, for +interesting reasons that we'll get into later. + +We're going to start with a basic application layout and build up from +there. + +Starting Code Layout +-------------------- + +The code below shows the basic starting template for an asyncio +service. In these kinds of applications, the service will typically +run for a long time, serving clients that may connect, +disconnect, and then later reconnect multiple times. + +.. literalinclude:: server01.py + :language: python3 + +As explained earlier, ``main`` itself is a *coroutine function*, and +when evaluated, i.e., ``main()``, it returns a *coroutine* object +which the ``asyncio.run()`` function knows how to execute. + +.. note:: + The ``asyncio.run()`` function waits for ``main()`` to complete. + When ``main()`` returns, the ``run()`` function will then cancel + all tasks that are still around. This means, precisely, that the + ``asyncio.CancelledError`` exception will get raised in all such + pending tasks. This gives you a way to deal with an "application + shutdown" scenario: you just need to handle the ``CancelledError`` + exception in places where you need a controlled way of terminating + tasks. + +There isn't much more to say about the basic template, so let's add +the actual server. + +Server +------ + +We can use the *Streams API* to create a TCP server very +easily: + +.. literalinclude:: server02.py + :language: python3 + :linenos: + +We've added the ``start_server()`` call on line 5, and this call takes +not only the ``host`` and ``port`` parameters you'd expect, but also a +*callback* function that will be called for each new connection. This +is coroutine function ``client_connected()``, on line 13. + +The callback is provided with ``reader`` and ``writer`` parameters. +These give access to two streams for this new connection. It is here +that data will be received from, and sent to clients. + +Printing out "New client connected!" is obviously going to be quite +useless. We're going to want to receive chat messages from a client, +and we also want to send these messages to all the other clients in the +same "chat room". We don't yet have the concept of "rooms" defined +anywhere yet, but that's ok. Let's first focus on what must be sent +and received between server and client. + +Let's sketch out a basic design of the communication pattern: + +#. Client connects to server +#. Client sends a message to server to announce themselves, and join + a room +#. Client sends a message to a room +#. Server relays that message to all other clients in the same room +#. Eventually, client disconnects. + +These actions suggest a few different kinds of information that need to +be sent between server and client. We need to create a *protocol* +that both server and client can use to communicate. + +How about we use JSON messages? Here is an example of the payload a +client needs to provide immediately after connection: + +.. code-block:: json + :caption: Client payload after connection + + { + "action": "connect", + "username": "" + } + +Here are example messages for joining and leaving rooms: + +.. code-block:: json + :caption: Client payload to join a room + + { + "action": "joinroom", + "room": "" + } + +.. code-block:: json + :caption: Client payload to leave a room + + { + "action": "leaveroom", + "room": "" + } + +And here's an example of a client payload for sending a chat +message to a room: + +.. code-block:: json + :caption: Client payload to send a message to a room + + { + "action": "chat", + "room": "", + "message": "I'm reading the asyncio tutorial!" + } + +All of the JSON examples above are for payloads that will be received +from clients, but remember that the server must also send messages +to all clients in a room. That message might look something like this: + +.. code-block:: json + :caption: Server payload to update all clients in a room + :linenos: + + { + "action": "chat", + "room": "", + "message": "I'm reading the asyncio tutorial!", + "from": "" + } + +The message is similar to the one received by a client, but on line 5 +we now need to indicate from whom the message was sent. + +Message Structure +----------------- + +Now we have a rough idea about what messages between client and +server will look like. The Streams API, like the +underlying TCP protocol it wraps, does not give us message-handling +built-in. All we get is a stream of bytes. It is up to us to +decide how to break up the stream of bytes into recognizable messages. + +The most common raw message structure is based on the idea of a +*size prefix*: + +.. code-block:: text + :caption: Simple message structure, in bytes + + [3 bytes (header)][n bytes (payload)] + +The 3-byte header is an encoded 24-bit integer, which must be the size +of the following payload (in bytes). Using 3 bytes is arbitrary: I +chose it here because it allows a message size of up to 16 MB, which +is way, way larger than what we're going to need for this tutorial. + +Receiving A Message +^^^^^^^^^^^^^^^^^^^ + +Imagine we receive a message over a ``StreamReader`` instance. +Remember, this is what we get from the ``client_connected()`` +callback function. This is how we pull the bytes off the stream +to reconstruct the message: + +.. code-block:: python3 + :linenos: + + from asyncio import StreamReader + from typing import AsyncGenerator + import json + + async def new_messages(reader: StreamReader) -> AsyncGenerator[Dict, None]: + while True: + size_prefix = await reader.readexactly(3) + size = int.from_bytes(size_prefix, byteorder='little') + message = await reader.readexactly(size) + yield json.loads(message) + +This code shows the following: + +- line 5: This is an async function, but it's also a *generator*. All + that means is that there is a ``yield`` inside, that will produce + each new message as it comes in. +- line 6: A typical "infinite loop"; this async function will keep + running, and keep providing newly received messages back to the + caller code. +- line 7: We read exactly 3 bytes off the stream. +- line 8: Convert those 3 bytes into an integer object. Note that the + byteorder here, "little", must also be used in client code that + will be connecting to this server. +- line 9: Using the size calculated above, read exactly that number of + bytes off the stream +- line 10: decode the bytes from JSON and yield that dict out. + +The code sample above is pretty simple, but a little naive. There +are several ways in which it can fail and end the ``while True`` +loop. Let's add a bit more error handling: + +.. code-block:: python3 + :caption: Receiving JSON Messages over a Stream + :linenos: + + from asyncio import StreamReader, IncompleteReadError, CancelledError + from typing import AsyncGenerator + import json + + async def new_messages(reader: StreamReader) -> AsyncGenerator[Dict, None]: + try: + while True: + size_prefix = await reader.readexactly(3) + size = int.from_bytes(size_prefix, byteorder='little') + message = await reader.readexactly(size) + try: + yield json.loads(message) + except json.decoder.JSONDecodeError: + continue + except (OSError, IncompleteReadError, CancelledError): + # The connection is dead, leave. + return + +Now, we fail completely on a connection error, but if a particular +payload fails to deserialize via JSON, then we handle that too, but +allow the loop to continue listening for a new message. I included +handling for ``CancelledError``, which is how our application will +signal to this async function that the app is shutting down. + +There are many decisions that one can make here about how to deal +with errors: for example, you might perhaps +choose to terminate a connection if a particular payload fails to +deserialise properly. It seems unlikely that a client would send +through only a few invalid JSON messages, but the rest valid. For +simplicity, we'll keep what we have for now, and move onto + +.. note:: + There are several different kinds of connection-related errors, + like ``ConnectionError``, and ``ConnectionAbortedError`` and so on. + Unless you specifically want to know which kind of exception + occurred, it is safe to use ``OSError`` because all the connection-related + exceptions are subclasses of the built-in ``OSError``. The other + exception type to keep an eye on is ``IncompleteReadError`` which is + provided by the ``asyncio`` module, and is *not* a subclass of + ``OSError``. + +Sending A Message +^^^^^^^^^^^^^^^^^ + +This time, we use a ``StreamWriter`` instance. The code below is +very similar to what we saw in the receiver: + +.. code-block:: python3 + :caption: Sending JSON Messages over a Stream + :linenos: + + from asyncio import StreamWriter + from typing import Dict + import json + + async def send_message(writer: StreamWriter, message: Dict): + payload = json.dumps(message).encode() + size_prefix = len(payload).to_bytes(3, byteorder='little') + await writer.writelines([size_prefix, payload]) + +Let's step through the lines: + +- line 5: This async function must be called for each message that + must be sent. +- line 6: Serialize the message to bytes. +- line 7: Build the size header; remember, this needs to be sent + before the payload itself. +- line 8: Write both the size header and the payload to the stream; we + could have concatenated the bytes and simply used ``await writer.write()``, + that would make a full copy of the bytes in ``payload``, and for large + messages those extra copies will add up very quickly! + +We can place the two async functions above, ``new_messages()`` +and ``send_message()``, into their own module called ``utils.py`` +(Since we'll be using these function in both our server code and +our client code!). + +For completeness, here is that final utils module: + +.. literalinclude:: utils01.py + :caption: utils.py + :language: python3 + +Let's return to the main server application and see how to +incorporate our new utility functions into the code. + +Server: Message Handling +------------------------ + +Below, we import the new ``utils.py`` module and incorporate +some handling for receiving new messages: + +.. literalinclude:: server03.py + :caption: Server code with basic message handling + :linenos: + :language: python3 + +We've added a few new things inside the ``client_connected`` +callback function: + +- line 19: This is a handler function that will get called if a "connect" + message is received. We have similar handler functions for the other + action types. +- line 31: A simple dictionary that maps an action "name" to the handler + function for that action. +- line 38: Here you can see how our async generator ``new_messages()`` + gets used. We simply loop over it, as you would with any other generator, + and it will return a message only when one is received. Note the one + minor difference as compared to a regular generator: you have to iterate + over an async generator with ``async for``. +- line 39: Upon receiving a message, check which action must be taken. +- line 43: Look up the *handler function* that corresponds to the + action. We set up these handlers earlier at line 31. +- line 47: call the handler function. + +Our server code still doesn't do much; but at least it'll be testable +with a client sending a few different kinds of actions, and we'll be +able to see print output for each different kind of action received. + +The next thing we'll have to do is set up chat rooms. There's no point +receiving messages if there's nowhere to put them! + +Server: Room Handling +--------------------- + +We need collections to store which connections are active, and which +rooms each user has joined. We'll manage these events inside the callback +function ``client_connected_cb()``. Here's a snippet of just that, and +the global collections we'll use to track connections and room +membership: + +.. literalinclude:: server05.py + :caption: Joining and leaving rooms + :lines: 9-10,18-39 + :language: python3 + +When we receive a request to join +a room is received, that room is looked up by name (or created automatically +by the ``defaultdict``) and that connection is added to that room. The +inverse happens when a request is received to leave a room. When a new chat +message is received, it must be sent to all the other connections in that +room. The ``send_message()`` function is from our ``utils`` module shown +earlier. + +Note how we don't call ``await send_message()``, but instead we create +a separate task for sending to each connection. This is because we don't +want to delay any of these messages. If we used the ``await`` keyword, +then each send call would hold up the next one in the iteration. Using +``create_task()`` in this way allows us to run a coroutine +"in the background", without waiting for it to return before proceeding +with the next lines of code. + +Server: Final Version +--------------------- + +At this point the server part of our project is complete and we can show +the final server module: + +.. literalinclude:: server05.py + :caption: server.py + :language: python3 diff --git a/Doc/library/asyncio-tutorial/client05.py b/Doc/library/asyncio-tutorial/client05.py new file mode 100644 index 00000000000000..a89c2cb589df3a --- /dev/null +++ b/Doc/library/asyncio-tutorial/client05.py @@ -0,0 +1,36 @@ +import asyncio +from utils import new_messages, send_message + +from prompt_toolkit.eventloop.defaults import use_asyncio_event_loop +from prompt_toolkit.patch_stdout import patch_stdout +from prompt_toolkit.shortcuts import PromptSession + + +async def main(): + use_asyncio_event_loop() + + reader, writer = await asyncio.open_connection('localhost', '9011') + await send_message(writer, dict(action='connect', username='Eric')) + await send_message(writer, dict(action='joinroom', room='nonsense')) + + asyncio.create_task(enter_message(writer)) + + async for msg in new_messages(reader): + print(f"{msg['from']}: {msg['message']}") + + +async def enter_message(writer): + session = PromptSession('Send message: ', erase_when_done=True) + while True: + try: + msg = await session.prompt(async_=True) + if not msg: + continue + await send_message(writer, msg) + except (EOFError, asyncio.CancelledError): + return + + +if __name__ == '__main__': + with patch_stdout(): + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/index.rst b/Doc/library/asyncio-tutorial/index.rst new file mode 100644 index 00000000000000..202f2d0294f3dd --- /dev/null +++ b/Doc/library/asyncio-tutorial/index.rst @@ -0,0 +1,24 @@ +Asyncio Tutorial +================ + +Programming with ``async def`` functions is different to normal Python +functions; enough so that it is useful to explain a bit more +about what ``asyncio`` is for, and how to use it in typical +programs. + +This tutorial will focus on what an end-user of ``asyncio`` should +learn to get the most value out of it. Our focus is going to be +primarily on the "high-level" API, as described in the +:mod:`documentation `. + + +.. toctree:: + :maxdepth: 1 + + what-asyncio.rst + why-asyncio.rst + async-functions.rst + running-async-functions.rst + asyncio-cookbook.rst + case-study-chat-server.rst + case-study-chat-client-cli.rst diff --git a/Doc/library/asyncio-tutorial/pttest.py b/Doc/library/asyncio-tutorial/pttest.py new file mode 100644 index 00000000000000..b7fb5e82073ef8 --- /dev/null +++ b/Doc/library/asyncio-tutorial/pttest.py @@ -0,0 +1,32 @@ +import asyncio + +from prompt_toolkit.eventloop.defaults import use_asyncio_event_loop +from prompt_toolkit.patch_stdout import patch_stdout +from prompt_toolkit.shortcuts import PromptSession + + +async def blah(): + while True: + print('.') + await asyncio.sleep(10.0) + + +async def prompt(): + session = PromptSession('Message: ', erase_when_done=True) + while True: + try: + msg = await session.prompt(async_=True) + print(msg) + except (EOFError, asyncio.CancelledError): + return + + +async def main(): + use_asyncio_event_loop() + await asyncio.gather(blah(), prompt()) + # await asyncio.gather(blah()) + + +if __name__ == '__main__': + with patch_stdout(): + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/running-async-functions.rst b/Doc/library/asyncio-tutorial/running-async-functions.rst new file mode 100644 index 00000000000000..9937d9bf3ffa1a --- /dev/null +++ b/Doc/library/asyncio-tutorial/running-async-functions.rst @@ -0,0 +1,211 @@ +Three Ways To Execute Async Functions +===================================== + +In a previous section we looked at the difference between sync functions +and async functions, and other async language syntax features. +Here we focus specifically on how to execute async functions. + +Imagine we have an async function, called ``my_coro_fn()``, and we want to +run it. There are three ways: + +1. ``asyncio.run(my_coro_fn())`` +2. ``await my_coro_fn()`` +3. ``asyncio.create_task(my_coro_fn())`` + +The first, ``asyncio.run()`` will create a new event loop and is intended +to be called only from sync code. It is typically used to start off the +whole program. + +The second, ``await my_coro_fn()``, has already been covered in a previous +section and is used to both execute the async function, and wait for the +function to complete. The ``await`` keyword can only be used inside an +``async def`` function. It is expected that *most* of your async functions +will be executed with the ``await`` keyword. + +The third is something we haven't covered before: the ``asyncio.create_task()`` +function. This function will call your async function, and create an +``asyncio.Task`` object to wrap it. This means that your async function will +begin executing but the code in the calling context will *not* wait for your +async function before continuing with the next code. + +Let's have a look at that with an example: + +.. code-block:: python3 + + import asyncio + + async def f(): + await asyncio.sleep(10) + print('f is done') + + async def g(): + await asyncio.sleep(5) + print('g is done') + + async main(): + asyncio.create_task(f()) # (1) + await g() # (2) + + asyncio.run(main()) + +Looking at line (1), we see that async function ``f()`` is called and +passed to ``create_task()``, and immediately after, async function ``g()`` +is called with ``await g()``. + +Even though ``f()`` is called first, async function ``g()`` will finish +first (5 seconds is shorter than 10 seconds), and you'll see "g is done" +printed before "f is done". This is because +although ``create_task()`` does schedule the given async function to be +executed, it does not wait for the call to complete, unlike when the +``await`` keyword is used. + +However, note that the task returned by ``create_task()`` can indeed be +awaited, and this will make the order of calls sequential once again: + +.. code-block:: python3 + + import asyncio + + async def f(): + await asyncio.sleep(10) + print('f is done') + + async def g(): + await asyncio.sleep(5) + print('g is done') + + async main(): + task = asyncio.create_task(f()) # (1) + await task + await g() # (2) + + asyncio.run(main()) + +In the sample above, we specifically use the ``await`` keyword on the task +object returned by the ``create_task()`` function, and this means that +the execution of that task must complete before the next ``await g()`` call +can be started. + +There are a few other ways that async functions can be started, but they +are just decoration over the three ways discussed above. For example, the +``asyncio.gather()`` function can also receive async functions: + +.. code-block:: python3 + + import asyncio + + async def f(): + await asyncio.sleep(10) + print('f is done') + + async def g(): + await asyncio.sleep(5) + print('g is done') + + async main(): + await asyncio.gather( + asyncio.create_task(f()), + g() + ) + + asyncio.run(main()) + +In this example above, we didn't explicitly use the ``await`` keyword on +the async function ``g()``, but nevertheless it will still be executed. +Inside the ``gather()`` function, the coroutine object returned by ``g()`` +will be wrapped in a ``Task`` object, similar to what we're doing with +``f()``. The ``await gather()`` line above will only return once *both* +``f()`` and ``g()`` have completed (and in fact, it wasn't necessary to +wrap ``f()`` in a task at all here, but it was included just to show that +it works). + +.. note:: The ``create_task()`` API is useful to understand concurrency + features in modern JavaScript, or *vice-versa* if you're coming to + Python from the context of JavaScript. JS also has ``async`` + and ``await`` keywords, and they work *almost* exactly the same as + described in this Python tutorial! There is however one big + difference: In JavaScript, all async functions, when called, behave + like ``asyncio.create_task()`` calls. Consider the following + JavaScript code: + + .. code-block:: javascript + + async func1 () { + return await http.get('http://example.com/1') + } + async func2 () { + return await http.get('http://example.com/2') + } + async main () { + task1 = func1() // In Python: task1 = create_task(func1()) + task2 = func2() // In Python: task2 = create_task(func2()) + [result1, result2] = [await task1, await task2] + } + + In Python, when you see two ``await`` keywords in series, it usually + reads as "first the one, then the other". This is because the ``await`` + keyword suspends the calling context until the coroutine returns. + In the JavaScript shown above, that is not the case, both ``task1`` + *and* ``task2`` will run concurrently, although ``result1`` and + ``result2`` will only be set when both tasks have completed. + + A naive translation of the JavaScript code to Python might look + like this: + + .. code-block:: python3 + + async def func1(): + return await http.get('http://example.com/1') + + async func2(): + return await http.get('http://example.com/2') + + async def main(): + coro1 = func1() + coro2 = func2() + [result1, result2] = [await coro1, await coro2] + } + + However, this will *not* behave the same: ``coro2`` will begin + running only after ``coro1`` has completed! Instead, one can use + Python's ``create_task()`` to more closely mimic the JavaScript + behaviour: + + .. code-block:: python3 + + async def func1(): + return await http.get('http://example.com/1') + + async func2(): + return await http.get('http://example.com/2') + + async def main(): + task1 = asyncio.create_task(func1()) + task2 = asyncio.create_task(func2()) + [result1, result2] = [await task1, await task2] + } + + Now ``task1`` and ``task2`` will run concurrently, and the results + will be assigned only after both tasks are complete. Of course, this is + not idiomatic in Python: the more common pattern for waiting on + several coroutines concurrently is with the ``gather`` API, which + includes a highly-recommended error-handling feature: + + .. code-block:: python3 + + async def main(): + [result1, result2] = await asyncio.gather( + func1(), func2(), return_exceptions=True + ) + + Setting ``return_exceptions=True`` makes raised exceptions from + any of the given coroutines become "returned" values instead, and + then it is up to you to check whether either of ``result1`` or + ``result2`` is an ``Exception`` type. + + The documentation for ``asyncio.gather()`` has an important warning: + if ``return_exceptions=False``, any exception raised from one of the + coroutines will bubble up into your calling code. This will cause + the ``gather`` call to terminate, but the *other* coroutines supplied + to the ``gather()`` call will **not** be affected, and will continue + to run. diff --git a/Doc/library/asyncio-tutorial/server01.py b/Doc/library/asyncio-tutorial/server01.py new file mode 100644 index 00000000000000..7c938e422a283d --- /dev/null +++ b/Doc/library/asyncio-tutorial/server01.py @@ -0,0 +1,7 @@ +import asyncio + +async def main(): + ... + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/server02.py b/Doc/library/asyncio-tutorial/server02.py new file mode 100644 index 00000000000000..554721ab3ca8eb --- /dev/null +++ b/Doc/library/asyncio-tutorial/server02.py @@ -0,0 +1,17 @@ +import asyncio +from asyncio import StreamReader, StreamWriter + +async def main(): + server = await asyncio.start_server( + client_connected_cb=client_connected, + host='localhost', + port='9011', + ) + async with server: + await server.serve_forever() + +async def client_connected(reader: StreamReader, writer: StreamWriter): + print('New client connected!') + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/server03.py b/Doc/library/asyncio-tutorial/server03.py new file mode 100644 index 00000000000000..e562668ffaa829 --- /dev/null +++ b/Doc/library/asyncio-tutorial/server03.py @@ -0,0 +1,51 @@ +import asyncio +from asyncio import StreamReader, StreamWriter +from typing import Dict, Callable +from utils import new_messages + + +async def main(): + server = await asyncio.start_server( + client_connected_cb=client_connected, + host='localhost', + port='9011', + ) + async with server: + await server.serve_forever() + + +async def client_connected(reader: StreamReader, writer: StreamWriter): + + def connect(msg): + print(msg.get('username')) + + def joinroom(msg): + print('joining room:', msg.get('room')) + + def leaveroom(msg): + print('leaving room:', msg.get('room')) + + def chat(msg): + print(f'chat sent to room {msg.get("room")}: {msg.get("message")}') + + handlers: Dict[str, Callable] = dict( + connect=connect, + joinroom=joinroom, + leaveroom=leaveroom, + chat=chat, + ) + + async for msg in new_messages(reader): + action = msg.get('action') + if not action: + continue + + handler = handlers.get(action) + if not handler: + continue + + handler(msg) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/server04.py b/Doc/library/asyncio-tutorial/server04.py new file mode 100644 index 00000000000000..611ce55cdf572d --- /dev/null +++ b/Doc/library/asyncio-tutorial/server04.py @@ -0,0 +1,65 @@ +import asyncio +from asyncio import StreamReader, StreamWriter +from collections import defaultdict +from weakref import WeakValueDictionary, WeakSet +from typing import Dict, Callable, Set +from utils import new_messages + +WRITERS: Dict[str, StreamWriter] = WeakValueDictionary() +ROOMS: Dict[str, Set[StreamWriter]] = defaultdict(WeakSet) + + +async def main(): + server = await asyncio.start_server( + client_connected_cb=client_connected, + host='localhost', + port='9011', + ) + async with server: + await server.serve_forever() + + +async def client_connected(reader: StreamReader, writer: StreamWriter): + addr = writer.get_extra_info('peername') + WRITERS[addr] = writer + + def connect(msg): + print(f"User connected: {msg.get('username')}") + + def joinroom(msg): + room_name = msg["room"] + print('joining room:', room_name) + room = ROOMS[room_name] + room.add(writer) + + def leaveroom(msg): + room_name = msg["room"] + print('leaving room:', msg.get('room')) + room = ROOMS[room_name] + room.discard(writer) + + def chat(msg): + print(f'chat sent to room {msg.get("room")}: {msg.get("message")}') + # TODO: distribute the message + + handlers: Dict[str, Callable] = dict( + connect=connect, + joinroom=joinroom, + leaveroom=leaveroom, + chat=chat, + ) + + async for msg in new_messages(reader): + action = msg.get('action') + if not action: + continue + + handler = handlers.get(action) + if not handler: + continue + + handler(msg) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/server05.py b/Doc/library/asyncio-tutorial/server05.py new file mode 100644 index 00000000000000..ca542f5ef0ce93 --- /dev/null +++ b/Doc/library/asyncio-tutorial/server05.py @@ -0,0 +1,54 @@ +import asyncio +from asyncio import StreamReader, StreamWriter +from collections import defaultdict +from weakref import WeakValueDictionary, WeakSet +from typing import Dict, Callable, Set, MutableMapping, DefaultDict +from utils import new_messages, send_message +import json + +ROOMS: DefaultDict[str, Set[StreamWriter]] = defaultdict(WeakSet) + + +async def main(): + server = await asyncio.start_server(client_connected, 'localhost', '9011') + async with server: + await server.serve_forever() + + +async def client_connected(reader: StreamReader, writer: StreamWriter): + def connect(msg): + print(f"User connected: {msg.get('username')}") + + def joinroom(msg): + room_name = msg["room"] + print('joining room:', room_name) + room = ROOMS[room_name] + room.add(writer) + + def leaveroom(msg): + room_name = msg["room"] + print('leaving room:', msg.get('room')) + room = ROOMS[room_name] + room.discard(writer) + + def chat(msg): + print(f'chat sent to room {msg.get("room")}: {msg.get("message")}') + room = ROOMS[msg["room"]] + for friend in room: + asyncio.create_task(send_message(friend, msg)) + + handlers: Dict[str, Callable[[Dict], None]] = dict( + connect=connect, + joinroom=joinroom, + leaveroom=leaveroom, + chat=chat, + ) + + async for msg in new_messages(reader): + action = msg.get('action') + handler = handlers.get(action) + handler(msg) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/server20.py b/Doc/library/asyncio-tutorial/server20.py new file mode 100644 index 00000000000000..1385615bf0b04a --- /dev/null +++ b/Doc/library/asyncio-tutorial/server20.py @@ -0,0 +1,73 @@ +import asyncio +from collections import defaultdict +from weakref import WeakValueDictionary +import json +import utils +import ssl + + +WRITERS = WeakValueDictionary() +ROOMS = defaultdict(WeakValueDictionary) + + +async def sender(addr, writer, room, msg): + try: + await utils.send_message( + writer, + json.dumps(dict(room=room, msg=msg)).encode() + ) + except OSError: + """ Connection is dead, remove it.""" + if addr in WRITERS: + del WRITERS[addr] + if addr in ROOMS[room]: + del ROOMS[room][addr] + + +def send_to_room(from_addr, room: str, msg: str): + """Send the message to all clients in the room.""" + for addr, writer in ROOMS[room].items(): + print(f'Sending message to {addr} in room {room}: {msg}') + asyncio.create_task(sender(addr, writer, room, msg)) + + +async def client_connected_cb(reader, writer): + addr = writer.get_extra_info('peername') + print(f'New connection from {addr}') + WRITERS[addr] = writer + async for msg in utils.messages(reader): + print(f'Received bytes: {msg}') + d = json.loads(msg) + if d.get('action') == 'join': + ROOMS[d['room']][addr] = writer + elif d.get('action') == 'leave': + del ROOMS[d['room']][addr] + else: + d['from'] = addr + send_to_room(addr, d['room'], d['msg']) + + +async def main(): + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ctx.check_hostname = False + ctx.load_cert_chain('chat.crt', 'chat.key') + server = await asyncio.start_server( + client_connected_cb=client_connected_cb, + host='localhost', + port='9011', + ssl=ctx, + ) + shutdown = asyncio.Future() + utils.install_signal_handling(shutdown) + print('listening...') + async with server: + done, pending = await asyncio.wait( + [server.serve_forever(), shutdown], + return_when=asyncio.FIRST_COMPLETED + ) + if shutdown.done(): + return + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/Doc/library/asyncio-tutorial/utils01.py b/Doc/library/asyncio-tutorial/utils01.py new file mode 100644 index 00000000000000..e6a4b4eebf3af4 --- /dev/null +++ b/Doc/library/asyncio-tutorial/utils01.py @@ -0,0 +1,26 @@ +import json +from asyncio import ( + StreamReader, StreamWriter, IncompleteReadError, CancelledError +) +from typing import AsyncGenerator, Dict + + +async def new_messages(reader: StreamReader) -> AsyncGenerator[Dict, None]: + try: + while True: + size_prefix = await reader.readexactly(3) + size = int.from_bytes(size_prefix, byteorder='little') + message = await reader.readexactly(size) + try: + yield json.loads(message) + except json.decoder.JSONDecodeError: + continue + except (OSError, IncompleteReadError, CancelledError): + # The connection is dead, leave. + return + + +async def send_message(writer: StreamWriter, message: Dict): + payload = json.dumps(message).encode() + size_prefix = len(payload).to_bytes(3, byteorder='little') + await writer.writelines([size_prefix, payload]) diff --git a/Doc/library/asyncio-tutorial/utils20.py b/Doc/library/asyncio-tutorial/utils20.py new file mode 100644 index 00000000000000..7c8f708a0a26b5 --- /dev/null +++ b/Doc/library/asyncio-tutorial/utils20.py @@ -0,0 +1,56 @@ +import sys +from asyncio import (StreamReader, StreamWriter, IncompleteReadError, Future, + get_running_loop, CancelledError) + +if sys.platform == 'win32': + from signal import signal, SIGBREAK, SIGTERM, SIGINT +else: + SIGBREAK = None + from signal import signal, SIGTERM, SIGINT + +from typing import AsyncGenerator + + +async def messages(reader: StreamReader) -> AsyncGenerator[bytes, None]: + """Async generator to return messages as they come in.""" + try: + while True: + size_prefix = await reader.readexactly(4) + size = int.from_bytes(size_prefix, byteorder='little') + message = await reader.readexactly(size) + yield message + except (OSError, IncompleteReadError, CancelledError): + return + + +async def send_message(writer: StreamWriter, message: bytes): + """To close the connection, use an empty message.""" + if not message: + await writer.close() + return + size_prefix = len(message).to_bytes(4, byteorder='little') + await writer.writelines([size_prefix, message]) + + +def install_signal_handling(fut: Future): + """Given future will be set a signal is received. This + can be used to control the shutdown sequence.""" + if sys.platform == 'win32': + sigs = SIGBREAK, SIGINT + loop = get_running_loop() + + def busyloop(): + """Required to handle CTRL-C quickly on Windows + https://bugs.python.org/issue23057 """ + loop.call_later(0.1, busyloop) + + loop.call_later(0.1, busyloop) + else: + sigs = SIGTERM, SIGINT + + # Signal handlers. Windows is a bit tricky + for s in sigs: + signal( + s, + lambda *args: loop.call_soon_threadsafe(fut.set_result, None) + ) diff --git a/Doc/library/asyncio-tutorial/what-asyncio.rst b/Doc/library/asyncio-tutorial/what-asyncio.rst new file mode 100644 index 00000000000000..e3d21c638f05ae --- /dev/null +++ b/Doc/library/asyncio-tutorial/what-asyncio.rst @@ -0,0 +1,161 @@ +What Does "Async" Mean? +======================= + +Let's make a function that communicates over the network: + +.. code-block:: python3 + + import socket + + def greet(host, port): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((host, host)) + s.sendall(b'Hello, world') + reply = s.recv(1024) + + print('Reply:', repr(reply)) + +This function will: + +#. make a socket connection to a host, +#. send ``b'Hello, world'``, and +#. **wait** for a reply. + +The key point here is about the word *wait*: in the code, execution proceeds line-by-line +until the line ``reply = s.recv(1024)``. While these lines +of code are executing, no other code (in the same thread) can run. We call this behaviour +*synchronous*. At the point where we wait, execution pauses +until we get a reply from the host. + +Now the question comes up: what if you need to send a greeting to +*multiple* hosts? You could just call it twice, right? + +.. code-block:: python3 + + import socket + + def greet(host, port): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((host, host)) + s.sendall(b'Hello, world') + reply = s.recv(1024) + + print('Reply:', repr(reply)) + + greet(host1, port1) + greet(host2, port2) + +This works fine, but see that the first call to ``greet`` will completely +finish before the second call (to ``host2``) can begin. + +Computers process things sequentially, which is why programming languages +like Python also work sequentially; but what we see here is that our +code is also going to **wait** sequentially. This is, quite literally, +a waste of time. What we would really like to do here is wait for +all the replies *concurrently*, i.e., at the same time. + +Preemptive Concurrency +---------------------- + +Operating systems like Windows, Mac and Linux, and others, understand +this problem deeply. If you're reading this on a computer or even on your +mobile, there will be tens or hundreds of processes running at the same +time on your device. At least, they will appear to be running +concurrently. What is really happening is that the operating system +is sharing little slices of processor (CPU) time among all the +processes. If we started two completely separate instances of our ``greet`` program at the same time, they *would* run (and therefore wait) +concurrently which is exactly what we want. + +However, there is a price for that: each new process consumes extra resources +from the operating system; it's not just that we wait in parallel, but +*everything* is now in parallel, a full copy of our program. +But more than that, there is another tricky +problem about *how* the operating system knows when to allocate +execution time between each copy of our process. The answer: it doesn't! +This means that the operating system can decide when to give processor +time to each process. Your code, and therefore you, will not know when +these switches occur. This is called "preemption". From +`Wikipedia `_: +*In computing, preemption is the act of temporarily interrupting a +task being carried out by a computer system, without requiring +its cooperation, and with the intention of resuming the task +at a later time*. + +Operating Systems do this kind of preemptive switching for both +processes and threads. A simplistic but useful description of the +difference is that one process can have multiple threads, and those +threads share all the memory in their parent process. + +Because of this preemptive switching, you will never be sure of +when each of your processes and threads is *actually* executing on +a CPU *relative to each other*. For processes, this is quite safe because +their memory spaces are isolated from each other; however, +**threads** are not isolated from each other (within the same process). +In fact, the primary feature of threads over processes is that +multiple threads within a single process can access the same memory. +And this is where all the problems begin. + +Jumping back to our code sample further up: we may also choose to run the +``greet()`` function in multiple threads; and then +they will also wait for replies concurrently. However, now you have +two threads that are allowed to access the same objects in memory, +with little control over +how execution will switch between the two threads (unless you +use the synchronization primitives in the ``threading`` module) . This +situation can result in *race conditions* in how objects are modified, +and these bugs can be very difficult to fix. + +Cooperative Concurrency +----------------------- + +This is where "async" programming comes in. It provides a way to manage +multiple socket connections all in a single thread; and the best part +is that you get to control *when* execution is allowed to switch between +these different contexts. + +We will explain more of the details throughout this tutorial, +but briefly, our earlier example becomes something like the following +pseudocode: + +.. code-block:: python3 + + import asyncio + + async def greet(host, port): + reader, writer = await asyncio.open_connection(host, port) + writer.write(b'Hello, world') + reply = await reader.recv(1024) + writer.close() + + print('Reply:', repr(reply)) + + async def main(): + # Both calls run at the same time + await asyncio.gather( + greet(host1, port1), + greet(host2, port2) + ) + + asyncio.run(main()) + +In this code, the two instances of the ``greet()`` function will +run concurrently. + +There are a couple of new things here, but I want you to focus +on the new keyword ``await``. Unlike threads, execution is allowed to +switch between concurrent tasks **only** at places where the +``await`` keyword appears. On all other lines, execution is exactly the +same as normal Python, and will not be preempted by thread switching (there's +typically only a single thread in most ``asyncio`` programs). +These ``async def`` functions are called +"asynchronous" because execution does not pass through the function +top-down, but instead can suspend in the middle of a function at the +``await`` keyword, and allow another function to execute while +*this function* is waiting for I/O (usually network) data. + +An additional advantage of the *async* style above is that it lets us +manage many thousands of concurrent, long-lived socket connections in a simple way. +One *can* also use threads to manage concurrent long-lived socket connections, +but it gets difficult to go past a few thousand because the creation +of operating system threads, just like processes, consumes additional +resources from the operating system. diff --git a/Doc/library/asyncio-tutorial/why-asyncio.rst b/Doc/library/asyncio-tutorial/why-asyncio.rst new file mode 100644 index 00000000000000..5a88bccc8afe07 --- /dev/null +++ b/Doc/library/asyncio-tutorial/why-asyncio.rst @@ -0,0 +1,35 @@ +Why Asyncio? +============ + +There are two very specific reasons for using ``async def`` functions: + +#. Safety: easier reasoning about concurrency, and virtually + eliminate `memory races `_ + in concurrent network code +#. High concurrency: huge number of open socket connections + +Safety +------ + +- async/await makes all context switches visible; that makes it easy + to spot race conditions and + `reason about your code `_ + +- in general, all datastructures are safe for async (we cannot say same + for threads) + +- an async/await library means that it's safe to use it in concurrent + async/await code (you can never be sure if some library is thread-safe, + even if it claims that) + +- language constructs like 'async for' and 'async with' enable structured + concurrency + +High Concurrency +---------------- + +- high-throughput IO or 1000s of long-living connections are only + doable with asyncio + +- if you don't need to scale your code right now but might need + in near future investing in async/await is wise diff --git a/Doc/library/asyncio.rst b/Doc/library/asyncio.rst index 6990adb21e3603..233a7f05520ab9 100644 --- a/Doc/library/asyncio.rst +++ b/Doc/library/asyncio.rst @@ -88,6 +88,7 @@ Additionally, there are **low-level** APIs for :caption: Guides and Tutorials :maxdepth: 1 + asyncio-tutorial/index.rst asyncio-api-index.rst asyncio-llapi-index.rst asyncio-dev.rst diff --git a/Misc/NEWS.d/next/Documentation/2019-06-15-14-58-28.bpo-34831.mFkyqe.rst b/Misc/NEWS.d/next/Documentation/2019-06-15-14-58-28.bpo-34831.mFkyqe.rst new file mode 100644 index 00000000000000..71ae600ad89544 --- /dev/null +++ b/Misc/NEWS.d/next/Documentation/2019-06-15-14-58-28.bpo-34831.mFkyqe.rst @@ -0,0 +1 @@ +Add asyncio tutorial