Commit 917caad

ex: Add an example of an executor with limited tasks
Signed-off-by: John Nunley <[email protected]>
1 parent 2cfb6e4 commit 917caad

2 files changed, +169 -0 lines changed


Cargo.toml (+2)
@@ -27,6 +27,8 @@ async-channel = "1.4.1"
 async-io = "1.1.9"
 criterion = { version = "0.4.0", default-features = false, features = ["cargo_bench_support"] }
 easy-parallel = "3.1.0"
+event-listener = "3.0.0"
+fastrand = "2.0.0"
 once_cell = "1.16.0"
 
 [[bench]]

examples/limit.rs (+167)
@@ -0,0 +1,167 @@
//! An executor where you can only push a limited number of tasks.

use async_executor::{Executor, Task};
use event_listener::{Event, EventListener};
use futures_lite::pin;
use std::{
    future::Future,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

/// An executor where you can only push a limited number of tasks.
struct LimitedExecutor {
    /// Inner running executor.
    executor: Executor<'static>,

    /// Shared state.
    shared: Arc<SharedState>,
}

struct SharedState {
    /// The maximum number of tasks that can be pushed.
    max: usize,

    /// The current number of active tasks.
    active: AtomicUsize,

    /// Event listeners for when a slot becomes available.
    slot_available: Event,
}

impl LimitedExecutor {
    fn new(max: usize) -> Self {
        Self {
            executor: Executor::new(),
            shared: Arc::new(SharedState {
                max,
                active: AtomicUsize::new(0),
                slot_available: Event::new(),
            }),
        }
    }

    /// Spawn a task, waiting until there is a slot available.
    async fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output>
    where
        F::Output: Send + 'static,
    {
        let listener = EventListener::new(&self.shared.slot_available);
        pin!(listener);

        // Load the current number of active tasks.
        let mut active = self.shared.active.load(Ordering::Acquire);

        loop {
            // Check if there is a slot available.
            if active < self.shared.max {
                // Try to set the slot to what would be the new number of tasks.
                let new_active = active + 1;
                match self.shared.active.compare_exchange(
                    active,
                    new_active,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                ) {
                    Ok(_) => {
                        // Wrap the future in another future that decrements the active count
                        // when it's done.
                        let future = {
                            let shared = self.shared.clone();
                            async move {
                                struct DecOnDrop(Arc<SharedState>);

                                impl Drop for DecOnDrop {
                                    fn drop(&mut self) {
                                        // Decrement the count and notify someone.
                                        self.0.active.fetch_sub(1, Ordering::SeqCst);
                                        self.0.slot_available.notify(usize::MAX);
                                    }
                                }

                                let _dec = DecOnDrop(shared);
                                future.await
                            }
                        };

                        // Wake up another waiter, in case there is one.
                        self.shared.slot_available.notify(1);

                        // Spawn the task.
                        return self.executor.spawn(future);
                    }

                    Err(actual) => {
                        // Try again.
                        active = actual;
                    }
                }
            } else {
                // Start waiting for a slot to become available.
                if listener.as_ref().is_listening() {
                    listener.as_mut().await;
                } else {
                    listener.as_mut().listen();
                }

                active = self.shared.active.load(Ordering::Acquire);
            }
        }
    }

    /// Run a future to completion.
    async fn run<F: Future>(&self, future: F) -> F::Output {
        self.executor.run(future).await
    }
}

fn main() {
    futures_lite::future::block_on(async {
        let ex = Arc::new(LimitedExecutor::new(10));
        ex.run({
            let ex = ex.clone();
            async move {
                // Spawn a bunch of tasks that wait for a while.
                for i in 0..15 {
                    ex.spawn(async move {
                        async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await;
                        println!("Waiting task #{i} finished!");
                    })
                    .await
                    .detach();
                }

                let (start_tx, start_rx) = async_channel::bounded::<()>(1);
                let mut current_rx = start_rx;

                // Send the first message.
                start_tx.send(()).await.unwrap();

                // Spawn a bunch of channel tasks that wake each other up.
                for i in 0..25 {
                    let (next_tx, next_rx) = async_channel::bounded::<()>(1);

                    ex.spawn(async move {
                        current_rx.recv().await.unwrap();
                        println!("Channel task {i} woken up!");
                        next_tx.send(()).await.unwrap();
                        println!("Channel task {i} finished!");
                    })
                    .await
                    .detach();

                    current_rx = next_rx;
                }

                // Wait for the last task to finish.
                current_rx.recv().await.unwrap();

                println!("All tasks finished!");
            }
        })
        .await;
    });
}
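
Note on the waiting branch of spawn: it relies on event-listener's register-then-recheck idiom. The listener is armed before the active count is re-read, and only awaited if the executor still looks full on the next pass, so a notification that races with the check cannot be missed. The sketch below shows that idiom in isolation; it is a minimal illustration only, assuming the same event-listener 3.x API the example uses, and Slots / wait_for_slot are hypothetical names that do not appear in this commit.

use event_listener::{Event, EventListener};
use futures_lite::pin;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the example's SharedState, for illustration only.
struct Slots {
    max: usize,
    active: AtomicUsize,
    slot_available: Event,
}

/// Wait until `active` drops below `max` without missing a notification.
async fn wait_for_slot(slots: &Slots) {
    let listener = EventListener::new(&slots.slot_available);
    pin!(listener);

    loop {
        // Arm the listener *before* re-checking the counter, so a `notify`
        // that lands right after the check is still observed.
        if !listener.as_ref().is_listening() {
            listener.as_mut().listen();
        }

        if slots.active.load(Ordering::Acquire) < slots.max {
            return;
        }

        // Park until a finishing task calls `slot_available.notify(..)`.
        listener.as_mut().await;
    }
}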

0 commit comments
