From 29d83002a27e6f47759b4a3bfe741fb061107816 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 20 May 2013 16:43:31 -0700 Subject: [PATCH 01/15] core::rt: Move uv idle tests to idle mod --- src/libcore/rt/uv/idle.rs | 62 +++++++++++++++++++++++++++++++++++++++ src/libcore/rt/uv/mod.rs | 54 ---------------------------------- 2 files changed, 62 insertions(+), 54 deletions(-) diff --git a/src/libcore/rt/uv/idle.rs b/src/libcore/rt/uv/idle.rs index 2cf0b5c48728..a81ab48696a3 100644 --- a/src/libcore/rt/uv/idle.rs +++ b/src/libcore/rt/uv/idle.rs @@ -89,3 +89,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { match self { &IdleWatcher(ptr) => ptr } } } + +#[cfg(test)] +mod test { + + use rt::uv::Loop; + use super::*; + use unstable::run_in_bare_thread; + + #[test] + #[ignore(reason = "valgrind - loop destroyed before watcher?")] + fn idle_new_then_close() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let idle_watcher = { IdleWatcher::new(&mut loop_) }; + idle_watcher.close(||()); + } + } + + #[test] + fn idle_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + let mut count = 10; + let count_ptr: *mut int = &mut count; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + assert!(status.is_none()); + if unsafe { *count_ptr == 10 } { + idle_watcher.stop(); + idle_watcher.close(||()); + } else { + unsafe { *count_ptr = *count_ptr + 1; } + } + } + loop_.run(); + loop_.close(); + assert_eq!(count, 10); + } + } + + #[test] + fn idle_start_stop_start() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + assert!(status.is_none()); + idle_watcher.stop(); + do idle_watcher.start |idle_watcher, status| { + assert!(status.is_none()); + let mut idle_watcher = idle_watcher; + 
idle_watcher.stop(); + idle_watcher.close(||()); + } + } + loop_.run(); + loop_.close(); + } + } +} diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 2bd657fd8641..8cc596b2876d 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -364,57 +364,3 @@ fn loop_smoke_test() { loop_.close(); } } - -#[test] -#[ignore(reason = "valgrind - loop destroyed before watcher?")] -fn idle_new_then_close() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let idle_watcher = { IdleWatcher::new(&mut loop_) }; - idle_watcher.close(||()); - } -} - -#[test] -fn idle_smoke_test() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - let mut count = 10; - let count_ptr: *mut int = &mut count; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - assert!(status.is_none()); - if unsafe { *count_ptr == 10 } { - idle_watcher.stop(); - idle_watcher.close(||()); - } else { - unsafe { *count_ptr = *count_ptr + 1; } - } - } - loop_.run(); - loop_.close(); - assert_eq!(count, 10); - } -} - -#[test] -fn idle_start_stop_start() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - assert!(status.is_none()); - idle_watcher.stop(); - do idle_watcher.start |idle_watcher, status| { - assert!(status.is_none()); - let mut idle_watcher = idle_watcher; - idle_watcher.stop(); - idle_watcher.close(||()); - } - } - loop_.run(); - loop_.close(); - } -} From 807269041437411df49a9a893c86310283d6eb91 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 20 May 2013 18:16:09 -0700 Subject: [PATCH 02/15] core::rt: Add bindings for async uv handles --- src/libcore/rt/uv/async.rs | 105 +++++++++++++++++++++++++++++++++++++ src/libcore/rt/uv/mod.rs | 9 +++- 2 files changed, 112 insertions(+), 2 deletions(-) create 
mode 100644 src/libcore/rt/uv/async.rs diff --git a/src/libcore/rt/uv/async.rs b/src/libcore/rt/uv/async.rs new file mode 100644 index 000000000000..0d032f512d38 --- /dev/null +++ b/src/libcore/rt/uv/async.rs @@ -0,0 +1,105 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc::{c_int, c_void}; +use option::Some; +use rt::uv::uvll; +use rt::uv::uvll::UV_ASYNC; +use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback}; +use rt::uv::WatcherInterop; +use rt::uv::status_to_maybe_uv_error; + +pub struct AsyncWatcher(*uvll::uv_async_t); +impl Watcher for AsyncWatcher { } + +impl AsyncWatcher { + fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { + unsafe { + let handle = uvll::malloc_handle(UV_ASYNC); + assert!(handle.is_not_null()); + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + let data = watcher.get_watcher_data(); + data.async_cb = Some(cb); + assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb)); + return watcher; + } + + extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + let status = status_to_maybe_uv_error(watcher.native_handle(), status); + let data = watcher.get_watcher_data(); + let cb = data.async_cb.get_ref(); + (*cb)(watcher, status); + } + } + + fn send(&mut self) { + unsafe { + let handle = self.native_handle(); + uvll::async_send(handle); + } + } + + fn close(self, cb: NullCallback) { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + + unsafe { + 
uvll::close(self.native_handle(), close_cb); + } + + extern fn close_cb(handle: *uvll::uv_stream_t) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + { + let data = watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); + } + watcher.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void); } + } + } +} + +impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher { + fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher { + AsyncWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_async_t { + match self { &AsyncWatcher(ptr) => ptr } + } +} + +#[cfg(test)] +mod test { + + use super::*; + use rt::uv::Loop; + use unstable::run_in_bare_thread; + use rt::thread::Thread; + use cell::Cell; + + #[test] + fn smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) ); + let watcher_cell = Cell(watcher); + let _thread = do Thread::start { + let mut watcher = watcher_cell.take(); + watcher.send(); + }; + loop_.run(); + loop_.close(); + } + } +} diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 8cc596b2876d..5f9e56608149 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -57,6 +57,7 @@ pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; +pub use self::async::AsyncWatcher; /// The implementation of `rtio` for libuv pub mod uvio; @@ -68,6 +69,7 @@ pub mod file; pub mod net; pub mod idle; pub mod timer; +pub mod async; /// XXX: Loop(*handle) is buggy with destructors. 
Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -125,6 +127,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option); pub type ConnectionCallback = ~fn(StreamWatcher, Option); pub type FsCallback = ~fn(FsRequest, Option); pub type TimerCallback = ~fn(TimerWatcher, Option); +pub type AsyncCallback = ~fn(AsyncWatcher, Option); /// Callbacks used by StreamWatchers, set as custom data on the foreign handle @@ -135,7 +138,8 @@ struct WatcherData { close_cb: Option, alloc_cb: Option, idle_cb: Option, - timer_cb: Option + timer_cb: Option, + async_cb: Option } pub trait WatcherInterop { @@ -164,7 +168,8 @@ impl> WatcherInterop for W { close_cb: None, alloc_cb: None, idle_cb: None, - timer_cb: None + timer_cb: None, + async_cb: None }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); From 6d8d73cfc4cba2fdb2ee67448df39d89be08ce69 Mon Sep 17 00:00:00 2001 From: James Miller Date: Tue, 21 May 2013 17:31:24 +1200 Subject: [PATCH 03/15] Add AtomicUint newtype --- src/libcore/unstable/sync.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/libcore/unstable/sync.rs b/src/libcore/unstable/sync.rs index 734368c70c4a..7c228ff56477 100644 --- a/src/libcore/unstable/sync.rs +++ b/src/libcore/unstable/sync.rs @@ -205,6 +205,26 @@ extern { fn rust_unlock_little_lock(lock: rust_little_lock); } +/* *********************************************************************/ + +//FIXME: #5042 This should be replaced by proper atomic type +pub struct AtomicUint(uint); +pub impl AtomicUint { + fn load(&self) -> uint { + unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint } + } + fn store(&mut self, val:uint) { + unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); } + } + fn add(&mut self, val:int) -> uint { + unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint } + } + fn cas(&self, old:uint, new:uint) -> uint { + unsafe { 
intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint } + } +} + + #[cfg(test)] mod tests { use comm; From 8f77a6f422184dacc14ae1b6a042c321e06bef88 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 21 May 2013 17:36:59 -0700 Subject: [PATCH 04/15] core: Add AtomicInt and cleanup --- src/libcore/unstable/sync.rs | 61 ++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/src/libcore/unstable/sync.rs b/src/libcore/unstable/sync.rs index 7c228ff56477..6085ca1a482e 100644 --- a/src/libcore/unstable/sync.rs +++ b/src/libcore/unstable/sync.rs @@ -208,25 +208,50 @@ extern { /* *********************************************************************/ //FIXME: #5042 This should be replaced by proper atomic type -pub struct AtomicUint(uint); -pub impl AtomicUint { - fn load(&self) -> uint { +pub struct AtomicUint { + priv inner: uint +} + +impl AtomicUint { + pub fn new(val: uint) -> AtomicUint { AtomicUint { inner: val } } + pub fn load(&self) -> uint { unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint } } - fn store(&mut self, val:uint) { + pub fn store(&mut self, val: uint) { unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); } } - fn add(&mut self, val:int) -> uint { + pub fn add(&mut self, val: int) -> uint { unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint } } - fn cas(&self, old:uint, new:uint) -> uint { + pub fn cas(&mut self, old:uint, new: uint) -> uint { unsafe { intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint } } } +pub struct AtomicInt { + priv inner: int +} + +impl AtomicInt { + pub fn new(val: int) -> AtomicInt { AtomicInt { inner: val } } + pub fn load(&self) -> int { + unsafe { intrinsics::atomic_load(&self.inner) } + } + pub fn store(&mut self, val: int) { + unsafe { intrinsics::atomic_store(&mut self.inner, val); } + } + pub fn add(&mut self, val: int) -> int { + unsafe { 
intrinsics::atomic_xadd(&mut self.inner, val) } + } + pub fn cas(&mut self, old: int, new: int) -> int { + unsafe { intrinsics::atomic_cxchg(&mut self.inner, old, new) } + } +} + #[cfg(test)] mod tests { + use super::*; use comm; use super::exclusive; use task; @@ -278,4 +303,28 @@ mod tests { assert_eq!(*one, 1); } } + + #[test] + fn atomic_int_smoke_test() { + let mut i = AtomicInt::new(0); + i.store(10); + assert!(i.load() == 10); + assert!(i.add(1) == 10); + assert!(i.load() == 11); + assert!(i.cas(11, 12) == 11); + assert!(i.cas(11, 13) == 12); + assert!(i.load() == 12); + } + + #[test] + fn atomic_uint_smoke_test() { + let mut i = AtomicUint::new(0); + i.store(10); + assert!(i.load() == 10); + assert!(i.add(1) == 10); + assert!(i.load() == 11); + assert!(i.cas(11, 12) == 11); + assert!(i.cas(11, 13) == 12); + assert!(i.load() == 12); + } } From a0cd55a1d7436dc9532ddf5cdad7d1f7e8f108f3 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 20 May 2013 18:23:56 -0700 Subject: [PATCH 05/15] core::rt: Add RemoteCallback trait and uv implementation This is used for signalling the event loop from other threads. 
--- src/libcore/rt/rtio.rs | 11 +++++ src/libcore/rt/uv/async.rs | 6 +-- src/libcore/rt/uv/uvio.rs | 86 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs index 4b5eda22ff5d..fa657555f3aa 100644 --- a/src/libcore/rt/rtio.rs +++ b/src/libcore/rt/rtio.rs @@ -18,6 +18,7 @@ use rt::uv::uvio; // XXX: ~object doesn't work currently so these are some placeholder // types to use instead pub type EventLoopObject = uvio::UvEventLoop; +pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; @@ -26,10 +27,20 @@ pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn callback_ms(&mut self, ms: u64, ~fn()); + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; /// The asynchronous I/O services. Not all event loops may provide one fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; } +pub trait RemoteCallback { + /// Trigger the remote callback. Note that the number of times the callback + /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire', + /// the callback will be called at least once, but multiple callbacks may be coalesced + /// and callbacks may be called more often than requested. Destruction also triggers the + /// callback. 
+ fn fire(&mut self); +} + pub trait IoFactory { fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; diff --git a/src/libcore/rt/uv/async.rs b/src/libcore/rt/uv/async.rs index 0d032f512d38..6ed06cc10b78 100644 --- a/src/libcore/rt/uv/async.rs +++ b/src/libcore/rt/uv/async.rs @@ -20,7 +20,7 @@ pub struct AsyncWatcher(*uvll::uv_async_t); impl Watcher for AsyncWatcher { } impl AsyncWatcher { - fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { + pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { unsafe { let handle = uvll::malloc_handle(UV_ASYNC); assert!(handle.is_not_null()); @@ -41,14 +41,14 @@ impl AsyncWatcher { } } - fn send(&mut self) { + pub fn send(&mut self) { unsafe { let handle = self.native_handle(); uvll::async_send(handle); } } - fn close(self, cb: NullCallback) { + pub fn close(self, cb: NullCallback) { let mut this = self; let data = this.get_watcher_data(); assert!(data.close_cb.is_none()); diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index cacd67314eba..cf1bd568d028 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -12,6 +12,7 @@ use option::*; use result::*; use ops::Drop; use cell::{Cell, empty_cell}; +use cast; use cast::transmute; use clone::Clone; use rt::io::IoError; @@ -23,6 +24,8 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; +use unstable::sync::{UnsafeAtomicRcBox, AtomicInt}; +use unstable::intrinsics; #[cfg(test)] use container::Container; #[cfg(test)] use uint; @@ -82,6 +85,10 @@ impl EventLoop for UvEventLoop { } } + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject { + ~UvRemoteCallback::new(self.uvio.uv_loop(), f) + } + fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> { Some(&mut self.uvio) } @@ -101,6 +108,85 @@ fn test_callback_run_once() { } } +pub struct 
UvRemoteCallback { + // The uv async handle for triggering the callback + async: AsyncWatcher, + // An atomic flag to tell the callback to exit, + // set from the dtor. + exit_flag: UnsafeAtomicRcBox +} + +impl UvRemoteCallback { + pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback { + let exit_flag = UnsafeAtomicRcBox::new(AtomicInt::new(0)); + let exit_flag_clone = exit_flag.clone(); + let async = do AsyncWatcher::new(loop_) |watcher, status| { + assert!(status.is_none()); + f(); + let exit_flag_ptr = exit_flag_clone.get(); + unsafe { + if (*exit_flag_ptr).load() == 1 { + watcher.close(||()); + } + } + }; + UvRemoteCallback { + async: async, + exit_flag: exit_flag + } + } +} + +impl RemoteCallback for UvRemoteCallback { + fn fire(&mut self) { self.async.send() } +} + +impl Drop for UvRemoteCallback { + fn finalize(&self) { + unsafe { + let mut this: &mut UvRemoteCallback = cast::transmute_mut(self); + let exit_flag_ptr = this.exit_flag.get(); + (*exit_flag_ptr).store(1); + this.async.send(); + } + } +} + +#[cfg(test)] +mod test_remote { + use super::*; + use cell; + use cell::Cell; + use rt::test::*; + use rt::thread::Thread; + use rt::tube::Tube; + use rt::rtio::EventLoop; + use rt::local::Local; + use rt::sched::Scheduler; + + #[test] + fn test_uv_remote() { + do run_in_newsched_task { + let mut tube = Tube::new(); + let tube_clone = tube.clone(); + let remote_cell = cell::empty_cell(); + do Local::borrow::() |sched| { + let tube_clone = tube_clone.clone(); + let tube_clone_cell = Cell(tube_clone); + let remote = do sched.event_loop.remote_callback { + tube_clone_cell.take().send(1); + }; + remote_cell.put_back(remote); + } + let _thread = do Thread::start { + remote_cell.take().fire(); + }; + + assert!(tube.recv() == 1); + } + } +} + pub struct UvIoFactory(Loop); pub impl UvIoFactory { From 41c21685dd149fb95dededfb4edaf87c6603c099 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 22 May 2013 15:39:39 -0700 Subject: [PATCH 06/15] core::rt: Add 
SchedHandle type --- src/libcore/rt/sched.rs | 101 +++++++++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 50c6a894093f..3f7b332e184b 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -15,7 +15,7 @@ use cell::Cell; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::Task; use rt::local_ptr; @@ -41,16 +41,19 @@ pub struct Scheduler { priv cleanup_job: Option } -// XXX: Some hacks to put a &fn in Scheduler without borrowck -// complaining -type UnsafeTaskReceiver = sys::Closure; -trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); +pub struct Coroutine { + /// The segment of stack on which the task is currently running or, + /// if the task is blocked, on which the task will resume execution + priv current_stack_segment: StackSegment, + /// These are always valid when the task is not running, unless + /// the task is dead + priv saved_context: Context, + /// The heap, GC, unwinding, local storage, logging + task: ~Task } -impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject } enum CleanupJob { @@ -103,6 +106,17 @@ pub impl Scheduler { return sched; } + fn make_handle(&mut self) -> SchedHandle { + let remote = self.event_loop.remote_callback(wake_up); + + return SchedHandle { + remote: remote + }; + + fn wake_up() { + } + } + /// Schedule a task to be executed later. 
/// /// Pushes the task onto the work stealing queue and tells the event loop @@ -337,19 +351,6 @@ pub impl Scheduler { } } -static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack - -pub struct Coroutine { - /// The segment of stack on which the task is currently running or, - /// if the task is blocked, on which the task will resume execution - priv current_stack_segment: StackSegment, - /// These are always valid when the task is not running, unless - /// the task is dead - priv saved_context: Context, - /// The heap, GC, unwinding, local storage, logging - task: ~Task -} - pub impl Coroutine { fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { Coroutine::with_task(stack_pool, ~Task::new(), start) @@ -358,6 +359,9 @@ pub impl Coroutine { fn with_task(stack_pool: &mut StackPool, task: ~Task, start: ~fn()) -> Coroutine { + + static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack + let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn @@ -401,6 +405,18 @@ pub impl Coroutine { } } +// XXX: Some hacks to put a &fn in Scheduler without borrowck +// complaining +type UnsafeTaskReceiver = sys::Closure; +trait ClosureConverter { + fn from_fn(&fn(~Coroutine)) -> Self; + fn to_fn(self) -> &fn(~Coroutine); +} +impl ClosureConverter for UnsafeTaskReceiver { + fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } +} + #[cfg(test)] mod test { use int; @@ -411,6 +427,7 @@ mod test { use rt::local::Local; use rt::test::*; use super::*; + use rt::thread::Thread; #[test] fn test_simple_scheduling() { @@ -551,4 +568,42 @@ mod test { } } } + + #[test] + fn handle() { + use rt::comm::*; + + do run_in_bare_thread { + let (port, chan) = oneshot::<()>(); + let port_cell = Cell(port); + let chan_cell = Cell(chan); + let mut sched1 = ~UvEventLoop::new_scheduler(); + let 
handle1 = sched1.make_handle(); + let handle1_cell = Cell(handle1); + let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { + chan_cell.take().send(()); + }; + sched1.enqueue_task(task1); + + let mut sched2 = ~UvEventLoop::new_scheduler(); + let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { + port_cell.take().recv(); + // Release the other scheduler's handle so it can exit + handle1_cell.take(); + }; + sched2.enqueue_task(task2); + + let sched1_cell = Cell(sched1); + let _thread1 = do Thread::start { + let mut sched1 = sched1_cell.take(); + sched1.run(); + }; + + let sched2_cell = Cell(sched2); + let _thread2 = do Thread::start { + let mut sched2 = sched2_cell.take(); + sched2.run(); + }; + } + } } From 8b7e392752eddc202dae12c6b89b7c59556990ce Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 22 May 2013 21:20:19 -0700 Subject: [PATCH 07/15] core::rt: Scheduler takes a WorkQueue This will be for implementing a work-sharing strategy --- src/libcore/rt/mod.rs | 4 +++- src/libcore/rt/sched.rs | 4 ++-- src/libcore/rt/uv/uvio.rs | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 2fac1df01a49..b6abab38da79 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -145,12 +145,14 @@ pub mod thread_local_storage; pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { use self::sched::{Scheduler, Coroutine}; + use self::work_queue::WorkQueue; use self::uv::uvio::UvEventLoop; init(crate_map); let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_); + let work_queue = WorkQueue::new(); + let mut sched = ~Scheduler::new(loop_, work_queue); let main_task = ~Coroutine::new(&mut sched.stack_pool, main); sched.enqueue_task(main_task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 3f7b332e184b..f1670d4896a4 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -65,14 +65,14 @@ pub impl Scheduler { fn 
in_task_context(&self) -> bool { self.current_task.is_some() } - fn new(event_loop: ~EventLoopObject) -> Scheduler { + fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Coroutine>) -> Scheduler { // Lazily initialize the runtime TLS key local_ptr::init_tls_key(); Scheduler { event_loop: event_loop, - work_queue: WorkQueue::new(), + work_queue: work_queue, stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index cf1bd568d028..793a341bffbf 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -24,6 +24,7 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; +use rt::work_queue::WorkQueue; use unstable::sync::{UnsafeAtomicRcBox, AtomicInt}; use unstable::intrinsics; @@ -45,7 +46,7 @@ pub impl UvEventLoop { /// A convenience constructor fn new_scheduler() -> Scheduler { - Scheduler::new(~UvEventLoop::new()) + Scheduler::new(~UvEventLoop::new(), WorkQueue::new()) } } From 7f107c415f1c88b016b9da0fa9c58e6b61f82589 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 22 May 2013 22:18:29 -0700 Subject: [PATCH 08/15] core::rt: Remove UvEventLoop::new_scheduler function --- src/libcore/rt/local.rs | 9 +++++---- src/libcore/rt/mod.rs | 3 ++- src/libcore/rt/sched.rs | 14 +++++++------- src/libcore/rt/test.rs | 12 ++++++++++-- src/libcore/rt/uv/uvio.rs | 5 ----- 5 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/libcore/rt/local.rs b/src/libcore/rt/local.rs index 64a384ddff0b..b4ecf9cd0616 100644 --- a/src/libcore/rt/local.rs +++ b/src/libcore/rt/local.rs @@ -85,30 +85,31 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { + use rt::test::*; use rt::sched::Scheduler; use rt::uv::uvio::UvEventLoop; use super::*; #[test] fn thread_local_scheduler_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); 
Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn thread_local_scheduler_two_instances() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn borrow_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); unsafe { let _scheduler: *mut Scheduler = Local::unsafe_borrow(); diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index b6abab38da79..f136732c00b9 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -223,11 +223,12 @@ fn test_context() { use rt::uv::uvio::UvEventLoop; use cell::Cell; use rt::local::Local; + use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index f1670d4896a4..78c5da08c39b 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -435,7 +435,7 @@ mod test { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; @@ -452,7 +452,7 @@ mod test { let mut task_count = 0; let task_count_ptr: *mut int = &mut task_count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { let task = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { 
*task_count_ptr = *task_count_ptr + 1; } @@ -470,7 +470,7 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task1 = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::(); @@ -499,7 +499,7 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let start_task = ~do Coroutine::new(&mut sched.stack_pool) { run_task(count_ptr); @@ -528,7 +528,7 @@ mod test { #[test] fn test_block_task() { do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); @@ -577,7 +577,7 @@ mod test { let (port, chan) = oneshot::<()>(); let port_cell = Cell(port); let chan_cell = Cell(chan); - let mut sched1 = ~UvEventLoop::new_scheduler(); + let mut sched1 = ~new_test_uv_sched(); let handle1 = sched1.make_handle(); let handle1_cell = Cell(handle1); let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { @@ -585,7 +585,7 @@ mod test { }; sched1.enqueue_task(task1); - let mut sched2 = ~UvEventLoop::new_scheduler(); + let mut sched2 = ~new_test_uv_sched(); let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { port_cell.take().recv(); // Release the other scheduler's handle so it can exit diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index c60ae2bfeffc..0e2da452366c 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -16,6 +16,14 @@ use super::io::net::ip::{IpAddr, Ipv4}; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; +use rt::sched::Scheduler; + +pub fn new_test_uv_sched() -> Scheduler { + use rt::uv::uvio::UvEventLoop; + use rt::work_queue::WorkQueue; + + Scheduler::new(~UvEventLoop::new(), 
WorkQueue::new()) +} /// Creates a new scheduler in a new thread and runs a task in it, /// then waits for the scheduler to exit. Failure of the task @@ -28,7 +36,7 @@ pub fn run_in_newsched_task(f: ~fn()) { let f = Cell(f); do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f.take()); @@ -155,7 +163,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread { let f = Cell(f); let thread = do Thread::start { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f.take()); diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index 793a341bffbf..e25b6140abbf 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -43,11 +43,6 @@ pub impl UvEventLoop { uvio: UvIoFactory(Loop::new()) } } - - /// A convenience constructor - fn new_scheduler() -> Scheduler { - Scheduler::new(~UvEventLoop::new(), WorkQueue::new()) - } } impl Drop for UvEventLoop { From 3f8095e55043f35e08adba5fe5b0a2d687ebc514 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 23 May 2013 00:04:50 -0700 Subject: [PATCH 09/15] core::rt: Add a very basic multi-threaded scheduling test --- src/libcore/rt/sched.rs | 72 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 2 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 78c5da08c39b..e78d50beebe1 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -114,6 +114,8 @@ pub impl Scheduler { }; fn wake_up() { + let sched = Local::take::(); + sched.resume_task_from_queue(); } } @@ -127,8 +129,8 @@ pub impl Scheduler { self.event_loop.callback(resume_task_from_queue); fn resume_task_from_queue() { - let scheduler = Local::take::(); - scheduler.resume_task_from_queue(); + let sched = Local::take::(); + 
sched.resume_task_from_queue(); } } @@ -606,4 +608,70 @@ mod test { }; } } + + #[test] + fn multithreading() { + use clone::Clone; + use iter::Times; + use rt::work_queue::WorkQueue; + use rt::comm::*; + use container::Container; + use vec::OwnedVector; + use rt::rtio::RemoteCallback; + + do run_in_bare_thread { + let work_queue1 = WorkQueue::new(); + let work_queue2 = work_queue1.clone(); + + let loop1 = ~UvEventLoop::new(); + let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone()); + let handle1 = sched1.make_handle(); + let sched1_cell = Cell(sched1); + let handle1_cell = Cell(handle1); + + let loop2 = ~UvEventLoop::new(); + let mut sched2 = ~Scheduler::new(loop2, work_queue2.clone()); + let handle2 = sched2.make_handle(); + let sched2_cell = Cell(sched2); + let handle2_cell = Cell(handle2); + + let _thread1 = do Thread::start { + let mut sched1 = sched1_cell.take(); + sched1.run(); + }; + + let _thread2 = do Thread::start { + let mut sched2 = sched2_cell.take(); + let handle1_cell = Cell(handle1_cell.take()); + let handle2_cell = Cell(handle2_cell.take()); + + let task = ~do Coroutine::new(&mut sched2.stack_pool) { + // Hold handles to keep the schedulers alive + let mut handle1 = handle1_cell.take(); + let mut handle2 = handle2_cell.take(); + + let mut ports = ~[]; + for 10.times { + let (port, chan) = oneshot(); + let chan_cell = Cell(chan); + do spawntask_later { + chan_cell.take().send(()); + } + ports.push(port); + + // Make sure the other scheduler is awake + handle1.remote.fire(); + handle2.remote.fire(); + } + + while !ports.is_empty() { + ports.pop().recv(); + } + }; + + sched2.enqueue_task(task); + sched2.run(); + }; + } + } } From dec9db10da062b1c528d46426d9f62e201d39bc6 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 28 May 2013 18:39:52 -0700 Subject: [PATCH 10/15] core::rt: Add SleeperList Just a simple place to stuff handles to sleeping schedulers. 
--- src/libcore/rt/mod.rs | 3 +++ src/libcore/rt/sleeper_list.rs | 46 ++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 src/libcore/rt/sleeper_list.rs diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index f136732c00b9..82496ec55894 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -88,6 +88,9 @@ mod work_queue; /// A parallel queue. mod message_queue; +/// A parallel data structure for tracking sleeping schedulers. +mod sleeper_list; + /// Stack segments and caching. mod stack; diff --git a/src/libcore/rt/sleeper_list.rs b/src/libcore/rt/sleeper_list.rs new file mode 100644 index 000000000000..9507dec001d5 --- /dev/null +++ b/src/libcore/rt/sleeper_list.rs @@ -0,0 +1,46 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Maintains a shared list of sleeping schedulers. Schedulers +//! use this to wake each other up. 
+ +use container::Container; +use vec::OwnedVector; +use option::{Option, Some, None}; +use cell::Cell; +use unstable::sync::{Exclusive, exclusive}; +use rt::sched::{Scheduler, SchedHandle}; + +pub struct SleeperList { + priv stack: ~Exclusive<~[SchedHandle]> +} + +impl SleeperList { + pub fn new() -> SleeperList { + SleeperList { + stack: ~exclusive(~[]) + } + } + + pub fn push(&mut self, handle: SchedHandle) { + let handle = Cell(handle); + self.stack.with(|s| s.push(handle.take())); + } + + pub fn pop(&mut self) -> Option { + do self.stack.with |s| { + if !s.is_empty() { + Some(s.pop()) + } else { + None + } + } + } +} From ed8c3594bc86dd366e729d02c34915c783e6ac81 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 28 May 2013 19:53:55 -0700 Subject: [PATCH 11/15] core::rt: Add SleeperList to Scheduler --- src/libcore/rt/mod.rs | 4 +++- src/libcore/rt/sched.rs | 20 +++++++++++++++++--- src/libcore/rt/sleeper_list.rs | 9 +++++++++ src/libcore/rt/test.rs | 3 ++- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 82496ec55894..75036dcd28f8 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -150,12 +150,14 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { use self::sched::{Scheduler, Coroutine}; use self::work_queue::WorkQueue; use self::uv::uvio::UvEventLoop; + use self::sleeper_list::SleeperList; init(crate_map); let loop_ = ~UvEventLoop::new(); let work_queue = WorkQueue::new(); - let mut sched = ~Scheduler::new(loop_, work_queue); + let sleepers = SleeperList::new(); + let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); let main_task = ~Coroutine::new(&mut sched.stack_pool, main); sched.enqueue_task(main_task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index e78d50beebe1..2a99648fa045 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -13,6 +13,7 @@ use sys; use cast::transmute; use cell::Cell; +use 
super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; @@ -27,7 +28,12 @@ use rt::rtio::IoFactoryObject; /// thread local storage and the running task is owned by the /// scheduler. pub struct Scheduler { + /// A queue of available work. Under a work-stealing policy there + /// is one per Scheduler. priv work_queue: WorkQueue<~Coroutine>, + /// A shared list of sleeping schedulers. We'll use this to wake + /// up schedulers when pushing work onto the work queue. + priv sleeper_list: SleeperList, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, @@ -65,12 +71,16 @@ pub impl Scheduler { fn in_task_context(&self) -> bool { self.current_task.is_some() } - fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Coroutine>) -> Scheduler { + fn new(event_loop: ~EventLoopObject, + work_queue: WorkQueue<~Coroutine>, + sleeper_list: SleeperList) + -> Scheduler { // Lazily initialize the runtime TLS key local_ptr::init_tls_key(); Scheduler { + sleeper_list: sleeper_list, event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), @@ -618,19 +628,23 @@ mod test { use container::Container; use vec::OwnedVector; use rt::rtio::RemoteCallback; + use rt::sleeper_list::SleeperList; do run_in_bare_thread { + let sleepers1 = SleeperList::new(); let work_queue1 = WorkQueue::new(); + + let sleepers2 = sleepers1.clone(); let work_queue2 = work_queue1.clone(); let loop1 = ~UvEventLoop::new(); - let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone()); + let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone(), sleepers1); let handle1 = sched1.make_handle(); let sched1_cell = Cell(sched1); let handle1_cell = Cell(handle1); let loop2 = ~UvEventLoop::new(); - let mut sched2 = ~Scheduler::new(loop2, work_queue2.clone()); + let mut sched2 = ~Scheduler::new(loop2, 
work_queue2.clone(), sleepers2); let handle2 = sched2.make_handle(); let sched2_cell = Cell(sched2); let handle2_cell = Cell(handle2); diff --git a/src/libcore/rt/sleeper_list.rs b/src/libcore/rt/sleeper_list.rs index 9507dec001d5..dfcac8eb088f 100644 --- a/src/libcore/rt/sleeper_list.rs +++ b/src/libcore/rt/sleeper_list.rs @@ -17,6 +17,7 @@ use option::{Option, Some, None}; use cell::Cell; use unstable::sync::{Exclusive, exclusive}; use rt::sched::{Scheduler, SchedHandle}; +use clone::Clone; pub struct SleeperList { priv stack: ~Exclusive<~[SchedHandle]> @@ -44,3 +45,11 @@ impl SleeperList { } } } + +impl Clone for SleeperList { + fn clone(&self) -> SleeperList { + SleeperList { + stack: self.stack.clone() + } + } +} \ No newline at end of file diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index 0e2da452366c..d6896f500343 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -21,8 +21,9 @@ use rt::sched::Scheduler; pub fn new_test_uv_sched() -> Scheduler { use rt::uv::uvio::UvEventLoop; use rt::work_queue::WorkQueue; + use rt::sleeper_list::SleeperList; - Scheduler::new(~UvEventLoop::new(), WorkQueue::new()) + Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()) } /// Creates a new scheduler in a new thread and runs a task in it, From 5043ea269da73e96fbadc7c443aec01f087dabe9 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 28 May 2013 23:35:22 -0700 Subject: [PATCH 12/15] core::rt: Add run_in_mt_newsched_task test function --- src/libcore/rt/test.rs | 63 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index d6896f500343..a66e4f09fe72 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -9,14 +9,20 @@ // except according to those terms. 
use uint; -use option::*; +use option::{Option, Some, None}; use cell::Cell; +use clone::Clone; +use container::Container; +use vec::OwnedVector; use result::{Result, Ok, Err}; +use unstable::run_in_bare_thread; use super::io::net::ip::{IpAddr, Ipv4}; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; -use rt::sched::Scheduler; +use rt::sched::{Scheduler, Coroutine}; +use rt::sleeper_list::SleeperList; +use rt::work_queue::WorkQueue; pub fn new_test_uv_sched() -> Scheduler { use rt::uv::uvio::UvEventLoop; @@ -46,6 +52,59 @@ pub fn run_in_newsched_task(f: ~fn()) { } } +/// Create more than one scheduler and run a function in a task +/// in one of the schedulers. The schedulers will stay alive +/// until the function `f` returns. +pub fn run_in_mt_newsched_task(f: ~fn()) { + use rt::uv::uvio::UvEventLoop; + + let f_cell = Cell(f); + + do run_in_bare_thread { + static N: uint = 2; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, N) |i| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let handle = sched.make_handle(); + handles.push(handle); + scheds.push(sched); + } + + let f_cell = Cell(f_cell.take()); + let handles = handles; // Freeze + let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { + f_cell.take()(); + // Hold on to handles until the function exits. This keeps the schedulers alive. 
+ let _captured_handles = &handles; + }; + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell(sched); + let thread = do Thread::start { + let mut sched = sched_cell.take(); + sched.run(); + }; + + threads.push(thread); + } + + // Wait for schedulers + let _threads = threads; + } +} + /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { use super::sched::*; From a373dad74d0bd89a9d5362bba1059d9cc25afb9a Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 15:55:23 -0700 Subject: [PATCH 13/15] core::rt: Outline the full multithreaded scheduling algo. Implement sleeping --- src/libcore/rt/message_queue.rs | 3 + src/libcore/rt/mod.rs | 1 + src/libcore/rt/sched.rs | 222 +++++++++++++++++++++----------- src/libcore/rt/test.rs | 17 ++- 4 files changed, 162 insertions(+), 81 deletions(-) diff --git a/src/libcore/rt/message_queue.rs b/src/libcore/rt/message_queue.rs index eaab9288ac8d..21711bbe84c7 100644 --- a/src/libcore/rt/message_queue.rs +++ b/src/libcore/rt/message_queue.rs @@ -8,6 +8,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! A concurrent queue that supports multiple producers and a +//! single consumer. 
+ use container::Container; use kinds::Owned; use vec::OwnedVector; diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 75036dcd28f8..e23ad76a8c61 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -158,6 +158,7 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { let work_queue = WorkQueue::new(); let sleepers = SleeperList::new(); let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); + sched.no_sleep = true; let main_task = ~Coroutine::new(&mut sched.stack_pool, main); sched.enqueue_task(main_task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 2a99648fa045..c6d6bb9f39e5 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -12,6 +12,7 @@ use option::*; use sys; use cast::transmute; use cell::Cell; +use clone::Clone; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; @@ -19,9 +20,10 @@ use super::stack::{StackPool, StackSegment}; use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::Task; +use super::message_queue::MessageQueue; use rt::local_ptr; use rt::local::Local; -use rt::rtio::IoFactoryObject; +use rt::rtio::{IoFactoryObject, RemoteCallback}; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -31,9 +33,23 @@ pub struct Scheduler { /// A queue of available work. Under a work-stealing policy there /// is one per Scheduler. priv work_queue: WorkQueue<~Coroutine>, + /// The queue of incoming messages from other schedulers. + /// These are enqueued by SchedHandles after which a remote callback + /// is triggered to handle the message. + priv message_queue: MessageQueue, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. 
priv sleeper_list: SleeperList, + /// Indicates that we have previously pushed a handle onto the + /// SleeperList but have not yet received the Wake message. + /// Being `true` does not necessarily mean that the scheduler is + /// not active since there are multiple event sources that may + /// wake the scheduler. It just prevents the scheduler from pushing + /// multiple handles onto the sleeper list. + priv sleepy: bool, + /// A flag to indicate we've received the shutdown message and should + /// no longer try to go to sleep, but exit instead. + no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, @@ -47,6 +63,11 @@ pub struct Scheduler { priv cleanup_job: Option } +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject, + priv queue: MessageQueue +} + pub struct Coroutine { /// The segment of stack on which the task is currently running or, /// if the task is blocked, on which the task will resume execution @@ -58,8 +79,9 @@ pub struct Coroutine { task: ~Task } -pub struct SchedHandle { - priv remote: ~RemoteCallbackObject +pub enum SchedMessage { + Wake, + Shutdown } enum CleanupJob { @@ -81,12 +103,15 @@ pub impl Scheduler { Scheduler { sleeper_list: sleeper_list, + message_queue: MessageQueue::new(), + sleepy: false, + no_sleep: false, event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, - cleanup_job: None + cleanup_job: None, } } @@ -116,17 +141,51 @@ pub impl Scheduler { return sched; } + fn run_sched_once() { + + let sched = Local::take::(); + if sched.interpret_message_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. 
+ let mut sched = Local::take::(); + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + let sched = Local::take::(); + if sched.resume_task_from_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. + let mut sched = Local::take::(); + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + // If we got here then there was no work to do. + // Generate a SchedHandle and push it to the sleeper list so + // somebody can wake us up later. + rtdebug!("no work to do"); + let mut sched = Local::take::(); + if !sched.sleepy && !sched.no_sleep { + rtdebug!("sleeping"); + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping"); + } + Local::put(sched); + } + fn make_handle(&mut self) -> SchedHandle { - let remote = self.event_loop.remote_callback(wake_up); + let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); return SchedHandle { - remote: remote + remote: remote, + queue: self.message_queue.clone() }; - - fn wake_up() { - let sched = Local::take::(); - sched.resume_task_from_queue(); - } } /// Schedule a task to be executed later. @@ -136,17 +195,63 @@ pub impl Scheduler { /// directly. fn enqueue_task(&mut self, task: ~Coroutine) { self.work_queue.push(task); - self.event_loop.callback(resume_task_from_queue); + self.event_loop.callback(Scheduler::run_sched_once); - fn resume_task_from_queue() { - let sched = Local::take::(); - sched.resume_task_from_queue(); + // We've made work available. Notify a sleeping scheduler. 
+ match self.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake) + } + None => (/* pass */) } } // * Scheduler-context operations - fn resume_task_from_queue(~self) { + fn interpret_message_queue(~self) -> bool { + assert!(!self.in_task_context()); + + rtdebug!("looking for scheduler messages"); + + let mut this = self; + match this.message_queue.pop() { + Some(Wake) => { + rtdebug!("recv Wake message"); + this.sleepy = false; + Local::put(this); + return true; + } + Some(Shutdown) => { + rtdebug!("recv Shutdown message"); + if this.sleepy { + // There may be an outstanding handle on the sleeper list. + // Pop them all to make sure that's not the case. + loop { + match this.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake); + } + None => (/* pass */) + } + } + } + // No more sleeping. After there are no outstanding event loop + // references we will shut down. + this.no_sleep = true; + this.sleepy = false; + Local::put(this); + return true; + } + None => { + Local::put(this); + return false; + } + } + } + + fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); rtdebug!("looking in work queue for task to schedule"); @@ -156,10 +261,12 @@ pub impl Scheduler { Some(task) => { rtdebug!("resuming task from work queue"); this.resume_task_immediately(task); + return true; } None => { rtdebug!("no tasks in queue"); Local::put(this); + return false; } } } @@ -363,6 +470,13 @@ pub impl Scheduler { } } +impl SchedHandle { + pub fn send(&mut self, msg: SchedMessage) { + self.queue.push(msg); + self.remote.fire(); + } +} + pub impl Coroutine { fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { Coroutine::with_task(stack_pool, ~Task::new(), start) @@ -621,71 +735,25 @@ mod test { #[test] fn multithreading() { - use clone::Clone; - use iter::Times; - use rt::work_queue::WorkQueue; use rt::comm::*; - use container::Container; + use iter::Times; use vec::OwnedVector; - use 
rt::rtio::RemoteCallback; - use rt::sleeper_list::SleeperList; - - do run_in_bare_thread { - let sleepers1 = SleeperList::new(); - let work_queue1 = WorkQueue::new(); - - let sleepers2 = sleepers1.clone(); - let work_queue2 = work_queue1.clone(); - - let loop1 = ~UvEventLoop::new(); - let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone(), sleepers1); - let handle1 = sched1.make_handle(); - let sched1_cell = Cell(sched1); - let handle1_cell = Cell(handle1); - - let loop2 = ~UvEventLoop::new(); - let mut sched2 = ~Scheduler::new(loop2, work_queue2.clone(), sleepers2); - let handle2 = sched2.make_handle(); - let sched2_cell = Cell(sched2); - let handle2_cell = Cell(handle2); - - let _thread1 = do Thread::start { - let mut sched1 = sched1_cell.take(); - sched1.run(); - }; - - let _thread2 = do Thread::start { - let mut sched2 = sched2_cell.take(); - let handle1_cell = Cell(handle1_cell.take()); - let handle2_cell = Cell(handle2_cell.take()); - - let task = ~do Coroutine::new(&mut sched2.stack_pool) { - // Hold handles to keep the schedulers alive - let mut handle1 = handle1_cell.take(); - let mut handle2 = handle2_cell.take(); - - let mut ports = ~[]; - for 10.times { - let (port, chan) = oneshot(); - let chan_cell = Cell(chan); - do spawntask_later { - chan_cell.take().send(()); - } - ports.push(port); - - // Make sure the other scheduler is awake - handle1.remote.fire(); - handle2.remote.fire(); - } + use container::Container; - while !ports.is_empty() { - ports.pop().recv(); - } - }; + do run_in_mt_newsched_task { + let mut ports = ~[]; + for 10.times { + let (port, chan) = oneshot(); + let chan_cell = Cell(chan); + do spawntask_later { + chan_cell.take().send(()); + } + ports.push(port); + } - sched2.enqueue_task(task); - sched2.run(); - }; + while !ports.is_empty() { + ports.pop().recv(); + } } } } diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index a66e4f09fe72..1bbfe8d473db 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ 
-13,6 +13,7 @@ use option::{Option, Some, None}; use cell::Cell; use clone::Clone; use container::Container; +use old_iter::MutableIter; use vec::OwnedVector; use result::{Result, Ok, Err}; use unstable::run_in_bare_thread; @@ -29,7 +30,10 @@ pub fn new_test_uv_sched() -> Scheduler { use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; - Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()) + let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message + sched.no_sleep = true; + return sched; } /// Creates a new scheduler in a new thread and runs a task in it, @@ -57,6 +61,7 @@ pub fn run_in_newsched_task(f: ~fn()) { /// until the function `f` returns. pub fn run_in_mt_newsched_task(f: ~fn()) { use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; let f_cell = Cell(f); @@ -78,11 +83,15 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } let f_cell = Cell(f_cell.take()); - let handles = handles; // Freeze + let handles = Cell(handles); let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { f_cell.take()(); - // Hold on to handles until the function exits. This keeps the schedulers alive. 
- let _captured_handles = &handles; + + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.each_mut |handle| { + handle.send(Shutdown); + } }; scheds[0].enqueue_task(main_task); From f343e6172b7132545c72e3e09e6afccc06fdcee7 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 17:25:29 -0700 Subject: [PATCH 14/15] core::rt: Fix an infinite recursion bug --- src/libcore/rt/comm.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs index 576a402b7091..d108e20347a0 100644 --- a/src/libcore/rt/comm.rs +++ b/src/libcore/rt/comm.rs @@ -22,6 +22,7 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; +use rt::rtio::EventLoop; use unstable::intrinsics::{atomic_xchg, atomic_load}; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; @@ -172,9 +173,17 @@ impl PortOne { } STATE_ONE => { // Channel is closed. Switch back and check the data. + // NB: We have to drop back into the scheduler event loop here + // instead of switching immediately back or we could end up + // triggering infinite recursion on the scheduler's stack. 
let task: ~Coroutine = cast::transmute(task_as_state); - let sched = Local::take::(); - sched.resume_task_immediately(task); + let task = Cell(task); + let mut sched = Local::take::(); + do sched.event_loop.callback { + let sched = Local::take::(); + sched.resume_task_immediately(task.take()); + } + Local::put(sched); } _ => util::unreachable() } @@ -614,5 +623,15 @@ mod test { } } } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + do run_in_newsched_task { + let (port, chan) = stream(); + for 10000.times { chan.send(()) } + for 10000.times { port.recv() } + } + } } From 134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 17:52:00 -0700 Subject: [PATCH 15/15] core::rt: Change the signature of context switching methods to avoid infinite recursion --- src/libcore/rt/comm.rs | 4 +-- src/libcore/rt/mod.rs | 7 ++--- src/libcore/rt/sched.rs | 63 +++++++++++++++++++-------------------- src/libcore/rt/test.rs | 32 ++++++-------------- src/libcore/rt/tube.rs | 34 +++++++++------------ src/libcore/rt/uv/uvio.rs | 26 +++++++--------- 6 files changed, 66 insertions(+), 100 deletions(-) diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs index d108e20347a0..8ff3887f779c 100644 --- a/src/libcore/rt/comm.rs +++ b/src/libcore/rt/comm.rs @@ -159,7 +159,7 @@ impl PortOne { // Switch to the scheduler to put the ~Task into the Packet state. let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { unsafe { // Atomically swap the task pointer into the Packet state, issuing // an acquire barrier to prevent reordering of the subsequent read @@ -178,12 +178,10 @@ impl PortOne { // triggering infinite recursion on the scheduler's stack. 
let task: ~Coroutine = cast::transmute(task_as_state); let task = Cell(task); - let mut sched = Local::take::(); do sched.event_loop.callback { let sched = Local::take::(); sched.resume_task_immediately(task.take()); } - Local::put(sched); } _ => util::unreachable() } diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index e23ad76a8c61..1113d7abe7dc 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -238,12 +238,9 @@ fn test_context() { let task = ~do Coroutine::new(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { assert_eq!(context(), SchedulerContext); - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + sched.enqueue_task(task); } }; sched.enqueue_task(task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index c6d6bb9f39e5..089c95cd7cd5 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -280,11 +280,9 @@ pub impl Scheduler { rtdebug!("ending running task"); - do self.deschedule_running_task_and_then |dead_task| { + do self.deschedule_running_task_and_then |sched, dead_task| { let dead_task = Cell(dead_task); - do Local::borrow:: |sched| { - dead_task.take().recycle(&mut sched.stack_pool); - } + dead_task.take().recycle(&mut sched.stack_pool); } abort!("control reached end of task"); @@ -293,22 +291,18 @@ pub impl Scheduler { fn schedule_new_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow:: |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } fn schedule_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) 
|last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow:: |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } @@ -352,7 +346,11 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) { + /// + /// This passes a Scheduler pointer to the fn after the context switch + /// in order to prevent that fn from performing further scheduling operations. + /// Doing further scheduling could easily result in infinite recursion. + fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); @@ -360,7 +358,8 @@ pub impl Scheduler { unsafe { let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f); + let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f); let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); } @@ -382,14 +381,18 @@ pub impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. 
- fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) { + fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, + f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); rtdebug!("switching tasks"); let old_running_task = this.current_task.swap_unwrap(); - let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) }; + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f) + }; let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); this.current_task = Some(next_task); @@ -426,7 +429,7 @@ pub impl Scheduler { let cleanup_job = self.cleanup_job.swap_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => (f.to_fn())(task) + GiveTask(task, f) => (f.to_fn())(self, task) } } @@ -535,12 +538,12 @@ pub impl Coroutine { // complaining type UnsafeTaskReceiver = sys::Closure; trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); + fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } } } #[cfg(test)] @@ -604,11 +607,9 @@ mod test { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |task1| { + do sched.switch_running_tasks_and_then(task2) |sched, task1| { let task1 = Cell(task1); - do Local::borrow:: |sched| { - sched.enqueue_task(task1.take()); - } + sched.enqueue_task(task1.take()); } unsafe { 
*count_ptr = *count_ptr + 1; } }; @@ -658,12 +659,10 @@ mod test { let task = ~do Coroutine::new(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { let task = Cell(task); - do Local::borrow:: |sched| { - assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); - } + assert!(!sched.in_task_context()); + sched.enqueue_task(task.take()); } }; sched.enqueue_task(task); @@ -680,8 +679,7 @@ mod test { do run_in_newsched_task { do spawn { let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { - let mut sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { let task = Cell(task); do sched.event_loop.callback_ms(10) { rtdebug!("in callback"); @@ -689,7 +687,6 @@ mod test { sched.enqueue_task(task.take()); Local::put(sched); } - Local::put(sched); } } } diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index 1bbfe8d473db..16b0aef5e266 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -122,11 +122,7 @@ pub fn spawntask(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - let sched = Local::take::(); - sched.schedule_new_task(task.take()); - } + sched.schedule_new_task(task); } /// Create a new task and run it right now. 
Aborts on failure @@ -137,11 +133,8 @@ pub fn spawntask_immediately(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } @@ -172,11 +165,8 @@ pub fn spawntask_random(f: ~fn()) { f); if run_now { - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } else { sched.enqueue_task(task); @@ -199,10 +189,9 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Switch to the scheduler let f = Cell(Cell(f)); let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |old_task| { + do sched.deschedule_running_task_and_then() |sched, old_task| { let old_task = Cell(old_task); let f = f.take(); - let mut sched = Local::take::(); let new_task = ~do Coroutine::new(&mut sched.stack_pool) { do (|| { (f.take())() @@ -210,16 +199,13 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Check for failure then resume the parent task unsafe { *failed_ptr = task::failing(); } let sched = Local::take::(); - do sched.switch_running_tasks_and_then(old_task.take()) |new_task| { - let new_task = Cell(new_task); - do Local::borrow:: |sched| { - sched.enqueue_task(new_task.take()); - } + do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| { + sched.enqueue_task(new_task); } } }; - sched.resume_task_immediately(new_task); + sched.enqueue_task(new_task); } if !failed { Ok(()) } else { Err(()) } diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs index b2f475a69660..4482a92d916a 100644 --- a/src/libcore/rt/tube.rs +++ b/src/libcore/rt/tube.rs @@ -72,7 +72,7 @@ impl Tube 
{ assert!(self.p.refcount() > 1); // There better be somebody to wake us up assert!((*state).blocked_task.is_none()); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |_, task| { (*state).blocked_task = Some(task); } rtdebug!("waking after tube recv"); @@ -107,11 +107,10 @@ mod test { let tube_clone = tube.clone(); let tube_clone_cell = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { let mut tube_clone = tube_clone_cell.take(); tube_clone.send(1); - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -123,21 +122,17 @@ mod test { do run_in_newsched_task { let mut tube: Tube = Tube::new(); let tube_clone = tube.clone(); - let tube_clone = Cell(Cell(Cell(tube_clone))); + let tube_clone = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { - let tube_clone = tube_clone.take(); - do Local::borrow:: |sched| { - let tube_clone = tube_clone.take(); - do sched.event_loop.callback { - let mut tube_clone = tube_clone.take(); - // The task should be blocked on this now and - // sending will wake it up. - tube_clone.send(1); - } + do sched.deschedule_running_task_and_then |sched, task| { + let tube_clone = Cell(tube_clone.take()); + do sched.event_loop.callback { + let mut tube_clone = tube_clone.take(); + // The task should be blocked on this now and + // sending will wake it up. 
+ tube_clone.send(1); } - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -153,7 +148,7 @@ mod test { let tube_clone = tube.clone(); let tube_clone = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { callback_send(tube_clone.take(), 0); fn callback_send(tube: Tube, i: int) { @@ -172,8 +167,7 @@ mod test { } } - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } for int::range(0, MAX) |i| { diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index e25b6140abbf..1ee6504d11fc 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -205,12 +205,10 @@ impl IoFactory for UvIoFactory { assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("connect: entered scheduler context"); - do Local::borrow:: |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell(task); @@ -250,7 +248,7 @@ impl IoFactory for UvIoFactory { Ok(_) => Ok(~UvTcpListener::new(watcher)), Err(uverr) => { let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.as_stream().close { let scheduler = Local::take::(); @@ -286,7 +284,7 @@ impl Drop for UvTcpListener { fn finalize(&self) { let watcher = self.watcher(); let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.as_stream().close { let 
scheduler = Local::take::(); @@ -348,7 +346,7 @@ impl Drop for UvTcpStream { rtdebug!("closing tcp stream"); let watcher = self.watcher(); let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.close { let scheduler = Local::take::(); @@ -367,11 +365,9 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("read: entered scheduler context"); - do Local::borrow:: |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut watcher = watcher; let task_cell = Cell(task); // XXX: We shouldn't reallocate these callbacks every @@ -413,7 +409,7 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let mut watcher = watcher; let task_cell = Cell(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; @@ -507,11 +503,9 @@ fn test_read_and_block() { // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { let task = Cell(task); - do Local::borrow:: |scheduler| { - scheduler.enqueue_task(task.take()); - } + sched.enqueue_task(task.take()); } }