Skip to content

Refactor futures executor #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 28, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions src/webcore/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
// This file implements a futures-compatible executor which schedules futures
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add in TODO for easier greppability.

// onto the JavaScript event loop. This implementation assumes there is a
// single thread and is *not* compatible with multiple WebAssembly workers sharing
// the same address space.
//
// TODO: Implement support for multiple threads. This will require a mechanism to
// wake up another thread, such as the `postMessage` API.

use futures::future::{Future, ExecuteError, Executor};
use futures::executor::{self, Notify, Spawn};
use futures::Async;
use std::collections::VecDeque;
use std::result::Result as StdResult;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::cmp;
use webcore::try_from::TryInto;
use webcore::value::Reference;


// TODO: Determine optimal values for these constants
// Initial capacity of the event queue
const INITIAL_QUEUE_CAPACITY: usize = 10;
// Iterations to wait before allowing the queue to shrink
const QUEUE_SHRINK_DELAY: usize = 25;


// This functionality should really be in libstd, because the implementation
// looks stupid.
unsafe fn clone_raw< T >( ptr: *const T ) -> Rc< T > {
let result = Rc::from_raw( ptr );
::std::mem::forget( result.clone() );
result
}

// Typing this out is tedious
type BoxedFuture = Box< Future< Item = (), Error = () > + 'static >;

struct SpawnedTask {
is_queued: Cell< bool >,
spawn: RefCell< Option< Spawn< BoxedFuture > > >,
}

impl SpawnedTask {
fn new< F >( future: F ) -> Rc< Self >
where F: Future< Item = (), Error = () > + 'static {
Rc::new( Self {
is_queued: Cell::new( false ),
spawn: RefCell::new( Some( executor::spawn(
Box::new( future ) as BoxedFuture
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tested it, but is it actually needed to convert this into a Box? Looking at the docs for spawn, it doesn't seem to require a Box

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spawn<T>(future: T) returns a Spawn<T> - ie. it does not perform type-erasure. The box is needed to erase the type.

) ) )
} )
}

fn poll( &self ) {
let mut spawn = self.spawn.borrow_mut();

// Take the future so that if we panic it gets dropped
if let Some( mut spawn_future ) = spawn.take() {
// Clear `is_queued` flag so that it will re-queue if poll calls task.notify()
self.is_queued.set( false );

if spawn_future.poll_future_notify( &&EventLoop, self as *const _ as usize ) == Ok( Async::NotReady ) {
// Future was not ready, so put it back
*spawn = Some( spawn_future );
}
}
}

fn notify( task: Rc< SpawnedTask > ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be marked #[inline]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until we find a performance issue due to poor codegen, I'd rather let the compiler decide whether to inline it.

// If not already queued
if !task.is_queued.replace( true ) {
EventLoop.push_task(task);
}
}
}

// A proxy for the JavaScript event loop.
struct EventLoop;

// There's only one thread, but this lets us tell the compiler that we
// don't need a `Sync` bound, and also gives us lazy initialization.
thread_local! {
static EVENT_LOOP_INNER: EventLoopInner = EventLoopInner::new();
}

impl EventLoop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These functions should probably be marked as #[inline]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

fn drain(&self) {
EVENT_LOOP_INNER.with(EventLoopInner::drain)
}
fn push_task(&self, task: Rc< SpawnedTask >) {
EVENT_LOOP_INNER.with(|inner| inner.push_task(task))
}
}

// State relating to the JavaScript event loop. Only one instance ever exists.
struct EventLoopInner {
// Avoid unnecessary allocation and interop by keeping a local
// queue of pending tasks.
microtask_queue: RefCell< VecDeque< Rc< SpawnedTask > > >,
waker: Reference,
shrink_counter: Cell<usize>
}

// Not strictly necessary, but may become relevant in the future
impl Drop for EventLoopInner {
#[inline]
fn drop( &mut self ) {
js! { @(no_return)
@{&self.waker}.drop();
}
}
}

