Skip to content

Run both local end multi-threaded executors #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 82 additions & 48 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
Expand Down Expand Up @@ -166,32 +167,7 @@ impl Executor {
/// assert_eq!(res, 6);
/// ```
pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
self.enter(|| {
let (p, u) = parking::pair();

let ticker = self.ex.ticker({
let u = u.clone();
move || u.unpark()
});

pin!(future);
let waker = waker_fn(move || u.unpark());
let cx = &mut Context::from_waker(&waker);

'start: loop {
if let Poll::Ready(t) = future.as_mut().poll(cx) {
break t;
}

for _ in 0..200 {
if !ticker.tick() {
p.park();
continue 'start;
}
}
p.park_timeout(Duration::from_secs(0));
}
})
self.enter(|| run(future))
}
}

Expand Down Expand Up @@ -275,7 +251,7 @@ impl Spawner {
#[derive(Debug)]
pub struct LocalExecutor {
ex: multitask::LocalExecutor,
parker: parking::Parker,
parker: Rc<parking::Parker>,
}

impl LocalExecutor {
Expand All @@ -292,10 +268,33 @@ impl LocalExecutor {
let (p, u) = parking::pair();
LocalExecutor {
ex: multitask::LocalExecutor::new(move || u.unpark()),
parker: p,
parker: Rc::new(p),
}
}

/// Enters the context of an executor.
///
/// # Examples
///
/// ```
/// use async_executor::{LocalExecutor, Task};
///
/// let local_ex = LocalExecutor::new();
///
/// local_ex.enter(|| {
/// // `Task::local()` now knows which executor to spawn onto.
/// let task = Task::local(async {
/// println!("Hello world");
/// });
/// });
/// ```
pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
if LOCAL_EX.is_set() {
panic!("cannot call `LocalExecutor::enter()` if already inside an `LocalExecutor`");
}
LOCAL_EX.set(self, f)
}

/// Spawns a task onto the executor.
///
/// # Examples
Expand Down Expand Up @@ -328,33 +327,68 @@ impl LocalExecutor {
/// assert_eq!(res, 6);
/// ```
pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
pin!(future);
self.enter(|| run(future))
}
}

impl Default for LocalExecutor {
fn default() -> LocalExecutor {
LocalExecutor::new()
}
}

/// Runs both Executor and LocalExecutor
fn run<T>(future: impl Future<Output = T>) -> T {
// If local executor is initialized, then use its parker
let p = if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| Rc::clone(&local_ex.parker))
} else {
Rc::new(parking::Parker::new())
};
Copy link
Author

@oblique oblique Jul 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this acceptable?
Another way is refactoring multitask and introducing LocalTicker.


let u = p.unparker();

let ticker = if EX.is_set() {
let u = u.clone();
let notify = move || u.unpark();
Some(EX.with(|ex| ex.ex.ticker(notify)))
} else {
None
};

pin!(future);
let waker = waker_fn(move || u.unpark());
let cx = &mut Context::from_waker(&waker);

'start: loop {
if let Poll::Ready(t) = future.as_mut().poll(cx) {
break t;
}

let mut ticks = 0;

let u = self.parker.unparker();
let waker = waker_fn(move || u.unpark());
let cx = &mut Context::from_waker(&waker);
while ticks < 200 {
let prev_ticks = ticks;

LOCAL_EX.set(self, || {
'start: loop {
if let Poll::Ready(t) = future.as_mut().poll(cx) {
break t;
if let Some(ref ticker) = ticker {
if ticker.tick() {
ticks += 1;
}
}

for _ in 0..200 {
if !self.ex.tick() {
self.parker.park();
continue 'start;
}
if LOCAL_EX.is_set() {
if LOCAL_EX.with(|local_ex| local_ex.ex.tick()) {
ticks += 1;
}
self.parker.park_timeout(Duration::from_secs(0));
}
})
}
}

impl Default for LocalExecutor {
fn default() -> LocalExecutor {
LocalExecutor::new()
if ticks == prev_ticks {
p.park();
continue 'start;
}
}

p.park_timeout(Duration::from_secs(0));
}
}

Expand Down