diff --git a/src/lib.rs b/src/lib.rs index e23b4c9..d41627d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ use std::future::Future; use std::pin::Pin; +use std::rc::Rc; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -166,32 +167,7 @@ impl Executor { /// assert_eq!(res, 6); /// ``` pub fn run(&self, future: impl Future) -> T { - self.enter(|| { - let (p, u) = parking::pair(); - - let ticker = self.ex.ticker({ - let u = u.clone(); - move || u.unpark() - }); - - pin!(future); - let waker = waker_fn(move || u.unpark()); - let cx = &mut Context::from_waker(&waker); - - 'start: loop { - if let Poll::Ready(t) = future.as_mut().poll(cx) { - break t; - } - - for _ in 0..200 { - if !ticker.tick() { - p.park(); - continue 'start; - } - } - p.park_timeout(Duration::from_secs(0)); - } - }) + self.enter(|| run(future)) } } @@ -275,7 +251,7 @@ impl Spawner { #[derive(Debug)] pub struct LocalExecutor { ex: multitask::LocalExecutor, - parker: parking::Parker, + parker: Rc, } impl LocalExecutor { @@ -292,10 +268,33 @@ impl LocalExecutor { let (p, u) = parking::pair(); LocalExecutor { ex: multitask::LocalExecutor::new(move || u.unpark()), - parker: p, + parker: Rc::new(p), } } + /// Enters the context of an executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::{LocalExecutor, Task}; + /// + /// let local_ex = LocalExecutor::new(); + /// + /// local_ex.enter(|| { + /// // `Task::local()` now knows which executor to spawn onto. + /// let task = Task::local(async { + /// println!("Hello world"); + /// }); + /// }); + /// ``` + pub fn enter(&self, f: impl FnOnce() -> T) -> T { + if LOCAL_EX.is_set() { + panic!("cannot call `LocalExecutor::enter()` if already inside an `LocalExecutor`"); + } + LOCAL_EX.set(self, f) + } + /// Spawns a task onto the executor. /// /// # Examples @@ -328,33 +327,68 @@ impl LocalExecutor { /// assert_eq!(res, 6); /// ``` pub fn run(&self, future: impl Future) -> T { - pin!(future); + self.enter(|| run(future)) + } +} + +impl Default for LocalExecutor { + fn default() -> LocalExecutor { + LocalExecutor::new() + } +} + +/// Runs both Executor and LocalExecutor +fn run(future: impl Future) -> T { + // If local executor is initialized, then use its parker + let p = if LOCAL_EX.is_set() { + LOCAL_EX.with(|local_ex| Rc::clone(&local_ex.parker)) + } else { + Rc::new(parking::Parker::new()) + }; + + let u = p.unparker(); + + let ticker = if EX.is_set() { + let u = u.clone(); + let notify = move || u.unpark(); + Some(EX.with(|ex| ex.ex.ticker(notify))) + } else { + None + }; + + pin!(future); + let waker = waker_fn(move || u.unpark()); + let cx = &mut Context::from_waker(&waker); + + 'start: loop { + if let Poll::Ready(t) = future.as_mut().poll(cx) { + break t; + } + + let mut ticks = 0; - let u = self.parker.unparker(); - let waker = waker_fn(move || u.unpark()); - let cx = &mut Context::from_waker(&waker); + while ticks < 200 { + let prev_ticks = ticks; - LOCAL_EX.set(self, || { - 'start: loop { - if let Poll::Ready(t) = future.as_mut().poll(cx) { - break t; + if let Some(ref ticker) = ticker { + if ticker.tick() { + ticks += 1; } + } - for _ in 0..200 { - if !self.ex.tick() { - self.parker.park(); - continue 'start; - } + if LOCAL_EX.is_set() { + if LOCAL_EX.with(|local_ex| local_ex.ex.tick()) { + ticks += 1; } - self.parker.park_timeout(Duration::from_secs(0)); } - }) - } -} -impl Default for LocalExecutor { - fn default() -> LocalExecutor { - LocalExecutor::new() + if ticks == prev_ticks { + p.park(); + continue 'start; + } + } + + p.park_timeout(Duration::from_secs(0)); } }