diff --git a/Cargo.toml b/Cargo.toml index 2c58175..19cd23f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-executor" -version = "1.4.0" +version = "1.4.1" authors = ["Stjepan Glavina "] edition = "2018" description = "Async executor" @@ -17,10 +17,25 @@ concurrent-queue = "1.2.2" fastrand = "1.3.4" futures-lite = "1.11.0" once_cell = "1.4.1" -vec-arena = "1.0.0" +parking_lot = "0.11.1" +slab = "0.4.2" +cache-padded="1" +crossbeam-deque="0.8" [dev-dependencies] +async-oneshot="0.5" async-channel = "1.4.1" async-io = "1.1.9" easy-parallel = "3.1.0" -num_cpus = "1.13.0" +scopeguard="1.1" +criterion = "0.3" +num_cpus="1" + +[profile.release] +incremental = true +debug = 2 # Set this to 1 or 2 to get more useful backtraces in debugger. +lto = 'thin' + +[[bench]] +name = "my_benchmark" +harness = false \ No newline at end of file diff --git a/benches/executor.rs b/benches/executor.rs index 98f1cb5..4d46c21 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -1,109 +1,165 @@ -#![feature(test)] +// #![feature(test)] -extern crate test; +// extern crate test; -use std::future::Future; +// use std::future::Future; -use async_executor::Executor; -use futures_lite::{future, prelude::*}; +// use async_executor::{Executor, Task}; +// use futures_lite::{future, prelude::*}; -const TASKS: usize = 300; -const STEPS: usize = 300; -const LIGHT_TASKS: usize = 25_000; +// const TASKS: usize = 300; +// const STEPS: usize = 300; +// const LIGHT_TASKS: usize = 25_000; -static EX: Executor<'_> = Executor::new(); +// static EX: Executor<'_> = Executor::new(); -fn run(f: impl FnOnce()) { - let (s, r) = async_channel::bounded::<()>(1); - easy_parallel::Parallel::new() - .each(0..num_cpus::get(), |_| future::block_on(EX.run(r.recv()))) - .finish(move || { - let _s = s; - f() - }); -} +// fn run(f: impl FnOnce()) { +// let (s, r) = async_channel::bounded::<()>(1); +// easy_parallel::Parallel::new() +// .each(0..num_cpus::get(), |_| future::block_on(EX.run(r.recv()))) +// .finish(move || { +// let _s = s; +// f() +// }); +// } -#[bench] -fn create(b: &mut test::Bencher) { - b.iter(move || { - let ex = Executor::new(); - let task = ex.spawn(async {}); - future::block_on(ex.run(task)); - }); -} +// #[bench] +// fn create(b: &mut test::Bencher) { +// b.iter(move || { +// let ex = Executor::new(); +// let task = ex.spawn(async {}); +// future::block_on(ex.run(task)); +// }); +// } -#[bench] -fn spawn_one(b: &mut test::Bencher) { - run(|| { - b.iter(move || { - future::block_on(async { EX.spawn(async {}).await }); - }); - }); -} +// #[bench] +// fn spawn_one(b: &mut test::Bencher) { +// run(|| { +// b.iter(move || { +// future::block_on(async { EX.spawn(async {}).await }); +// }); +// }); +// } -#[bench] -fn spawn_many(b: &mut test::Bencher) { - run(|| { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..LIGHT_TASKS { - tasks.push(EX.spawn(async {})); - } - for task in tasks { - task.await; - } - }); - }); - }); -} +// #[bench] +// fn spawn_many(b: &mut test::Bencher) { +// run(|| { +// b.iter(move || { +// future::block_on(async { +// let mut tasks = Vec::new(); +// for _ in 0..LIGHT_TASKS { +// tasks.push(EX.spawn(async {})); +// } +// for task in tasks { +// task.await; +// } +// }); +// }); +// }); +// } -#[bench] -fn spawn_recursively(b: &mut test::Bencher) { - fn go(i: usize) -> impl Future + Send + 'static { - async move { - if i != 0 { - EX.spawn(async move { - let fut = go(i - 1).boxed(); - fut.await; - }) - .await; - } - } - } +// #[bench] +// fn spawn_recursively(b: &mut test::Bencher) { +// fn go(i: usize) -> impl Future + Send + 'static { +// async move { +// if i != 0 { +// EX.spawn(async move { +// let fut = go(i - 1).boxed(); +// fut.await; +// }) +// .await; +// } +// } +// } - run(|| { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(go(STEPS))); - } - for task in tasks { - task.await; - } - }); - }); - }); -} +// run(|| { +// b.iter(move || { +// future::block_on(async { +// let mut tasks = Vec::new(); +// for _ in 0..TASKS { +// tasks.push(EX.spawn(go(STEPS))); +// } +// for task in tasks { +// task.await; +// } +// }); +// }); +// }); +// } -#[bench] -fn yield_now(b: &mut test::Bencher) { - run(|| { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(async move { - for _ in 0..STEPS { - future::yield_now().await; - } - })); - } - for task in tasks { - task.await; - } - }); - }); - }); -} +// #[bench] +// fn yield_now(b: &mut test::Bencher) { +// run(|| { +// b.iter(move || { +// future::block_on(async { +// let mut tasks = Vec::new(); +// for _ in 0..TASKS { +// tasks.push(EX.spawn(async move { +// for _ in 0..STEPS { +// future::yield_now().await; +// } +// })); +// } +// for task in tasks { +// task.await; +// } +// }); +// }); +// }); +// } + +// #[bench] +// fn context_switch_quiet(b: &mut test::Bencher) { +// let (send, mut recv) = async_channel::bounded::(1); +// let mut tasks: Vec>> = vec![]; +// for _ in 0..TASKS { +// let old_recv = recv.clone(); +// let (new_send, new_recv) = async_channel::bounded(1); +// tasks.push(EX.spawn(async move { +// loop { +// new_send.send(old_recv.recv().await.ok()?).await.ok()? +// } +// })); +// recv = new_recv; +// } +// run(|| { +// b.iter(move || { +// future::block_on(async { +// send.send(1).await.unwrap(); +// recv.recv().await.unwrap(); +// }); +// }); +// }); +// } + +// #[bench] +// fn context_switch_busy(b: &mut test::Bencher) { +// let (send, mut recv) = async_channel::bounded::(1); +// let mut tasks: Vec>> = vec![]; +// for num in 0..TASKS { +// let old_recv = recv.clone(); +// let (new_send, new_recv) = async_channel::bounded(1); +// tasks.push(EX.spawn(async move { +// loop { +// // eprintln!("forward {}", num); +// new_send.send(old_recv.recv().await.ok()?).await.ok()?; +// } +// })); +// recv = new_recv; +// } +// for _ in 0..TASKS { +// tasks.push(EX.spawn(async move { +// loop { +// future::yield_now().await; +// } +// })) +// } +// run(|| { +// b.iter(move || { +// future::block_on(async { +// send.send(1).await.unwrap(); +// recv.recv().await.unwrap(); +// }); +// }); +// }); +// } diff --git a/benches/my_benchmark.rs b/benches/my_benchmark.rs new file mode 100644 index 0000000..c9e3d09 --- /dev/null +++ b/benches/my_benchmark.rs @@ -0,0 +1,195 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use std::future::Future; + +use async_executor::{Executor, Task}; +use futures_lite::{future, prelude::*}; + +const TASKS: usize = 300; +const STEPS: usize = 300; +const LIGHT_TASKS: usize = 25_000; + +static EX: Executor<'_> = Executor::new(); + +fn run(f: impl FnOnce()) { + let (s, r) = async_channel::bounded::<()>(1); + easy_parallel::Parallel::new() + .each(0..num_cpus::get(), |_| future::block_on(EX.run(r.recv()))) + .finish(move || { + let _s = s; + f() + }); +} + +fn create(b: &mut criterion::Bencher) { + b.iter(move || { + let ex = Executor::new(); + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }); +} + +fn spawn_one(b: &mut criterion::Bencher) { + run(|| { + b.iter(move || { + future::block_on(async { EX.spawn(async {}).await }); + }); + }); +} + +fn spawn_many(b: &mut criterion::Bencher) { + run(|| { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }); +} + +fn spawn_executors_recursively(b: &mut criterion::Bencher) { + fn go(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + let exec = async_executor::Executor::new(); + let task = exec.spawn(async move { + let fut = go(i - 1).boxed(); + fut.await; + }); + exec.run(task).await + } + } + } + + run(|| { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(go(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }); +} + +fn yield_now(b: &mut criterion::Bencher) { + run(|| { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }); +} + +fn ping_pong(b: &mut criterion::Bencher) { + const NUM_PINGS: usize = 1_000; + + let (send, recv) = async_channel::bounded::>(10); + let _task: Task> = EX.spawn(async move { + loop { + let mut os = recv.recv().await.ok()?; + os.send(0u8).ok()?; + } + }); + run(|| { + b.iter(move || { + future::block_on(async { + for i in 0..NUM_PINGS { + let (os_send, os_recv) = async_oneshot::oneshot(); + send.send(os_send).await.unwrap(); + os_recv.await.unwrap(); + } + }); + }); + }); +} + +fn context_switch_quiet(b: &mut criterion::Bencher) { + let (send, mut recv) = async_channel::bounded::(1); + let mut tasks: Vec>> = vec![]; + for _ in 0..TASKS { + let old_recv = recv.clone(); + let (new_send, new_recv) = async_channel::bounded(1); + tasks.push(EX.spawn(async move { + loop { + new_send.send(old_recv.recv().await.ok()?).await.ok()? + } + })); + recv = new_recv; + } + run(|| { + b.iter(move || { + future::block_on(async { + send.send(1).await.unwrap(); + recv.recv().await.unwrap(); + }); + }); + }); +} + +fn context_switch_busy(b: &mut criterion::Bencher) { + let (send, mut recv) = async_channel::bounded::(1); + let mut tasks: Vec>> = vec![]; + for num in 0..TASKS / 10 { + let old_recv = recv.clone(); + let (new_send, new_recv) = async_channel::bounded(1); + tasks.push(EX.spawn(async move { + loop { + // eprintln!("forward {}", num); + new_send.send(old_recv.recv().await.ok()?).await.ok()?; + } + })); + recv = new_recv; + } + for _ in 0..TASKS { + tasks.push(EX.spawn(async move { + loop { + future::yield_now().await; + } + })) + } + run(|| { + b.iter(move || { + future::block_on(async { + for _ in 0..10 { + send.send(1).await.unwrap(); + recv.recv().await.unwrap(); + } + }); + }); + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("create", create); + c.bench_function("spawn_one", spawn_one); + c.bench_function("spawn_many", spawn_many); + c.bench_function("yield_now", yield_now); + c.bench_function("ping_pong", ping_pong); + c.bench_function("spawn_executors_recursively", spawn_executors_recursively); + c.bench_function("context_switch_quiet", context_switch_quiet); + c.bench_function("context_switch_busy", context_switch_busy); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index a43a498..f6559de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,18 +20,25 @@ #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] -use std::future::Future; +mod taskqueue; use std::marker::PhantomData; -use std::panic::{RefUnwindSafe, UnwindSafe}; use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::Arc; use std::task::{Poll, Waker}; +use std::{ + cell::Cell, + panic::{RefUnwindSafe, UnwindSafe}, +}; +use std::{cell::RefCell, future::Future}; use async_task::Runnable; -use concurrent_queue::ConcurrentQueue; + +use cache_padded::CachePadded; use futures_lite::{future, prelude::*}; -use vec_arena::Arena; +use parking_lot::{Mutex, RwLock}; +use slab::Slab; +use taskqueue::{GlobalQueue, LocalQueue, LocalQueueHandle}; #[doc(no_inline)] pub use async_task::Task; @@ -111,7 +118,7 @@ impl<'a> Executor<'a> { /// assert!(ex.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.state().active.lock().unwrap().is_empty() + self.state().active.lock().is_empty() } /// Spawns a task onto the executor. @@ -128,13 +135,19 @@ impl<'a> Executor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { - let mut active = self.state().active.lock().unwrap(); + let mut active = self.state().active.lock(); // Remove the task from the set of active tasks when the future finishes. - let index = active.next_vacant(); + let index = active.vacant_entry().key(); let state = self.state().clone(); let future = async move { - let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); + let _guard = CallOnDrop(move || { + // TODO: use try_remove once https://github.com/tokio-rs/slab/pull/89 merged + let mut active = state.active.lock(); + if active.contains(index) { + drop(active.remove(index)); + } + }); future.await }; @@ -165,8 +178,8 @@ impl<'a> Executor<'a> { /// ``` pub fn try_tick(&self) -> bool { match self.state().queue.pop() { - Err(_) => false, - Ok(runnable) => { + None => false, + Some(runnable) => { // Notify another ticker now to pick up where this ticker left off, just in case // running the task takes a long time. self.state().notify(); @@ -198,7 +211,7 @@ impl<'a> Executor<'a> { /// future::block_on(ex.tick()); // runs the task /// ``` pub async fn tick(&self) { - let state = self.state(); + let state = self.state().clone(); let runnable = Ticker::new(state).runnable().await; runnable.run(); } @@ -219,14 +232,16 @@ impl<'a> Executor<'a> { /// assert_eq!(res, 6); /// ``` pub async fn run(&self, future: impl Future) -> T { - let runner = Runner::new(self.state()); - + let mut runner = Runner::new(self.state().clone()); + runner.set_tls_active(); + let _guard = CallOnDrop(clear_tls); // A future that runs tasks forever. let run_forever = async { loop { for _ in 0..200 { let runnable = runner.runnable().await; - runnable.run(); + let yielded = runnable.run(); + JUST_YIELDED.with(|v| v.replace(yielded)); } future::yield_now().await; } @@ -240,10 +255,12 @@ impl<'a> Executor<'a> { fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { let state = self.state().clone(); - // TODO(stjepang): If possible, push into the current local queue and notify the ticker. + // Try to push to the local queue. If it doesn't work, push to the global queue. move |runnable| { - state.queue.push(runnable).unwrap(); - state.notify(); + if let Err(runnable) = try_push_tls(&state, runnable) { + state.queue.push(runnable); + state.notify(); + } } } @@ -253,18 +270,20 @@ impl<'a> Executor<'a> { } } +thread_local! { + static JUST_YIELDED: Cell = Cell::new(false); +} + impl Drop for Executor<'_> { fn drop(&mut self) { if let Some(state) = self.state.get() { - let mut active = state.active.lock().unwrap(); - for i in 0..active.capacity() { - if let Some(w) = active.remove(i) { - w.wake(); - } + let mut active = state.active.lock(); + for w in active.drain() { + w.wake(); } drop(active); - while state.queue.pop().is_ok() {} + while state.queue.pop().is_some() {} } } } @@ -356,13 +375,13 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.inner().state().active.lock().unwrap(); + let mut active = self.inner().state().active.lock(); // Remove the task from the set of active tasks when the future finishes. - let index = active.next_vacant(); + let index = active.vacant_entry().key(); let state = self.inner().state().clone(); let future = async move { - let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); + let _guard = CallOnDrop(move || drop(state.active.lock().remove(index))); future.await }; @@ -440,16 +459,15 @@ impl<'a> LocalExecutor<'a> { /// Returns a function that schedules a runnable task when it gets woken up. fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { let state = self.inner().state().clone(); - move |runnable| { - state.queue.push(runnable).unwrap(); + state.queue.push(runnable); state.notify(); } } /// Returns a reference to the inner executor. fn inner(&self) -> &Executor<'a> { - self.inner.get_or_init(|| Executor::new()) + self.inner.get_or_init(Executor::new) } } @@ -463,34 +481,39 @@ impl<'a> Default for LocalExecutor<'a> { #[derive(Debug)] struct State { /// The global queue. - queue: ConcurrentQueue, + queue: CachePadded, + + /// Count of searching runners. + searching_count: CachePadded, /// Local queues created by runners. - local_queues: RwLock>>>, + local_queues: CachePadded>>, /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. - notified: AtomicBool, + notified: CachePadded, /// A list of sleeping tickers. - sleepers: Mutex, + sleepers: CachePadded>, /// Currently active tasks. - active: Mutex>, + active: CachePadded>>, } impl State { /// Creates state for a new executor. fn new() -> State { State { - queue: ConcurrentQueue::unbounded(), - local_queues: RwLock::new(Vec::new()), - notified: AtomicBool::new(true), - sleepers: Mutex::new(Sleepers { + queue: GlobalQueue::default().into(), + searching_count: AtomicUsize::new(0).into(), + local_queues: RwLock::new(Slab::new()).into(), + notified: AtomicBool::new(true).into(), + sleepers: parking_lot::Mutex::new(Sleepers { count: 0, wakers: Vec::new(), free_ids: Vec::new(), - }), - active: Mutex::new(Arena::new()), + }) + .into(), + active: Mutex::new(Slab::new()).into(), } } @@ -502,7 +525,7 @@ impl State { .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_ok() { - let waker = self.sleepers.lock().unwrap().notify(); + let waker = self.sleepers.lock().notify(); if let Some(w) = waker { w.wake(); } @@ -557,17 +580,17 @@ impl Sleepers { /// Removes a previously inserted sleeping ticker. /// /// Returns `true` if the ticker was notified. - fn remove(&mut self, id: usize) -> bool { + fn remove(&mut self, id: usize) -> Option { self.count -= 1; self.free_ids.push(id); for i in (0..self.wakers.len()).rev() { if self.wakers[i].0 == id { - self.wakers.remove(i); - return false; + let (_, waker) = self.wakers.remove(i); + return Some(waker); } } - true + None } /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. @@ -589,9 +612,9 @@ impl Sleepers { /// Runs task one by one. #[derive(Debug)] -struct Ticker<'a> { +struct Ticker { /// The executor state. - state: &'a State, + state: Arc, /// Set to a non-zero sleeper ID when in sleeping state. /// @@ -602,9 +625,9 @@ struct Ticker<'a> { sleeping: AtomicUsize, } -impl Ticker<'_> { +impl Ticker { /// Creates a ticker. - fn new(state: &State) -> Ticker<'_> { + fn new(state: Arc) -> Ticker { Ticker { state, sleeping: AtomicUsize::new(0), @@ -615,7 +638,7 @@ impl Ticker<'_> { /// /// Returns `false` if the ticker was already sleeping and unnotified. fn sleep(&self, waker: &Waker) -> bool { - let mut sleepers = self.state.sleepers.lock().unwrap(); + let mut sleepers = self.state.sleepers.lock(); match self.sleeping.load(Ordering::SeqCst) { // Move to sleeping state. @@ -639,28 +662,36 @@ impl Ticker<'_> { } /// Moves the ticker into woken state. - fn wake(&self) { + fn wake(&self) -> Option { let id = self.sleeping.swap(0, Ordering::SeqCst); if id != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); - sleepers.remove(id); + let mut sleepers = self.state.sleepers.lock(); + let toret = sleepers.remove(id); self.state .notified .swap(sleepers.is_notified(), Ordering::SeqCst); + toret + } else { + None } } /// Waits for the next runnable task to run. async fn runnable(&self) -> Runnable { - self.runnable_with(|| self.state.queue.pop().ok()).await + self.runnable_with(|| self.state.queue.pop()).await } /// Waits for the next runnable task to run, given a function that searches for a task. async fn runnable_with(&self, mut search: impl FnMut() -> Option) -> Runnable { future::poll_fn(|cx| { loop { - match search() { + // This kills performance somehow + // DEBUG_SEARCHING_COUNT.fetch_add(1, Ordering::Relaxed); + let res = search(); + // DEBUG_SEARCHING_COUNT.fetch_sub(1, Ordering::Relaxed); + + match res { None => { // Move to sleeping and unnotified state. if !self.sleep(cx.waker()) { @@ -671,11 +702,12 @@ impl Ticker<'_> { Some(r) => { // Wake up. self.wake(); - - // Notify another ticker now to pick up where this ticker left off, just in - // case running the task takes a long time. - self.state.notify(); - + // Sibling notification. + if self.state.searching_count.load(Ordering::Relaxed) == 0 + // && fastrand::u8(0..) < 5 + { + self.state.notify(); + } return Poll::Ready(r); } } @@ -685,13 +717,13 @@ impl Ticker<'_> { } } -impl Drop for Ticker<'_> { +impl Drop for Ticker { fn drop(&mut self) { // If this ticker is in sleeping state, it must be removed from the sleepers list. let id = self.sleeping.swap(0, Ordering::SeqCst); if id != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); - let notified = sleepers.remove(id); + let mut sleepers = self.state.sleepers.lock(); + let notified = sleepers.remove(id).is_none(); self.state .notified @@ -700,65 +732,160 @@ impl Drop for Ticker<'_> { // If this ticker was notified, then notify another ticker. if notified { drop(sleepers); + // eprintln!("TICKER DROP"); self.state.notify(); } } } } +struct TlsData { + state: Arc, + ticker: Arc, + pending_tasks: Vec, +} + +impl Drop for TlsData { + fn drop(&mut self) { + // move the pending tasks into the state + for task in self.pending_tasks.drain(0..) { + self.state.queue.push(task) + } + } +} + +thread_local! { + static TLS: RefCell> = Default::default() +} + +fn clear_tls() { + TLS.with(|v| *v.borrow_mut() = Default::default()) +} + +fn try_push_tls(state: &Arc, runnable: Runnable) -> Result<(), Runnable> { + TLS.with(|tls| { + let tls = tls.try_borrow_mut(); + if let Ok(mut tls) = tls { + if let Some(tlsdata) = tls.as_mut() { + if !Arc::ptr_eq(state, &tlsdata.state) { + return Err(runnable); + } + tlsdata.pending_tasks.push(runnable); + // notify ticker + // eprintln!("successfully pushed locally"); + if let Some(v) = tlsdata.ticker.wake() { + v.wake() + } + Ok(()) + } else { + Err(runnable) + } + } else { + Err(runnable) + } + }) +} + +fn try_pop_tls() -> Option> { + TLS.with(|tls| { + let mut tls = tls.borrow_mut(); + if let Some(tlsdata) = tls.as_mut() { + Some(std::mem::replace( + &mut tlsdata.pending_tasks, + Vec::with_capacity(4), + )) + } else { + None + } + }) +} + /// A worker in a work-stealing executor. /// /// This is just a ticker that also has an associated local queue for improved cache locality. #[derive(Debug)] -struct Runner<'a> { +struct Runner { /// The executor state. - state: &'a State, + state: Arc, /// Inner ticker. - ticker: Ticker<'a>, + ticker: Arc, /// The local queue. - local: Arc>, + local: LocalQueue, /// Bumped every time a runnable task is found. - ticks: AtomicUsize, + ticks: usize, + + /// ID. + id: usize, } -impl Runner<'_> { +impl Runner { /// Creates a runner and registers it in the executor state. - fn new(state: &State) -> Runner<'_> { - let runner = Runner { - state, - ticker: Ticker::new(state), - local: Arc::new(ConcurrentQueue::bounded(512)), - ticks: AtomicUsize::new(0), + fn new(state: Arc) -> Runner { + let mut runner = Runner { + state: state.clone(), + ticker: Arc::new(Ticker::new(state.clone())), + local: LocalQueue::default(), + ticks: 0, + id: 0, }; - state - .local_queues - .write() - .unwrap() - .push(runner.local.clone()); + runner.id = state.local_queues.write().insert(runner.local.handle()); runner } + /// Sets as active in the TLS + fn set_tls_active(&self) { + // let weak_ticker = Arc::downgrade(&self.ticker); + // let weak_local = Arc::downgrade(&self.local); + TLS.with(|tls| { + let mut tls = tls.borrow_mut(); + if tls.is_none() { + *tls = Some(TlsData { + state: self.state.clone(), + ticker: self.ticker.clone(), + pending_tasks: Vec::new(), + }) + } + }) + } + /// Waits for the next runnable task to run. - async fn runnable(&self) -> Runnable { + async fn runnable(&mut self) -> Runnable { + // static USELESS_WAKEUP_COUNT: AtomicUsize = AtomicUsize::new(0); + // static GOOD_WAKEUP_COUNT: AtomicUsize = AtomicUsize::new(0); + let runnable = self .ticker + .clone() .runnable_with(|| { + let must_yield = JUST_YIELDED.with(|v| v.replace(false)); + // Try the TLS. + if let Some(r) = try_pop_tls() { + for task in r { + // SAFETY: only one thread can push to self.local at the same time + if let Err(task) = self.local.push(must_yield, task) { + self.state.queue.push(task); + } + } + } + // Try the local queue. - if let Ok(r) = self.local.pop() { + if let Some(r) = self.local.pop() { return Some(r); } + self.state.searching_count.fetch_add(1, Ordering::Relaxed); // Try stealing from the global queue. - if let Ok(r) = self.state.queue.pop() { - steal(&self.state.queue, &self.local); + self.local.steal_global(&self.state.queue); + if let Some(r) = self.local.pop() { + self.state.searching_count.fetch_sub(1, Ordering::Relaxed); return Some(r); } // Try stealing from other runners. - let local_queues = self.state.local_queues.read().unwrap(); + let local_queues = self.state.local_queues.read(); // Pick a random starting point in the iterator list and rotate the list. let n = local_queues.len(); @@ -770,70 +897,47 @@ impl Runner<'_> { .take(n); // Remove this runner's local queue. - let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); + let id = self.id; + let iter = iter.filter(|local| local.0 != id); // Try stealing from each local queue in the list. - for local in iter { - steal(local, &self.local); - if let Ok(r) = self.local.pop() { + for (_, local) in iter { + self.local.steal_local(local); + if let Some(r) = self.local.pop() { + self.state.searching_count.fetch_sub(1, Ordering::Relaxed); return Some(r); } } + self.state.searching_count.fetch_sub(1, Ordering::Relaxed); None }) .await; // Bump the tick counter. - let ticks = self.ticks.fetch_add(1, Ordering::SeqCst); + self.ticks += 1; - if ticks % 64 == 0 { + if self.ticks % 64 == 0 { // Steal tasks from the global queue to ensure fair task scheduling. - steal(&self.state.queue, &self.local); + self.local.steal_global(&self.state.queue) } runnable } } -impl Drop for Runner<'_> { +impl Drop for Runner { fn drop(&mut self) { // Remove the local queue. - self.state - .local_queues - .write() - .unwrap() - .retain(|local| !Arc::ptr_eq(local, &self.local)); + self.state.local_queues.write().remove(self.id); // Re-schedule remaining tasks in the local queue. - while let Ok(r) = self.local.pop() { + // SAFETY: this cannot possibly be run from two different threads concurrently. + while let Some(r) = self.local.pop() { r.schedule(); } } } - -/// Steals some items from one queue into another. -fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { - // Half of `src`'s length rounded up. - let mut count = (src.len() + 1) / 2; - - if count > 0 { - // Don't steal more than fits into the queue. - if let Some(cap) = dest.capacity() { - count = count.min(cap - dest.len()); - } - - // Steal tasks. - for _ in 0..count { - if let Ok(t) = src.pop() { - assert!(dest.push(t).is_ok()); - } else { - break; - } - } - } -} - /// Runs a closure when dropped. struct CallOnDrop(F); diff --git a/src/taskqueue.rs b/src/taskqueue.rs new file mode 100644 index 0000000..b191072 --- /dev/null +++ b/src/taskqueue.rs @@ -0,0 +1,102 @@ +use std::cell::UnsafeCell; + +use async_task::Runnable; + +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; + +#[derive(Debug)] +pub struct GlobalQueue { + inner: Injector, +} + +impl Default for GlobalQueue { + fn default() -> Self { + Self { + inner: Injector::default(), + } + } +} + +impl GlobalQueue { + pub fn push(&self, task: Runnable) { + self.inner.push(task) + } + + pub fn pop(&self) -> Option { + loop { + match self.inner.steal() { + Steal::Retry => continue, + Steal::Empty => return None, + Steal::Success(v) => return Some(v), + } + } + } +} + +#[derive(Debug, Clone)] +pub struct LocalQueueHandle { + inner: Stealer, +} + +#[derive(Debug)] +pub struct LocalQueue { + inner: Worker, + next_task: Option, +} + +impl Default for LocalQueue { + fn default() -> Self { + Self { + inner: Worker::new_fifo(), + next_task: Default::default(), + } + } +} + +impl LocalQueue { + #[inline] + pub fn push(&mut self, is_yield: bool, task: Runnable) -> Result<(), Runnable> { + // if this is the same task as last time, we don't push to next_task + // if is_yield { + self.inner.push(task); + // } else { + // let next_task = self.next_task(); + // if let Some(task) = next_task.replace(task) { + // self.inner.push(task); + // } + // } + Ok(()) + } + + #[inline] + pub fn pop(&mut self) -> Option { + // let next_task = self.next_task(); + // if let Some(next_task) = next_task.take() { + // Some(next_task) + // } else { + self.inner.pop() + // } + } + + #[inline] + pub fn len(&self) -> usize { + self.inner.len() + } + + #[inline] + pub fn steal_global(&self, other: &GlobalQueue) { + std::iter::repeat_with(|| other.inner.steal_batch(&self.inner)).find(|v| !v.is_retry()); + } + + #[inline] + pub fn steal_local(&self, other: &LocalQueueHandle) { + let _ = other.inner.steal_batch(&self.inner); + } + + #[inline] + pub fn handle(&self) -> LocalQueueHandle { + LocalQueueHandle { + inner: self.inner.stealer(), + } + } +}