impl EventLoopInner {
// Initializes the event loop. Only called once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also not true when wasm gets multiple threads.

fn new() -> Self {
EventLoopInner {
microtask_queue: RefCell::new(VecDeque::with_capacity(INITIAL_QUEUE_CAPACITY)),
waker: js!(
var callback = @{|| EventLoop.drain()};
var wrapper = function() {
if (!callback.dropped) { callback() }
};
var nextTick;

// Modern browsers can use `MutationObserver` which allows
// us to schedule a micro-task without allocating a promise.
// https://dom.spec.whatwg.org/#notify-mutation-observers
if ( typeof MutationObserver === "function" ) {
var node = document.createTextNode( "0" );
var state = false;

new MutationObserver( wrapper ).observe( node, { characterData: true } );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the original code doesn't clean this up, but it really should clean up the MutationObserver when the EventLoopInner is dropped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will happen automatically - a MutationObserver is garbage collected when its target node is collected. The target node will be garbage collected when the waker is dropped. The waker will be dropped when the EventLoopInner is dropped.


nextTick = function() {
state = !state;
node.data = ( state ? "1" : "0" );
};

// Node.js and other environments
} else {
var promise = Promise.resolve( null );

nextTick = function() {
promise.then( wrapper );
};
}

nextTick.drop = function() {
callback.dropped = true;
callback.drop();
};

return nextTick;
).try_into().unwrap(),
shrink_counter: Cell::new(0)
}
}
// Pushes a task onto the queue
fn push_task(&self, task: Rc< SpawnedTask >) {
let mut queue = self.microtask_queue.borrow_mut();
queue.push_back(task);

// If the queue was previously empty, then we need to schedule
// the queue to be drained.
if queue.len() == 1 {
self.wake();
}
}
// Invoke the JavaScript waker function
fn wake(&self) {
js! { @(no_return) @{&self.waker}(); }
}
// Remove and return a task from the front of the queue
fn pop_task(&self) -> Option< Rc< SpawnedTask > > {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be marked as #[inline]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

self.microtask_queue.borrow_mut().pop_front()
}
// See if it's worth trying to reclaim some space from the queue
fn estimate_realloc_capacity(&self) -> Option<usize> {
let queue = self.microtask_queue.borrow();
// A VecDeque retains a `2^n-1` capacity
let half_cap = queue.capacity()/2;
// We consider shrinking the queue if it is less than
// half full...
if half_cap > queue.len() && half_cap > INITIAL_QUEUE_CAPACITY {
// ...and if it's been that way for at least
// `QUEUE_SHRINK_DELAY` iterations.
let shrink_counter = self.shrink_counter.get();
if shrink_counter < QUEUE_SHRINK_DELAY {
self.shrink_counter.set(shrink_counter + 1);
} else {
self.shrink_counter.set(0);
return Some(cmp::max(queue.len(), INITIAL_QUEUE_CAPACITY));
}
} else {
self.shrink_counter.set(0);
}
None
}
// Poll the queue until it is empty
fn drain(&self) {
let maybe_realloc_capacity = self.estimate_realloc_capacity();

// Poll all the pending tasks
while let Some(task) = self.pop_task() {
task.poll();
}

if let Some(realloc_capacity) = maybe_realloc_capacity {
// We decided to reclaim some space
*self.microtask_queue.borrow_mut() = VecDeque::with_capacity(realloc_capacity);
}
}
}

impl< F > Executor< F > for EventLoop where
F: Future< Item = (), Error = () > + 'static {
fn execute( &self, future: F ) -> StdResult< (), ExecuteError< F > > {
SpawnedTask::notify( SpawnedTask::new( future ) );
Ok( () )
}
}

impl Notify for EventLoop {
fn notify( &self, spawned_id: usize ) {
SpawnedTask::notify( unsafe { clone_raw( spawned_id as *const _ ) } );
}

fn clone_id( &self, id: usize ) -> usize {
unsafe { Rc::into_raw( clone_raw( id as *const SpawnedTask ) ) as usize }
}

fn drop_id( &self, id: usize ) {
unsafe { Rc::from_raw( id as *const SpawnedTask ) };
}
}

#[inline]
pub fn spawn< F >( future: F ) where
F: Future< Item = (), Error = () > + 'static {
EventLoop.execute( future ).unwrap();
}
2 changes: 1 addition & 1 deletion src/webcore/mod.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ pub mod discard;
pub mod promise_future;

#[cfg(feature = "futures")]
pub mod promise_executor;
pub mod executor;

#[cfg(feature = "nightly")]
pub mod void {
211 changes: 0 additions & 211 deletions src/webcore/promise_executor.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/webcore/promise_future.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use webcore::try_from::{TryInto, TryFrom};
use webapi::error;
use futures::{Future, Poll, Async};
use futures::unsync::oneshot::Receiver;
use webcore::promise_executor::spawn;
use webcore::executor::spawn;
use webcore::discard::DiscardOnDrop;
use super::promise::{Promise, DoneHandle};