Async chapter #524

Merged
merged 7 commits into from
Mar 28, 2023
14 changes: 11 additions & 3 deletions src/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -232,9 +232,17 @@
- [Async](async.md)
- [async/await](async/async-await.md)
- [Async Blocks](async/async-blocks.md)
- [Futures](async/futures.md)
- [Runtimes](async/runtimes.md)
- [Tasks](async/tasks.md)
- [Futures](async/futures.md)
- [Runtimes](async/runtimes.md)
- [Tasks](async/tasks.md)
- [Async Channels](async/channels.md)
- [Futures Control Flow](async/control-flow.md)
- [Daemon](async/control-flow/daemon.md)
- [Join](async/control-flow/join_all.md)
- [Select](async/control-flow/select.md)
- [Pitfalls](async/pitfalls.md)
- [Blocking the executor](async/pitfalls/blocking-executor.md)
- [Pin](async/pitfalls/pin.md)
- [Exercises](exercises/day-4/async.md)

# Final Words
36 changes: 36 additions & 0 deletions src/async/channels.md
@@ -0,0 +1,36 @@
# Async Channels

Several channel crates support `async`/`await`. For instance, `tokio` channels:

```rust,editable,compile_fail
use tokio::sync::mpsc::{self, Receiver};

async fn ping_handler(mut input: Receiver<()>) {
    let mut count: usize = 0;

    while let Some(_) = input.recv().await {
        count += 1;
        println!("Received {count} pings so far.");
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(32);
    let ping_handler_task = tokio::spawn(ping_handler(receiver));
    for _ in 0..10 {
        sender.send(()).await.expect("Failed to send ping.");
    }

    std::mem::drop(sender);
    ping_handler_task.await.expect("Something went wrong in ping handler task.");
}
```

<details>

- Overall, the interface is similar to the `sync` channels seen in the [morning class](concurrency/channels.md).
- The `flume` crate has channels that implement both sync and `async` versions of `send` and `recv`. This can be convenient for complex applications with both IO and heavy CPU processing tasks.
- What makes `async` channels preferable is the ability to combine them with other futures to create complex control flow.

</details>
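
To make the comparison with synchronous channels concrete, here is a minimal sketch of the same ping counter written with `std::sync::mpsc` and a thread instead of a task. The names `ping_handler` and `count_pings` are illustrative, and the capacity of 32 only mirrors the example above.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Counts pings until every sender has been dropped.
fn ping_handler(input: Receiver<()>) -> usize {
    let mut count = 0;
    // `recv` blocks the whole thread; it returns `Err` once all senders are gone.
    while input.recv().is_ok() {
        count += 1;
    }
    count
}

/// Sends `pings` messages through a bounded channel and returns how many arrived.
fn count_pings(pings: usize) -> usize {
    let (sender, receiver) = sync_channel(32);
    let handler = thread::spawn(move || ping_handler(receiver));
    for _ in 0..pings {
        sender.send(()).expect("Failed to send ping.");
    }
    // Dropping the sender closes the channel and ends the handler's loop.
    drop(sender);
    handler.join().expect("Ping handler thread panicked.")
}
```

The shape is the same as the `tokio` version; the difference is that `recv` and `send` block an OS thread here, whereas the `async` versions yield to the executor at each `.await`.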
9 changes: 9 additions & 0 deletions src/async/control-flow.md
@@ -0,0 +1,9 @@
# Futures Control Flow

Futures can be combined to build concurrent computation graphs. We will cover several common operations:

----

- [Daemon](control-flow/daemon.md)
- [Join](control-flow/join_all.md)
- [Select](control-flow/select.md)
33 changes: 33 additions & 0 deletions src/async/control-flow/daemon.md
@@ -0,0 +1,33 @@
# Daemon

Tasks can be spawned without being awaited. The executor schedules them like any other task, and they do not block the task that spawned them. This is useful for tasks that behave like actors, receiving messages and sending messages to other tasks through channels. It is also useful for logging metrics or pinging a system's health.

```rust,editable,compile_fail

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let seconds_since_beginning = Arc::new(AtomicUsize::from(0));
    let counter = Arc::clone(&seconds_since_beginning);
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(1)).await;
            counter.fetch_add(1, Ordering::SeqCst);
        }
    });

    sleep(Duration::from_millis(4500)).await;
    assert_eq!(seconds_since_beginning.load(Ordering::Relaxed), 4);
}


```

<details>

* It is good practice to give your daemons a way to exit, because some other blocking task might depend on them, which would prevent your main thread from ever exiting. You can use a `oneshot` channel to signal the task to terminate. You can also use the `ctrl+c` signal handler from `tokio` as an interrupt signal.

</details>
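
The shutdown signal mentioned in the notes can be sketched with the standard library alone. This is a thread-based analogue, not the `tokio` API: an OS thread stands in for the spawned task and a `std::sync::mpsc` channel stands in for the `oneshot` channel. The function name `run_daemon_for` and its parameters are made up for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Runs a background counter for roughly `run_time`, then shuts it down.
/// Returns the number of ticks the daemon recorded.
fn run_daemon_for(run_time: Duration, tick: Duration) -> usize {
    let ticks = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&ticks);
    let (stop_sender, stop_receiver) = channel::<()>();

    let daemon = thread::spawn(move || loop {
        // Wait one tick, but wake up early if a stop signal arrives.
        match stop_receiver.recv_timeout(tick) {
            Err(RecvTimeoutError::Timeout) => {
                counter.fetch_add(1, Ordering::SeqCst);
            }
            // Either a stop message or a dropped sender ends the daemon.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    });

    thread::sleep(run_time);
    stop_sender.send(()).expect("Daemon stopped before the signal was sent.");
    // Joining proves that the daemon really exited.
    daemon.join().expect("Daemon thread panicked.");
    ticks.load(Ordering::SeqCst)
}
```

In an `async` version, the daemon would presumably wait on the tick and on the stop signal with `select!` rather than with `recv_timeout`.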
48 changes: 48 additions & 0 deletions src/async/control-flow/join_all.md
@@ -0,0 +1,48 @@
# join_all

Futures can be combined to build concurrent computation graphs.

## Run a group of futures concurrently until they all resolve: `join_all`

### Equivalents:

- JS: `Promise.all`
- Python: `asyncio.gather`

```rust,editable,compile_fail
use anyhow::Result;
use futures::future;
use std::collections::HashMap;

async fn size_of_page(url: &str) -> Result<usize> {
    let resp = reqwest::get(url).await?;
    Ok(resp.text().await?.len())
}

#[tokio::main]
async fn main() {
    let urls: [&str; 4] = [
        "https://google.com",
        "https://httpbin.org/ip",
        "https://play.rust-lang.org/",
        "BAD_URL",
    ];
    let futures_iter = urls.into_iter().map(size_of_page);
    let results = future::join_all(futures_iter).await;
    let page_sizes_dict: HashMap<&str, Result<usize>> =
        urls.into_iter().zip(results.into_iter()).collect();
    println!("{:?}", page_sizes_dict);
}
```

<details>

* `join_all` comes from the `futures` crate. The standard library has no stable equivalent yet; `std::future::join!` is currently only available on nightly.
* For multiple futures of disjoint types, you can use `join!`, but you must know how many futures you will have at compile time.
* You can also combine `join_all` with `join!`, for instance to join all requests to an HTTP service as well as a database query.
* The risk of `join` is that one of the futures might never resolve, which would cause your program to stall.
  * Try adding a timeout to the future.

</details>
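
To show what "run until they all resolve" means at the polling level, here is a minimal, std-only sketch. It is not how `futures::future::join_all` is implemented; `join_all_sketch`, `boxed`, `YieldOnce`, `block_on` and `demo` are all hypothetical helpers written for this illustration, and `block_on` is a toy single-future executor.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

/// Boxes and pins a future so that different future types fit in one `Vec`.
fn boxed<T>(future: impl Future<Output = T> + 'static) -> BoxFuture<T> {
    Box::pin(future)
}

/// Polls every unfinished future on each wake-up; resolves once all are done.
async fn join_all_sketch<T>(mut futures: Vec<BoxFuture<T>>) -> Vec<T> {
    let mut outputs: Vec<Option<T>> = futures.iter().map(|_| None).collect();
    poll_fn(move |cx| {
        let mut all_done = true;
        for (future, slot) in futures.iter_mut().zip(outputs.iter_mut()) {
            if slot.is_none() {
                match future.as_mut().poll(cx) {
                    Poll::Ready(value) => *slot = Some(value),
                    Poll::Pending => all_done = false,
                }
            }
        }
        if all_done {
            // Outputs keep the order of the input futures.
            let results: Vec<T> = outputs.iter_mut().filter_map(|slot| slot.take()).collect();
            Poll::Ready(results)
        } else {
            Poll::Pending
        }
    })
    .await
}

/// A future that returns `Pending` on its first poll and `Ready` on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // Ask to be polled again right away.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Toy executor: wakes the calling thread whenever the future is woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Joins two futures of different types; the first one needs two polls.
fn demo() -> Vec<u32> {
    let slow = async {
        YieldOnce { yielded: false }.await;
        1
    };
    let fast = async { 2 };
    block_on(join_all_sketch(vec![boxed(slow), boxed(fast)]))
}
```

`demo()` returns `[1, 2]`: results come back in input order even though the second future finishes first. The sketch also makes the stall risk visible, since `join_all_sketch` can only return once every slot is filled.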

63 changes: 63 additions & 0 deletions src/async/control-flow/select.md
@@ -0,0 +1,63 @@
# Select

## Run multiple futures concurrently until the first one resolves

### Equivalents:

- JS: `Promise.race`
- Python: `asyncio.new_event_loop().run_until_complete(asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED))`

```rust,editable,compile_fail
use tokio::sync::mpsc::{self, Receiver};
use tokio::time::{sleep, Duration};

#[derive(Debug, PartialEq)]
enum Animal {
    Cat { name: String },
    Dog { name: String },
}

async fn first_animal_to_finish_race(
    mut cat_rcv: Receiver<String>,
    mut dog_rcv: Receiver<String>,
) -> Option<Animal> {
    tokio::select! {
        cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }),
        dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? })
    }
}

#[tokio::main]
async fn main() {
    let (cat_sender, cat_receiver) = mpsc::channel(32);
    let (dog_sender, dog_receiver) = mpsc::channel(32);
    tokio::spawn(async move {
        sleep(Duration::from_secs(10)).await;
        cat_sender
            .send(String::from("Felix"))
            .await
            .expect("Failed to send cat.");
    });
    tokio::spawn(async move {
        sleep(Duration::from_secs(5)).await;
        dog_sender
            .send(String::from("Rex"))
            .await
            .expect("Failed to send dog.");
    });

    let winner = first_animal_to_finish_race(cat_receiver, dog_receiver)
        .await
        .expect("Failed to receive winner");

    assert_eq!(winner, Animal::Dog { name: String::from("Rex") });
}
```

<details>

* In this example, we have a race between a cat and a dog. `first_animal_to_finish_race` listens to both channels and picks whichever message arrives first. Since the dog takes 5 seconds, it wins against the cat, which takes 10 seconds.
* You can use `oneshot` channels in this example as the channels are supposed to receive only one `send`.
* You can try adding more contestants to the race and return a leaderboard. Also, you can add a deadline after which contestants get eliminated.

</details>
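
The idea of "first one to resolve wins" can be sketched without any runtime crate. The code below is an illustration under stated assumptions, not the expansion of `tokio::select!`: `select_first`, `Either`, `Contestant`, `block_on` and `race` are hypothetical names, and the sketch always polls the left future first, whereas `tokio::select!` picks the polling order randomly by default.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

#[derive(Debug, PartialEq)]
enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Polls both futures on every wake-up and returns the first output available.
/// The losing future is dropped when this function returns.
async fn select_first<A, B>(
    left: impl Future<Output = A>,
    right: impl Future<Output = B>,
) -> Either<A, B> {
    let mut left = pin!(left);
    let mut right = pin!(right);
    poll_fn(|cx| {
        if let Poll::Ready(value) = left.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(value));
        }
        if let Poll::Ready(value) = right.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(value));
        }
        Poll::Pending
    })
    .await
}

/// Completes with `name` after returning `Pending` `polls_left` times.
struct Contestant {
    name: &'static str,
    polls_left: u32,
}

impl Future for Contestant {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left == 0 {
            return Poll::Ready(self.name);
        }
        self.polls_left -= 1;
        // Ask to be polled again right away.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Toy executor: wakes the calling thread whenever the future is woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// The cat needs 10 extra polls, the dog only 5, so the dog wins.
fn race() -> Either<&'static str, &'static str> {
    let cat = Contestant { name: "Felix", polls_left: 10 };
    let dog = Contestant { name: "Rex", polls_left: 5 };
    block_on(select_first(cat, dog))
}
```

Here poll counts play the role of the sleep durations in the example above, so `race()` yields `Either::Right("Rex")`.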
8 changes: 8 additions & 0 deletions src/async/pitfalls.md
@@ -0,0 +1,8 @@
# Pitfalls of async/await

`async`/`await` provides a convenient and efficient abstraction for concurrent asynchronous programming. However, the `async`/`await` model in Rust also comes with its share of pitfalls and footguns. We illustrate some of them in this chapter:

---

- [Blocking the executor](pitfalls/blocking-executor.md)
- [Pin](pitfalls/pin.md)
60 changes: 60 additions & 0 deletions src/async/pitfalls/blocking-executor.md
@@ -0,0 +1,60 @@
# Blocking the executor

Most async runtimes only allow IO tasks to run concurrently.
This means that CPU blocking tasks will block the executor and prevent other tasks from being executed.
An easy workaround is to use async equivalent methods where possible.

```rust,editable,compile_fail
use futures::future::join_all;
use std::time::Instant;

// Set to `true` to wrap `std::thread::sleep` in `spawn_blocking`.
const USE_SPAWN_BLOCKING: bool = false;

async fn std_sleep_ms(duration_ms: u64) {
    if USE_SPAWN_BLOCKING {
        tokio::task::spawn_blocking(move || {
            std::thread::sleep(std::time::Duration::from_millis(duration_ms));
        })
        .await
        .unwrap();
    } else {
        std::thread::sleep(std::time::Duration::from_millis(duration_ms));
    }
}

async fn tokio_sleep_ms(duration_ms: u64) {
    tokio::time::sleep(tokio::time::Duration::from_millis(duration_ms)).await;
}

// Single-threaded executor for better reproducibility of the run time.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let std_sleep_futures = (1..=100).map(std_sleep_ms);
    let tokio_sleep_futures = (1..=100).map(tokio_sleep_ms);

    // Blocking sleeps run one after the other: 1 + 2 + ... + 100 = 5050 ms.
    let now = Instant::now();
    join_all(std_sleep_futures).await;
    assert!(now.elapsed().as_millis() >= 5050);

    // Async sleeps overlap, so the total is close to the longest one (100 ms).
    let now = Instant::now();
    join_all(tokio_sleep_futures).await;
    let runtime = now.elapsed().as_millis();
    assert!((100..150).contains(&runtime));
}

```

<details>

- Using `std::thread::sleep` blocks the thread, so it prevents the executor from running. This means that while all futures are created at the same time, they run one after the other, and the total run time is the sum of all the `sleep` times. Try changing the runtime to `multi_thread` in a multi-core environment to see how it impacts the run time.
- A simple fix is to use `tokio::time::sleep`. Now the `sleep` calls are `async` and they are properly scheduled by the executor.
- Another fix is `tokio::task::spawn_blocking`, which runs the closure on a thread from the runtime's dedicated pool of blocking threads and returns a handle that is a future, so the executor is not blocked.

- You should not think of tasks as OS threads. They do not map 1 to 1, and most executors allow many tasks to run on a single OS thread. This creates multiple gotchas:
  - For instance, holding a `std::sync::Mutex` guard across an `.await` is very dangerous. Once the task yields to the executor, the same thread might run a different task that tries to lock the mutex again, which deadlocks. Hence, prefer `async` alternatives like `tokio::sync::Mutex`.
  - Thread-local storage should also be used with care in async contexts, as it doesn't map to specific tasks.
  - Device drivers sometimes map to specific OS threads (for instance CUDA). Prefer `tokio::task::spawn_blocking` when dealing with those.
  - Some C libraries rely on thread-local storage as well.

</details>
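
The notes describe `spawn_blocking` as handing blocking work to another thread and exposing the result as a future. A minimal std-only sketch of that idea follows. It is not `tokio`'s implementation (which reuses a thread pool); `spawn_blocking_sketch`, `BlockingHandle`, `Shared`, `block_on` and `demo` are hypothetical names, and one fresh thread is spawned per call to keep the example short.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared between the worker thread and the future that awaits it.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Future that resolves once the worker thread has stored its result.
struct BlockingHandle<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Future for BlockingHandle<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.shared.lock().unwrap();
        if let Some(value) = shared.result.take() {
            return Poll::Ready(value);
        }
        // Not finished yet: remember who to wake, then yield to the executor.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Runs `work` on its own OS thread so that it cannot block the executor.
fn spawn_blocking_sketch<T: Send + 'static>(
    work: impl FnOnce() -> T + Send + 'static,
) -> BlockingHandle<T> {
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let thread_shared = Arc::clone(&shared);
    thread::spawn(move || {
        let value = work();
        let mut shared = thread_shared.lock().unwrap();
        shared.result = Some(value);
        if let Some(waker) = shared.waker.take() {
            waker.wake();
        }
    });
    BlockingHandle { shared }
}

/// Toy executor: wakes the calling thread whenever the future is woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Awaits a blocking computation without blocking inside `poll`.
fn demo() -> u64 {
    block_on(async {
        let handle = spawn_blocking_sketch(|| {
            thread::sleep(Duration::from_millis(20));
            6 * 7
        });
        handle.await
    })
}
```

The important property is that `poll` never sleeps: it either returns the stored result or registers the waker and returns `Pending`, leaving the executor free to run other tasks in the meantime.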
108 changes: 108 additions & 0 deletions src/async/pitfalls/pin.md
@@ -0,0 +1,108 @@
# Pin

When you await a future, the local variables of the calling `async` block, which would ordinarily live in a stack frame, are stored inside the future itself, and that future is handed to your executor. If the future holds pointers to its own data and is then moved, those addresses get invalidated. This is extremely unsafe. Therefore, you want a guarantee that the data your future points to doesn't move. That is why we need to `pin` futures. In most cases, you won't have to think about it when using futures from common libraries, unless you use `select` in a loop (which is a pretty common use case). If you implement your own future, you will likely run into this issue.

```rust,editable,compile_fail
use tokio::sync::mpsc::{self, Receiver};
use tokio::time::{sleep, Duration};

#[derive(Debug, PartialEq)]
struct Runner {
    name: String,
}

async fn race_finish_line(mut rcv: Receiver<String>, timeout: Duration) -> Option<Vec<Runner>> {
    let mut performances: Vec<Runner> = Vec::new();
    let timeout_sleep = sleep(timeout);
    // Pinning here allows us to await `timeout_sleep` multiple times.
    tokio::pin!(timeout_sleep);

    loop {
        tokio::select! {
            // `rcv.recv()` returns a new future every time, hence it does not need to be pinned.
            name = rcv.recv() => performances.push(Runner { name: name? }),
            _ = timeout_sleep.as_mut() => break
        }
    }
    Some(performances)
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(32);

    let names_and_time = [
        ("Leo", 9),
        ("Milo", 3),
        ("Luna", 13),
        ("Oliver", 5),
        ("Charlie", 11),
    ];

    let finish_line_future = race_finish_line(receiver, Duration::from_secs(6));

    for (name, duration_secs) in names_and_time {
        let sender = sender.clone();
        tokio::spawn(async move {
            sleep(Duration::from_secs(duration_secs)).await;
            sender.send(String::from(name)).await.expect("Failed to send runner");
        });
    }

    println!("{:?}", finish_line_future.await.expect("Failed to collect finish line"));
    // [Runner { name: "Milo" }, Runner { name: "Oliver" }]
}
```


<details>

* `tokio::pin!` pins the future to the stack, so the pinned future cannot be moved out of the current function. If the pinned future has to be moved or returned, use `Box::pin` instead.
* Another alternative is to not use `tokio::pin!` at all, but to spawn another task that sends to a `oneshot` channel after the end of the `sleep` call.

```rust,editable,compile_fail
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

#[derive(Debug, PartialEq)]
struct Runner {
    name: String,
}

async fn race_finish_line(
    mut rcv: Receiver<String>,
    mut timeout: oneshot::Receiver<()>,
) -> Option<Vec<Runner>> {
    let mut performances: Vec<Runner> = Vec::new();
    loop {
        tokio::select! {
            name = rcv.recv() => performances.push(Runner { name: name? }),
            _ = &mut timeout => break
        }
    }
    Some(performances)
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(32);
    let (os_sender, os_receiver) = oneshot::channel();

    let names_and_time = [
        ("Leo", 9),
        ("Milo", 3),
        ("Luna", 13),
        ("Oliver", 5),
        ("Charlie", 11),
    ];

    tokio::spawn(async move {
        // 6 seconds, as in the pinned version, so that Oliver (5 s) reliably makes the cut.
        sleep(Duration::from_secs(6)).await;
        os_sender.send(()).expect("Failed to send oneshot.");
    });
    let finish_line_future = race_finish_line(receiver, os_receiver);

    for (name, duration_secs) in names_and_time {
        let sender = sender.clone();
        tokio::spawn(async move {
            sleep(Duration::from_secs(duration_secs)).await;
            sender.send(String::from(name)).await.expect("Failed to send runner");
        });
    }

    println!("{:?}", finish_line_future.await.expect("Failed to collect finish line"));
    // [Runner { name: "Milo" }, Runner { name: "Oliver" }]
}
```

</details>
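
To see why polling goes through a pinned reference, here is a small std-only sketch that drives a self-referential `async` block by hand. It is an illustration under stated assumptions rather than anything from `tokio`: `poll_to_completion`, `Countdown`, `NoopWaker` and `demo` are hypothetical names, and the busy loop stands in for a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a busy-polling loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `future` in a busy loop; returns its output and the number of polls.
fn poll_to_completion<F: Future>(future: F) -> (F::Output, usize) {
    // `Box::pin` moves the future to the heap, where it stays put from now on.
    let mut pinned: Pin<Box<F>> = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        // `as_mut` reborrows the pin, so `pinned` stays usable on the next iteration.
        if let Poll::Ready(output) = pinned.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}

/// Returns `Pending` the given number of times before completing.
struct Countdown(u32);

impl Future for Countdown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// The async block keeps a reference to its own local across an `.await`,
/// which is exactly the situation that makes moving the future unsafe.
fn demo() -> (u32, usize) {
    poll_to_completion(async {
        let local = 40;
        let reference = &local;
        Countdown(2).await;
        *reference + 2
    })
}
```

`demo()` returns `(42, 3)`: two `Pending` results from `Countdown(2)`, then completion on the third poll. The `as_mut()` call plays the same role as `timeout_sleep.as_mut()` in the `select!` loop above.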