diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 5a250477d1e7..7fb3dd1b80a3 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -232,9 +232,17 @@ - [Async](async.md) - [async/await](async/async-await.md) - [Async Blocks](async/async-blocks.md) -- [Futures](async/futures.md) -- [Runtimes](async/runtimes.md) -- [Tasks](async/tasks.md) + - [Futures](async/futures.md) + - [Runtimes](async/runtimes.md) + - [Tasks](async/tasks.md)ures](async/futures.md) + - [Async Channels](async/channels.md) + - [Futures Control Flow](async/control-flow.md) + - [Daemon](async/control-flow/daemon.md) + - [Join](async/control-flow/join_all.md) + - [Select](async/control-flow/select.md) + - [Pitfalls](async/pitfalls.md) + - [Blocking the executor](async/pitfalls/blocking-executor.md) + - [Pin](async/pitfalls/pin.md) - [Exercises](exercises/day-4/async.md) # Final Words diff --git a/src/async/channels.md b/src/async/channels.md new file mode 100644 index 000000000000..bd3df621360d --- /dev/null +++ b/src/async/channels.md @@ -0,0 +1,36 @@ +# Async Channels + +Multiple Channels crates have support for `async`/`await`. For instance `tokio` channels: + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; + +async fn ping_handler(mut input: Receiver<()>) { + let mut count: usize = 0; + + while let Some(_) = input.recv().await { + count += 1; + println!("Received {count} pings so far."); + } +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + let ping_handler_task = tokio::spawn(ping_handler(receiver)); + for _ in 0..10 { + sender.send(()).await.expect("Failed to send ping."); + } + + std::mem::drop(sender); + ping_handler_task.await.expect("Something went wrong in ping handler task."); +} +``` + +
+ +- Overall, the interface is similar to the `sync` channels as seen in the [morning class](concurrency/channels.md). +- The `Flume` crate has channels that implement both `sync` and `async` `send` and `recv`. This can be convenient for complex application with both IO and heavy CPU processing tasks. +- What makes working with `async` channels preferable is the ability to combine them with other `future`s to combine them and create complex control flow. + +
diff --git a/src/async/control-flow.md b/src/async/control-flow.md new file mode 100644 index 000000000000..b0158d87b04d --- /dev/null +++ b/src/async/control-flow.md @@ -0,0 +1,9 @@ +# Futures Control Flow + +Futures can be combined together to produce concurrent compute flow graphs. We will cover multiple common operations: + +---- + +- [Daemon](control-flow/daemon.md) +- [Join](control-flow/join_all.md) +- [Select](control-flow/select.md) diff --git a/src/async/control-flow/daemon.md b/src/async/control-flow/daemon.md new file mode 100644 index 000000000000..eeb0425f55e9 --- /dev/null +++ b/src/async/control-flow/daemon.md @@ -0,0 +1,33 @@ +# Daemon + +Tasks can be spawned without having to be awaited. They will be scheduled like any other tasks by the executor but won't block any running task. This can be useful for tasks that function like actors, receiving messages and sending messages to other tasks through channels. It can also be useful to log metrics or ping system's health. + +```rust,editable,compile_fail + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() { + let seconds_since_beginning = Arc::new(AtomicUsize::from(0)); + let counter = Arc::clone(&seconds_since_beginning); + tokio::spawn(async move { + loop { + sleep(Duration::from_secs(1)).await; + counter.fetch_add(1, Ordering::SeqCst); + } + }); + + sleep(Duration::from_millis(4500)).await; + assert_eq!(seconds_since_beginning.load(Ordering::Relaxed), 4); +} + + +``` + +
+ +* It is good practice to make your deamons exit because some other blocking task might depend on them. Which would prevent your main thread from ever closing. You can use a `oneshot` channel to signal the task to terminate. You can also use the `ctrl+c` signal handler from `tokio` as an interrupt signal. + +
diff --git a/src/async/control-flow/join_all.md b/src/async/control-flow/join_all.md new file mode 100644 index 000000000000..65dca5c65cc9 --- /dev/null +++ b/src/async/control-flow/join_all.md @@ -0,0 +1,48 @@ +# join_all + +Futures can be combined together to produce concurrent compute flow graphs. + +## Run a group of futures concurrently until they all resolve: `join_all` + +### Equivalents: + +- JS: `Promise.all` +- Python: `asyncio.gather` + +```rust,editable,compile_fail +use anyhow::Result; +use futures::future; +use reqwest; +use std::collections::HashMap; + +async fn size_of_page(url: &str) -> Result { + let resp = reqwest::get(url).await?; + Ok(resp.text().await?.len()) +} + +#[tokio::main] +async fn main() { + let urls: [&str; 4] = [ + "https://google.com", + "https://httpbin.org/ip", + "https://play.rust-lang.org/", + "BAD_URL", + ]; + let futures_iter = urls.into_iter().map(size_of_page); + let results = future::join_all(futures_iter).await; + let page_sizes_dict: HashMap<&str, Result> = + urls.into_iter().zip(results.into_iter()).collect(); + println!("{:?}", page_sizes_dict); +} +``` + +
+ +* `join_all` should soon be stabilized as part of the standard library in `std::future`. +* For multiple futures of disjoint types, you can use `join!` but you must know how many futures you will have at compile time. +* You can also combine `join_all` with `join!` for instance to join all requests to an http service as well as a database query. +* The risk of `join` is that one of the future could never resolve, this would cause your program to stall. +* Try adding a timeout to the future. + +
+ diff --git a/src/async/control-flow/select.md b/src/async/control-flow/select.md new file mode 100644 index 000000000000..6f53383be0d0 --- /dev/null +++ b/src/async/control-flow/select.md @@ -0,0 +1,63 @@ +# Select + +## Run multiple futures concurrently until the first one resolves + +### Equivalents: + +- JS: `Promise.race` +- Python: `asyncio.new_event_loop().run_until_complete(asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED))` + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; + +#[derive(Debug, PartialEq)] +enum Animal { + Cat { name: String }, + Dog { name: String }, +} + +async fn first_animal_to_finish_race( + mut cat_rcv: Receiver, + mut dog_rcv: Receiver, +) -> Option { + tokio::select! { + cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }), + dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? }) + } +} + +#[tokio::main] +async fn main() { + let (cat_sender, cat_receiver) = mpsc::channel(32); + let (dog_sender, dog_receiver) = mpsc::channel(32); + tokio::spawn(async move { + sleep(Duration::from_secs(10)).await; + cat_sender + .send(String::from("Felix")) + .await + .expect("Failed to send cat."); + }); + tokio::spawn(async move { + sleep(Duration::from_secs(5)).await; + dog_sender + .send(String::from("Rex")) + .await + .expect("Failed to send cat."); + }); + + let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) + .await + .expect("Failed to receive winner"); + + assert_eq!(winner, Animal::Dog {name: String::from("Rex")}); +} +``` + +
+ +* In this example, we have a race between a cat and a dog. `first_animal_to_finish_race` listens to both channels and will pick whichever arrives first. Since the dog takes 5 seconds, it wins against the cat that take 10 seconds. +* You can use `oneshot` channels in this example as the channels are supposed to receive only one `send`. +* You can try adding more contestants to the race and return a leaderboard. Also, you can add a deadline after which contestants get eliminated. + +
diff --git a/src/async/pitfalls.md b/src/async/pitfalls.md new file mode 100644 index 000000000000..6fb8b1e73003 --- /dev/null +++ b/src/async/pitfalls.md @@ -0,0 +1,8 @@ +# Pitfalls of async/await + +Async / await provides convenient and efficient abstraction for concurrent asynchronous programming. However, the async/await model in Rust also comes with its share of pitfalls and footguns. We illustrate some of them in this chapter: + +--- + +- [Blocking the executor](pitfalls/blocking-executor.md) +- [Pin](pitfalls/pin.md) diff --git a/src/async/pitfalls/blocking-executor.md b/src/async/pitfalls/blocking-executor.md new file mode 100644 index 000000000000..8371855854ce --- /dev/null +++ b/src/async/pitfalls/blocking-executor.md @@ -0,0 +1,60 @@ +# Blocking the executor + +Most async runtimes only allow IO tasks to run concurrently. +This means that CPU blocking tasks will block the executor and prevent other tasks from being executed. +An easy workaround is to use async equivalent methods where possible. + +```rust,editable,compile_fail +use futures::future::join_all; +use std::time::Instant; + +// Uncomment to try with `spawn_blocking` around `std::thread::sleep`. +const USE_SPAWN_BLOCKING: bool = false; + +async fn std_sleep_ms(duration_ms: u64) { + if USE_SPAWN_BLOCKING { + tokio::task::spawn_blocking(move || { + std::thread::sleep(std::time::Duration::from_millis(duration_ms)); + }) + .await + .unwrap(); + } else { + std::thread::sleep(std::time::Duration::from_millis(duration_ms)); + } +} + +async fn tokio_sleep_ms(duration_ms: u64) { + tokio::time::sleep(tokio::time::Duration::from_millis(duration_ms)).await; +} + +// Single threaded executor for better reproducibility in runtime. +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +async fn main() { + let std_sleep_futures = (1..=100).map(std_sleep_ms); + let tokio_sleep_futures = (1..=100).map(tokio_sleep_ms); + + let now = Instant::now(); + join_all(std_sleep_futures).await; + assert!(now.elapsed().as_millis() >= 5050); + + let now = Instant::now(); + join_all(tokio_sleep_futures).await; + let runtime = now.elapsed().as_millis(); + assert!((100..150).contains(&runtime)); +} + +``` + +
+ +- Using `std::thread::sleep` blocks the thread, so it prevents the executor from running. It means that while all futures are spawned at the same time, they all run one after the other. The runtime is the sum of all the `sleep` times. Try changing the runtime to `multi_thread` in a multi core environment to see how it impacts the run time. +- A simple fix is to use `tokio::time::sleep`. Now, the `sleep` calls are `async` and they are properly scheduled by the executor. +- Another fix would be to `tokio::task::spawn_blocking` which spawns an actual thread and transforms its handle into a future without blocking the executor. This thread is also scheduled as part of the executor's threadpool to grant better performance. + +- You should not think of tasks as OS threads. They do not map 1 to 1 and most executors will allow many tasks to run on a single OS thread. This creates multiple gotchas: + - For instance, using `std::sync::mutex` in an `async` runtime is very dangerous. When you lock the mutex in a thread then yield the executor using `.await` the thread might try to lock the mutex once more in a different task. Hence, prefer `async` alternatives like `tokio::sync::mutex`. + - Thread-local storage should also be used with care in async contexts as it doesn't map to specific tasks. + - Device drivers sometimes map to specific OS threads (for instance CUDA.) Prefer `tokio::task::spawn_blocking` when dealing with those. + - Some C libraries rely on thread local storage as well. + +
diff --git a/src/async/pitfalls/pin.md b/src/async/pitfalls/pin.md new file mode 100644 index 000000000000..cd99897eb4e3 --- /dev/null +++ b/src/async/pitfalls/pin.md @@ -0,0 +1,108 @@ +# Pin + +When you await a future, you effectively move the whole stack frame from which you called `.await` to an internal data structure of your executor. If your future has pointers to data on the stack, the addresses might get invalidated. This is extremely unsafe. Therefore, you want to guarantee that the addresses your future point to don't change. That is why we need to `pin` futures. In most cases, you won't have to think about it when using futures from common libraries unless you use `select` in a loop (which is a pretty common use case). If, you implement your own future, you will likely run into this issue. + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; + +#[derive(Debug, PartialEq)] +struct Runner { + name: String, +} + +async fn race_finish_line(mut rcv: Receiver, timeout: Duration) -> Option> { + let mut performances: Vec = Vec::new(); + let timeout_sleep = sleep(timeout); + // Pinning here allows us to await `timeout_sleep` multiple times. + tokio::pin!(timeout_sleep); + + loop { + tokio::select! { + // Rcv.recv() returns a new future every time, hence it does not need to be pinned. + name = rcv.recv() => performances.push(Runner { name: name? }), + _ = timeout_sleep.as_mut() => break + } + } + Some(performances) +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + + let names_and_time = [ + ("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11), + ]; + + let finish_line_future = race_finish_line(receiver, Duration::from_secs(6)); + + for (name, duration_secs) in names_and_time { + let sender = sender.clone(); + tokio::spawn(async move { + sleep(Duration::from_secs(duration_secs)).await; + sender.send(String::from(name)).await.expect("Failed to send runner"); + }); + } + + println!("{:?}", finish_line_future.await.expect("Failed to collect finish line")); + // [Runner { name: "Milo" }, Runner { name: "Oliver" }] +} +``` + + +
+ +* `tokio::pin!` only works on futures that implement `Unpin`. Other futures need to use `box::pin`. +* Another alternative is to not use `tokio::pin!` at all but spawn another task that will send to a `oneshot` channel after the end of the `sleep` call. + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; +use tokio::sync::oneshot; + +#[derive(Debug, PartialEq)] +struct Runner { + name: String, +} + +async fn race_finish_line(mut rcv: Receiver, mut timeout: oneshot::Receiver<()>) -> Option> { + let mut performances: Vec = Vec::new(); + loop { + tokio::select! { + name = rcv.recv() => performances.push(Runner { name: name? }), + _ = &mut timeout => break + } + } + Some(performances) +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + let (os_sender, os_receiver) = oneshot::channel(); + + let names_and_time = [ + ("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11), + ]; + + tokio::spawn(async move { + sleep(Duration::from_secs(5)).await; + os_sender.send(()).expect("Failed to send oneshot."); + }); + let finish_line_future = race_finish_line(receiver, os_receiver); + + for (name, duration_secs) in names_and_time { + let sender = sender.clone(); + tokio::spawn(async move { + sleep(Duration::from_secs(duration_secs)).await; + sender.send(String::from(name)).await.expect("Failed to send runner"); + }); + } + + println!("{:?}", finish_line_future.await.expect("Failed to collect finish line")); + // [Runner { name: "Milo" }, Runner { name: "Oliver" }] +} +``` + +