From 29d83002a27e6f47759b4a3bfe741fb061107816 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 20 May 2013 16:43:31 -0700 Subject: [PATCH 01/32] core::rt: Move uv idle tests to idle mod --- src/libcore/rt/uv/idle.rs | 62 +++++++++++++++++++++++++++++++++++++++ src/libcore/rt/uv/mod.rs | 54 ---------------------------------- 2 files changed, 62 insertions(+), 54 deletions(-) diff --git a/src/libcore/rt/uv/idle.rs b/src/libcore/rt/uv/idle.rs index 2cf0b5c487288..a81ab48696a36 100644 --- a/src/libcore/rt/uv/idle.rs +++ b/src/libcore/rt/uv/idle.rs @@ -89,3 +89,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { match self { &IdleWatcher(ptr) => ptr } } } + +#[cfg(test)] +mod test { + + use rt::uv::Loop; + use super::*; + use unstable::run_in_bare_thread; + + #[test] + #[ignore(reason = "valgrind - loop destroyed before watcher?")] + fn idle_new_then_close() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let idle_watcher = { IdleWatcher::new(&mut loop_) }; + idle_watcher.close(||()); + } + } + + #[test] + fn idle_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + let mut count = 10; + let count_ptr: *mut int = &mut count; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + assert!(status.is_none()); + if unsafe { *count_ptr == 10 } { + idle_watcher.stop(); + idle_watcher.close(||()); + } else { + unsafe { *count_ptr = *count_ptr + 1; } + } + } + loop_.run(); + loop_.close(); + assert_eq!(count, 10); + } + } + + #[test] + fn idle_start_stop_start() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + assert!(status.is_none()); + idle_watcher.stop(); + do idle_watcher.start |idle_watcher, status| { + assert!(status.is_none()); + let mut idle_watcher = idle_watcher; + idle_watcher.stop(); + idle_watcher.close(||()); + } + } + loop_.run(); + loop_.close(); + } + } +} diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 2bd657fd8641f..8cc596b2876d9 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -364,57 +364,3 @@ fn loop_smoke_test() { loop_.close(); } } - -#[test] -#[ignore(reason = "valgrind - loop destroyed before watcher?")] -fn idle_new_then_close() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let idle_watcher = { IdleWatcher::new(&mut loop_) }; - idle_watcher.close(||()); - } -} - -#[test] -fn idle_smoke_test() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - let mut count = 10; - let count_ptr: *mut int = &mut count; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - assert!(status.is_none()); - if unsafe { *count_ptr == 10 } { - idle_watcher.stop(); - idle_watcher.close(||()); - } else { - unsafe { *count_ptr = *count_ptr + 1; } - } - } - loop_.run(); - loop_.close(); - assert_eq!(count, 10); - } -} - -#[test] -fn idle_start_stop_start() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - assert!(status.is_none()); - idle_watcher.stop(); - do idle_watcher.start |idle_watcher, status| { - assert!(status.is_none()); - let mut idle_watcher = idle_watcher; - idle_watcher.stop(); - idle_watcher.close(||()); - } - } - loop_.run(); - loop_.close(); - } -} From 807269041437411df49a9a893c86310283d6eb91 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 20 May 2013 18:16:09 -0700 Subject: [PATCH 02/32] core::rt: Add bindings for async uv handles --- src/libcore/rt/uv/async.rs | 105 +++++++++++++++++++++++++++++++++++++ src/libcore/rt/uv/mod.rs | 9 +++- 2 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 src/libcore/rt/uv/async.rs diff --git a/src/libcore/rt/uv/async.rs b/src/libcore/rt/uv/async.rs new file mode 100644 index 0000000000000..0d032f512d38b --- /dev/null +++ b/src/libcore/rt/uv/async.rs @@ -0,0 +1,105 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc::{c_int, c_void}; +use option::Some; +use rt::uv::uvll; +use rt::uv::uvll::UV_ASYNC; +use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback}; +use rt::uv::WatcherInterop; +use rt::uv::status_to_maybe_uv_error; + +pub struct AsyncWatcher(*uvll::uv_async_t); +impl Watcher for AsyncWatcher { } + +impl AsyncWatcher { + fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { + unsafe { + let handle = uvll::malloc_handle(UV_ASYNC); + assert!(handle.is_not_null()); + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + let data = watcher.get_watcher_data(); + data.async_cb = Some(cb); + assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb)); + return watcher; + } + + extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + let status = status_to_maybe_uv_error(watcher.native_handle(), status); + let data = watcher.get_watcher_data(); + let cb = data.async_cb.get_ref(); + (*cb)(watcher, status); + } + } + + fn send(&mut self) { + unsafe { + let handle = self.native_handle(); + uvll::async_send(handle); + } + } + + fn close(self, cb: NullCallback) { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + + unsafe { + uvll::close(self.native_handle(), close_cb); + } + + extern fn close_cb(handle: *uvll::uv_stream_t) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + { + let data = watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); + } + watcher.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void); } + } + } +} + +impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher { + fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher { + AsyncWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_async_t { + match self { &AsyncWatcher(ptr) => ptr } + } +} + +#[cfg(test)] +mod test { + + use super::*; + use rt::uv::Loop; + use unstable::run_in_bare_thread; + use rt::thread::Thread; + use cell::Cell; + + #[test] + fn smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) ); + let watcher_cell = Cell(watcher); + let _thread = do Thread::start { + let mut watcher = watcher_cell.take(); + watcher.send(); + }; + loop_.run(); + loop_.close(); + } + } +} diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 8cc596b2876d9..5f9e56608149f 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -57,6 +57,7 @@ pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; +pub use self::async::AsyncWatcher; /// The implementation of `rtio` for libuv pub mod uvio; @@ -68,6 +69,7 @@ pub mod file; pub mod net; pub mod idle; pub mod timer; +pub mod async; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -125,6 +127,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option); pub type ConnectionCallback = ~fn(StreamWatcher, Option); pub type FsCallback = ~fn(FsRequest, Option); pub type TimerCallback = ~fn(TimerWatcher, Option); +pub type AsyncCallback = ~fn(AsyncWatcher, Option); /// Callbacks used by StreamWatchers, set as custom data on the foreign handle @@ -135,7 +138,8 @@ struct WatcherData { close_cb: Option, alloc_cb: Option, idle_cb: Option, - timer_cb: Option + timer_cb: Option, + async_cb: Option } pub trait WatcherInterop { @@ -164,7 +168,8 @@ impl> WatcherInterop for W { close_cb: None, alloc_cb: None, idle_cb: None, - timer_cb: None + timer_cb: None, + async_cb: None }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); From 6d8d73cfc4cba2fdb2ee67448df39d89be08ce69 Mon Sep 17 00:00:00 2001 From: James Miller Date: Tue, 21 May 2013 17:31:24 +1200 Subject: [PATCH 03/32] Add AtomicUint newtype --- src/libcore/unstable/sync.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/libcore/unstable/sync.rs b/src/libcore/unstable/sync.rs index 734368c70c4a0..7c228ff564778 100644 --- a/src/libcore/unstable/sync.rs +++ b/src/libcore/unstable/sync.rs @@ -205,6 +205,26 @@ extern { fn rust_unlock_little_lock(lock: rust_little_lock); } +/* *********************************************************************/ + +//FIXME: #5042 This should be replaced by proper atomic type +pub struct AtomicUint(uint); +pub impl AtomicUint { + fn load(&self) -> uint { + unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint } + } + fn store(&mut self, val:uint) { + unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); } + } + fn add(&mut self, val:int) -> uint { + unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint } + } + fn cas(&self, old:uint, new:uint) -> uint { + unsafe { intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint } + } +} + + #[cfg(test)] mod tests { use comm; From 8f77a6f422184dacc14ae1b6a042c321e06bef88 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 21 May 2013 17:36:59 -0700 Subject: [PATCH 04/32] core: Add AtomicInt and cleanup --- src/libcore/unstable/sync.rs | 61 ++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/src/libcore/unstable/sync.rs b/src/libcore/unstable/sync.rs index 7c228ff564778..6085ca1a482ee 100644 --- a/src/libcore/unstable/sync.rs +++ b/src/libcore/unstable/sync.rs @@ -208,25 +208,50 @@ extern { /* *********************************************************************/ //FIXME: #5042 This should be replaced by proper atomic type -pub struct AtomicUint(uint); -pub impl AtomicUint { - fn load(&self) -> uint { +pub struct AtomicUint { + priv inner: uint +} + +impl AtomicUint { + pub fn new(val: uint) -> AtomicUint { AtomicUint { inner: val } } + pub fn load(&self) -> uint { unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint } } - fn store(&mut self, val:uint) { + pub fn store(&mut self, val: uint) { unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); } } - fn add(&mut self, val:int) -> uint { + pub fn add(&mut self, val: int) -> uint { unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint } } - fn cas(&self, old:uint, new:uint) -> uint { + pub fn cas(&mut self, old:uint, new: uint) -> uint { unsafe { intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint } } } +pub struct AtomicInt { + priv inner: int +} + +impl AtomicInt { + pub fn new(val: int) -> AtomicInt { AtomicInt { inner: val } } + pub fn load(&self) -> int { + unsafe { intrinsics::atomic_load(&self.inner) } + } + pub fn store(&mut self, val: int) { + unsafe { intrinsics::atomic_store(&mut self.inner, val); } + } + pub fn add(&mut self, val: int) -> int { + unsafe { intrinsics::atomic_xadd(&mut self.inner, val) } + } + pub fn cas(&mut self, old: int, new: int) -> int { + unsafe { intrinsics::atomic_cxchg(&mut self.inner, old, new) } + } +} + #[cfg(test)] mod tests { + use super::*; use comm; use super::exclusive; use task; @@ -278,4 +303,28 @@ mod tests { assert_eq!(*one, 1); } } + + #[test] + fn atomic_int_smoke_test() { + let mut i = AtomicInt::new(0); + i.store(10); + assert!(i.load() == 10); + assert!(i.add(1) == 10); + assert!(i.load() == 11); + assert!(i.cas(11, 12) == 11); + assert!(i.cas(11, 13) == 12); + assert!(i.load() == 12); + } + + #[test] + fn atomic_uint_smoke_test() { + let mut i = AtomicUint::new(0); + i.store(10); + assert!(i.load() == 10); + assert!(i.add(1) == 10); + assert!(i.load() == 11); + assert!(i.cas(11, 12) == 11); + assert!(i.cas(11, 13) == 12); + assert!(i.load() == 12); + } } From a0cd55a1d7436dc9532ddf5cdad7d1f7e8f108f3 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 20 May 2013 18:23:56 -0700 Subject: [PATCH 05/32] core::rt: Add RemoteCallback trait and uv implementation This is used for signalling the event loop from other threads. --- src/libcore/rt/rtio.rs | 11 +++++ src/libcore/rt/uv/async.rs | 6 +-- src/libcore/rt/uv/uvio.rs | 86 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs index 4b5eda22ff5de..fa657555f3aa0 100644 --- a/src/libcore/rt/rtio.rs +++ b/src/libcore/rt/rtio.rs @@ -18,6 +18,7 @@ use rt::uv::uvio; // XXX: ~object doesn't work currently so these are some placeholder // types to use instead pub type EventLoopObject = uvio::UvEventLoop; +pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; @@ -26,10 +27,20 @@ pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn callback_ms(&mut self, ms: u64, ~fn()); + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; /// The asynchronous I/O services. Not all event loops may provide one fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; } +pub trait RemoteCallback { + /// Trigger the remote callback. Note that the number of times the callback + /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire', + /// the callback will be called at least once, but multiple callbacks may be coalesced + /// and callbacks may be called more often requested. Destruction also triggers the + /// callback. + fn fire(&mut self); +} + pub trait IoFactory { fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; diff --git a/src/libcore/rt/uv/async.rs b/src/libcore/rt/uv/async.rs index 0d032f512d38b..6ed06cc10b78a 100644 --- a/src/libcore/rt/uv/async.rs +++ b/src/libcore/rt/uv/async.rs @@ -20,7 +20,7 @@ pub struct AsyncWatcher(*uvll::uv_async_t); impl Watcher for AsyncWatcher { } impl AsyncWatcher { - fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { + pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { unsafe { let handle = uvll::malloc_handle(UV_ASYNC); assert!(handle.is_not_null()); @@ -41,14 +41,14 @@ impl AsyncWatcher { } } - fn send(&mut self) { + pub fn send(&mut self) { unsafe { let handle = self.native_handle(); uvll::async_send(handle); } } - fn close(self, cb: NullCallback) { + pub fn close(self, cb: NullCallback) { let mut this = self; let data = this.get_watcher_data(); assert!(data.close_cb.is_none()); diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index cacd67314ebac..cf1bd568d0288 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -12,6 +12,7 @@ use option::*; use result::*; use ops::Drop; use cell::{Cell, empty_cell}; +use cast; use cast::transmute; use clone::Clone; use rt::io::IoError; @@ -23,6 +24,8 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; +use unstable::sync::{UnsafeAtomicRcBox, AtomicInt}; +use unstable::intrinsics; #[cfg(test)] use container::Container; #[cfg(test)] use uint; @@ -82,6 +85,10 @@ impl EventLoop for UvEventLoop { } } + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject { + ~UvRemoteCallback::new(self.uvio.uv_loop(), f) + } + fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> { Some(&mut self.uvio) } @@ -101,6 +108,85 @@ fn test_callback_run_once() { } } +pub struct UvRemoteCallback { + // The uv async handle for triggering the callback + async: AsyncWatcher, + // An atomic flag to tell the callback to exit, + // set from the dtor. + exit_flag: UnsafeAtomicRcBox +} + +impl UvRemoteCallback { + pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback { + let exit_flag = UnsafeAtomicRcBox::new(AtomicInt::new(0)); + let exit_flag_clone = exit_flag.clone(); + let async = do AsyncWatcher::new(loop_) |watcher, status| { + assert!(status.is_none()); + f(); + let exit_flag_ptr = exit_flag_clone.get(); + unsafe { + if (*exit_flag_ptr).load() == 1 { + watcher.close(||()); + } + } + }; + UvRemoteCallback { + async: async, + exit_flag: exit_flag + } + } +} + +impl RemoteCallback for UvRemoteCallback { + fn fire(&mut self) { self.async.send() } +} + +impl Drop for UvRemoteCallback { + fn finalize(&self) { + unsafe { + let mut this: &mut UvRemoteCallback = cast::transmute_mut(self); + let exit_flag_ptr = this.exit_flag.get(); + (*exit_flag_ptr).store(1); + this.async.send(); + } + } +} + +#[cfg(test)] +mod test_remote { + use super::*; + use cell; + use cell::Cell; + use rt::test::*; + use rt::thread::Thread; + use rt::tube::Tube; + use rt::rtio::EventLoop; + use rt::local::Local; + use rt::sched::Scheduler; + + #[test] + fn test_uv_remote() { + do run_in_newsched_task { + let mut tube = Tube::new(); + let tube_clone = tube.clone(); + let remote_cell = cell::empty_cell(); + do Local::borrow::() |sched| { + let tube_clone = tube_clone.clone(); + let tube_clone_cell = Cell(tube_clone); + let remote = do sched.event_loop.remote_callback { + tube_clone_cell.take().send(1); + }; + remote_cell.put_back(remote); + } + let _thread = do Thread::start { + remote_cell.take().fire(); + }; + + assert!(tube.recv() == 1); + } + } +} + pub struct UvIoFactory(Loop); pub impl UvIoFactory { From 41c21685dd149fb95dededfb4edaf87c6603c099 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 22 May 2013 15:39:39 -0700 Subject: [PATCH 06/32] core::rt: Add SchedHandle type --- src/libcore/rt/sched.rs | 101 +++++++++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 50c6a894093f3..3f7b332e184b1 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -15,7 +15,7 @@ use cell::Cell; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::Task; use rt::local_ptr; @@ -41,16 +41,19 @@ pub struct Scheduler { priv cleanup_job: Option } -// XXX: Some hacks to put a &fn in Scheduler without borrowck -// complaining -type UnsafeTaskReceiver = sys::Closure; -trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); +pub struct Coroutine { + /// The segment of stack on which the task is currently running or, + /// if the task is blocked, on which the task will resume execution + priv current_stack_segment: StackSegment, + /// These are always valid when the task is not running, unless + /// the task is dead + priv saved_context: Context, + /// The heap, GC, unwinding, local storage, logging + task: ~Task } -impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject } enum CleanupJob { @@ -103,6 +106,17 @@ pub impl Scheduler { return sched; } + fn make_handle(&mut self) -> SchedHandle { + let remote = self.event_loop.remote_callback(wake_up); + + return SchedHandle { + remote: remote + }; + + fn wake_up() { + } + } + /// Schedule a task to be executed later. /// /// Pushes the task onto the work stealing queue and tells the event loop @@ -337,19 +351,6 @@ pub impl Scheduler { } } -static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack - -pub struct Coroutine { - /// The segment of stack on which the task is currently running or, - /// if the task is blocked, on which the task will resume execution - priv current_stack_segment: StackSegment, - /// These are always valid when the task is not running, unless - /// the task is dead - priv saved_context: Context, - /// The heap, GC, unwinding, local storage, logging - task: ~Task -} - pub impl Coroutine { fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { Coroutine::with_task(stack_pool, ~Task::new(), start) @@ -358,6 +359,9 @@ pub impl Coroutine { fn with_task(stack_pool: &mut StackPool, task: ~Task, start: ~fn()) -> Coroutine { + + static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack + let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn @@ -401,6 +405,18 @@ pub impl Coroutine { } } +// XXX: Some hacks to put a &fn in Scheduler without borrowck +// complaining +type UnsafeTaskReceiver = sys::Closure; +trait ClosureConverter { + fn from_fn(&fn(~Coroutine)) -> Self; + fn to_fn(self) -> &fn(~Coroutine); +} +impl ClosureConverter for UnsafeTaskReceiver { + fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } +} + #[cfg(test)] mod test { use int; @@ -411,6 +427,7 @@ mod test { use rt::local::Local; use rt::test::*; use super::*; + use rt::thread::Thread; #[test] fn test_simple_scheduling() { @@ -551,4 +568,42 @@ mod test { } } } + + #[test] + fn handle() { + use rt::comm::*; + + do run_in_bare_thread { + let (port, chan) = oneshot::<()>(); + let port_cell = Cell(port); + let chan_cell = Cell(chan); + let mut sched1 = ~UvEventLoop::new_scheduler(); + let handle1 = sched1.make_handle(); + let handle1_cell = Cell(handle1); + let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { + chan_cell.take().send(()); + }; + sched1.enqueue_task(task1); + + let mut sched2 = ~UvEventLoop::new_scheduler(); + let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { + port_cell.take().recv(); + // Release the other scheduler's handle so it can exit + handle1_cell.take(); + }; + sched2.enqueue_task(task2); + + let sched1_cell = Cell(sched1); + let _thread1 = do Thread::start { + let mut sched1 = sched1_cell.take(); + sched1.run(); + }; + + let sched2_cell = Cell(sched2); + let _thread2 = do Thread::start { + let mut sched2 = sched2_cell.take(); + sched2.run(); + }; + } + } } From 8b7e392752eddc202dae12c6b89b7c59556990ce Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 22 May 2013 21:20:19 -0700 Subject: [PATCH 07/32] core::rt: Scheduler takes a WorkQueue This will be for implementing a work-sharing strategy --- src/libcore/rt/mod.rs | 4 +++- src/libcore/rt/sched.rs | 4 ++-- src/libcore/rt/uv/uvio.rs | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 2fac1df01a495..b6abab38da791 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -145,12 +145,14 @@ pub mod thread_local_storage; pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { use self::sched::{Scheduler, Coroutine}; + use self::work_queue::WorkQueue; use self::uv::uvio::UvEventLoop; init(crate_map); let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_); + let work_queue = WorkQueue::new(); + let mut sched = ~Scheduler::new(loop_, work_queue); let main_task = ~Coroutine::new(&mut sched.stack_pool, main); sched.enqueue_task(main_task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 3f7b332e184b1..f1670d4896a49 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -65,14 +65,14 @@ pub impl Scheduler { fn in_task_context(&self) -> bool { self.current_task.is_some() } - fn new(event_loop: ~EventLoopObject) -> Scheduler { + fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Coroutine>) -> Scheduler { // Lazily initialize the runtime TLS key local_ptr::init_tls_key(); Scheduler { event_loop: event_loop, - work_queue: WorkQueue::new(), + work_queue: work_queue, stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index cf1bd568d0288..793a341bffbfb 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -24,6 +24,7 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; +use rt::work_queue::WorkQueue; use unstable::sync::{UnsafeAtomicRcBox, AtomicInt}; use unstable::intrinsics; @@ -45,7 +46,7 @@ pub impl UvEventLoop { /// A convenience constructor fn new_scheduler() -> Scheduler { - Scheduler::new(~UvEventLoop::new()) + Scheduler::new(~UvEventLoop::new(), WorkQueue::new()) } } From 7f107c415f1c88b016b9da0fa9c58e6b61f82589 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 22 May 2013 22:18:29 -0700 Subject: [PATCH 08/32] core::rt: Remove UvEventLoop::new_scheduler function --- src/libcore/rt/local.rs | 9 +++++---- src/libcore/rt/mod.rs | 3 ++- src/libcore/rt/sched.rs | 14 +++++++------- src/libcore/rt/test.rs | 12 ++++++++++-- src/libcore/rt/uv/uvio.rs | 5 ----- 5 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/libcore/rt/local.rs b/src/libcore/rt/local.rs index 64a384ddff0b9..b4ecf9cd06162 100644 --- a/src/libcore/rt/local.rs +++ b/src/libcore/rt/local.rs @@ -85,30 +85,31 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { + use rt::test::*; use rt::sched::Scheduler; use rt::uv::uvio::UvEventLoop; use super::*; #[test] fn thread_local_scheduler_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn thread_local_scheduler_two_instances() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn borrow_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); unsafe { let _scheduler: *mut Scheduler = Local::unsafe_borrow(); diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index b6abab38da791..f136732c00b93 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -223,11 +223,12 @@ fn test_context() { use rt::uv::uvio::UvEventLoop; use cell::Cell; use rt::local::Local; + use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index f1670d4896a49..78c5da08c39ba 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -435,7 +435,7 @@ mod test { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; @@ -452,7 +452,7 @@ mod test { let mut task_count = 0; let task_count_ptr: *mut int = &mut task_count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { let task = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *task_count_ptr = *task_count_ptr + 1; } @@ -470,7 +470,7 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task1 = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::(); @@ -499,7 +499,7 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let start_task = ~do Coroutine::new(&mut sched.stack_pool) { run_task(count_ptr); @@ -528,7 +528,7 @@ mod test { #[test] fn test_block_task() { do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); @@ -577,7 +577,7 @@ mod test { let (port, chan) = oneshot::<()>(); let port_cell = Cell(port); let chan_cell = Cell(chan); - let mut sched1 = ~UvEventLoop::new_scheduler(); + let mut sched1 = ~new_test_uv_sched(); let handle1 = sched1.make_handle(); let handle1_cell = Cell(handle1); let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { @@ -585,7 +585,7 @@ mod test { }; sched1.enqueue_task(task1); - let mut sched2 = ~UvEventLoop::new_scheduler(); + let mut sched2 = ~new_test_uv_sched(); let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { port_cell.take().recv(); // Release the other scheduler's handle so it can exit diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index c60ae2bfeffc8..0e2da452366cf 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -16,6 +16,14 @@ use super::io::net::ip::{IpAddr, Ipv4}; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; +use rt::sched::Scheduler; + +pub fn new_test_uv_sched() -> Scheduler { + use rt::uv::uvio::UvEventLoop; + use rt::work_queue::WorkQueue; + + Scheduler::new(~UvEventLoop::new(), WorkQueue::new()) +} /// Creates a new scheduler in a new thread and runs a task in it, /// then waits for the scheduler to exit. Failure of the task @@ -28,7 +36,7 @@ pub fn run_in_newsched_task(f: ~fn()) { let f = Cell(f); do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f.take()); @@ -155,7 +163,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread { let f = Cell(f); let thread = do Thread::start { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f.take()); diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index 793a341bffbfb..e25b6140abbfd 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -43,11 +43,6 @@ pub impl UvEventLoop { uvio: UvIoFactory(Loop::new()) } } - - /// A convenience constructor - fn new_scheduler() -> Scheduler { - Scheduler::new(~UvEventLoop::new(), WorkQueue::new()) - } } impl Drop for UvEventLoop { From 3f8095e55043f35e08adba5fe5b0a2d687ebc514 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 23 May 2013 00:04:50 -0700 Subject: [PATCH 09/32] core::rt: Add a very basic multi-threaded scheduling test --- src/libcore/rt/sched.rs | 72 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 2 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 78c5da08c39ba..e78d50beebe1b 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -114,6 +114,8 @@ pub impl Scheduler { }; fn wake_up() { + let sched = Local::take::(); + sched.resume_task_from_queue(); } } @@ -127,8 +129,8 @@ pub impl Scheduler { self.event_loop.callback(resume_task_from_queue); fn resume_task_from_queue() { - let scheduler = Local::take::(); - scheduler.resume_task_from_queue(); + let sched = Local::take::(); + sched.resume_task_from_queue(); } } @@ -606,4 +608,70 @@ mod test { }; } } + + #[test] + fn multithreading() { + use clone::Clone; + use iter::Times; + use rt::work_queue::WorkQueue; + use rt::comm::*; + use container::Container; + use vec::OwnedVector; + use rt::rtio::RemoteCallback; + + do run_in_bare_thread { + let work_queue1 = WorkQueue::new(); + let work_queue2 = work_queue1.clone(); + + let loop1 = ~UvEventLoop::new(); + let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone()); + let handle1 = sched1.make_handle(); + let sched1_cell = Cell(sched1); + let handle1_cell = Cell(handle1); + + let loop2 = ~UvEventLoop::new(); + let mut sched2 = ~Scheduler::new(loop2, work_queue2.clone()); + let handle2 = sched2.make_handle(); + let sched2_cell = Cell(sched2); + let handle2_cell = Cell(handle2); + + let _thread1 = do Thread::start { + let mut sched1 = sched1_cell.take(); + sched1.run(); + }; + + let _thread2 = do Thread::start { + let mut sched2 = sched2_cell.take(); + let handle1_cell = Cell(handle1_cell.take()); + let handle2_cell = Cell(handle2_cell.take()); + + let task = ~do Coroutine::new(&mut sched2.stack_pool) { + // Hold handles to keep the schedulers alive + let mut handle1 = handle1_cell.take(); + let mut handle2 = handle2_cell.take(); + + let mut ports = ~[]; + for 10.times { + let (port, chan) = oneshot(); + let chan_cell = Cell(chan); + do spawntask_later { + chan_cell.take().send(()); + } + ports.push(port); + + // Make sure the other scheduler is awake + handle1.remote.fire(); + handle2.remote.fire(); + } + + while !ports.is_empty() { + ports.pop().recv(); + } + }; + + sched2.enqueue_task(task); + sched2.run(); + }; + } + } } From dec9db10da062b1c528d46426d9f62e201d39bc6 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 28 May 2013 18:39:52 -0700 Subject: [PATCH 10/32] core::rt: Add SleeperList Just a simple place to stuff handles to sleeping schedulers. --- src/libcore/rt/mod.rs | 3 +++ src/libcore/rt/sleeper_list.rs | 46 ++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 src/libcore/rt/sleeper_list.rs diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index f136732c00b93..82496ec558940 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -88,6 +88,9 @@ mod work_queue; /// A parallel queue. mod message_queue; +/// A parallel data structure for tracking sleeping schedulers. +mod sleeper_list; + /// Stack segments and caching. mod stack; diff --git a/src/libcore/rt/sleeper_list.rs b/src/libcore/rt/sleeper_list.rs new file mode 100644 index 0000000000000..9507dec001d51 --- /dev/null +++ b/src/libcore/rt/sleeper_list.rs @@ -0,0 +1,46 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Maintains a shared list of sleeping schedulers. Schedulers +//! use this to wake each other up. + +use container::Container; +use vec::OwnedVector; +use option::{Option, Some, None}; +use cell::Cell; +use unstable::sync::{Exclusive, exclusive}; +use rt::sched::{Scheduler, SchedHandle}; + +pub struct SleeperList { + priv stack: ~Exclusive<~[SchedHandle]> +} + +impl SleeperList { + pub fn new() -> SleeperList { + SleeperList { + stack: ~exclusive(~[]) + } + } + + pub fn push(&mut self, handle: SchedHandle) { + let handle = Cell(handle); + self.stack.with(|s| s.push(handle.take())); + } + + pub fn pop(&mut self) -> Option { + do self.stack.with |s| { + if !s.is_empty() { + Some(s.pop()) + } else { + None + } + } + } +} From ed8c3594bc86dd366e729d02c34915c783e6ac81 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 28 May 2013 19:53:55 -0700 Subject: [PATCH 11/32] core::rt: Add SleeperList to Scheduler --- src/libcore/rt/mod.rs | 4 +++- src/libcore/rt/sched.rs | 20 +++++++++++++++++--- src/libcore/rt/sleeper_list.rs | 9 +++++++++ src/libcore/rt/test.rs | 3 ++- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 82496ec558940..75036dcd28f8d 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -150,12 +150,14 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { use self::sched::{Scheduler, Coroutine}; use self::work_queue::WorkQueue; use self::uv::uvio::UvEventLoop; + use self::sleeper_list::SleeperList; init(crate_map); let loop_ = ~UvEventLoop::new(); let work_queue = WorkQueue::new(); - let mut sched = ~Scheduler::new(loop_, work_queue); + let sleepers = SleeperList::new(); + let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); let main_task = ~Coroutine::new(&mut sched.stack_pool, main); sched.enqueue_task(main_task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index e78d50beebe1b..2a99648fa0459 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -13,6 +13,7 @@ use sys; use cast::transmute; use cell::Cell; +use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; @@ -27,7 +28,12 @@ use rt::rtio::IoFactoryObject; /// thread local storage and the running task is owned by the /// scheduler. pub struct Scheduler { + /// A queue of available work. Under a work-stealing policy there + /// is one per Scheduler. priv work_queue: WorkQueue<~Coroutine>, + /// A shared list of sleeping schedulers. We'll use this to wake + /// up schedulers when pushing work onto the work queue. + priv sleeper_list: SleeperList, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, @@ -65,12 +71,16 @@ pub impl Scheduler { fn in_task_context(&self) -> bool { self.current_task.is_some() } - fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Coroutine>) -> Scheduler { + fn new(event_loop: ~EventLoopObject, + work_queue: WorkQueue<~Coroutine>, + sleeper_list: SleeperList) + -> Scheduler { // Lazily initialize the runtime TLS key local_ptr::init_tls_key(); Scheduler { + sleeper_list: sleeper_list, event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), @@ -618,19 +628,23 @@ mod test { use container::Container; use vec::OwnedVector; use rt::rtio::RemoteCallback; + use rt::sleeper_list::SleeperList; do run_in_bare_thread { + let sleepers1 = SleeperList::new(); let work_queue1 = WorkQueue::new(); + + let sleepers2 = sleepers1.clone(); let work_queue2 = work_queue1.clone(); let loop1 = ~UvEventLoop::new(); - let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone()); + let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone(), sleepers1); let handle1 = sched1.make_handle(); let sched1_cell = Cell(sched1); let handle1_cell = Cell(handle1); let loop2 = ~UvEventLoop::new(); - let mut sched2 = ~Scheduler::new(loop2, work_queue2.clone()); + let mut sched2 = ~Scheduler::new(loop2, work_queue2.clone(), sleepers2); let handle2 = sched2.make_handle(); let sched2_cell = Cell(sched2); let handle2_cell = Cell(handle2); diff --git a/src/libcore/rt/sleeper_list.rs b/src/libcore/rt/sleeper_list.rs index 9507dec001d51..dfcac8eb088f7 100644 --- a/src/libcore/rt/sleeper_list.rs +++ b/src/libcore/rt/sleeper_list.rs @@ -17,6 +17,7 @@ use option::{Option, Some, None}; use cell::Cell; use unstable::sync::{Exclusive, exclusive}; use rt::sched::{Scheduler, SchedHandle}; +use clone::Clone; pub struct SleeperList { priv stack: ~Exclusive<~[SchedHandle]> @@ -44,3 +45,11 @@ impl SleeperList { } } } + +impl Clone for SleeperList { + fn clone(&self) -> SleeperList { + SleeperList { + stack: self.stack.clone() + } + } +} \ No newline at end of file diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index 0e2da452366cf..d6896f5003437 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -21,8 +21,9 @@ use rt::sched::Scheduler; pub fn new_test_uv_sched() -> Scheduler { use rt::uv::uvio::UvEventLoop; use rt::work_queue::WorkQueue; + use rt::sleeper_list::SleeperList; - Scheduler::new(~UvEventLoop::new(), WorkQueue::new()) + Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()) } /// Creates a new scheduler in a new thread and runs a task in it, From 5043ea269da73e96fbadc7c443aec01f087dabe9 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 28 May 2013 23:35:22 -0700 Subject: [PATCH 12/32] core::rt: Add run_in_mt_newsched_task test function --- src/libcore/rt/test.rs | 63 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index d6896f5003437..a66e4f09fe72b 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -9,14 +9,20 @@ // except according to those terms. use uint; -use option::*; +use option::{Option, Some, None}; use cell::Cell; +use clone::Clone; +use container::Container; +use vec::OwnedVector; use result::{Result, Ok, Err}; +use unstable::run_in_bare_thread; use super::io::net::ip::{IpAddr, Ipv4}; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; -use rt::sched::Scheduler; +use rt::sched::{Scheduler, Coroutine}; +use rt::sleeper_list::SleeperList; +use rt::work_queue::WorkQueue; pub fn new_test_uv_sched() -> Scheduler { use rt::uv::uvio::UvEventLoop; @@ -46,6 +52,59 @@ pub fn run_in_newsched_task(f: ~fn()) { } } +/// Create more than one scheduler and run a function in a task +/// in one of the schedulers. The schedulers will stay alive +/// until the function `f` returns. +pub fn run_in_mt_newsched_task(f: ~fn()) { + use rt::uv::uvio::UvEventLoop; + + let f_cell = Cell(f); + + do run_in_bare_thread { + static N: uint = 2; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, N) |i| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let handle = sched.make_handle(); + handles.push(handle); + scheds.push(sched); + } + + let f_cell = Cell(f_cell.take()); + let handles = handles; // Freeze + let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { + f_cell.take()(); + // Hold on to handles until the function exits. This keeps the schedulers alive. + let _captured_handles = &handles; + }; + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell(sched); + let thread = do Thread::start { + let mut sched = sched_cell.take(); + sched.run(); + }; + + threads.push(thread); + } + + // Wait for schedulers + let _threads = threads; + } +} + /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { use super::sched::*; From a373dad74d0bd89a9d5362bba1059d9cc25afb9a Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 15:55:23 -0700 Subject: [PATCH 13/32] core::rt: Outline the full multithreaded scheduling algo. Implement sleeping --- src/libcore/rt/message_queue.rs | 3 + src/libcore/rt/mod.rs | 1 + src/libcore/rt/sched.rs | 222 +++++++++++++++++++++----------- src/libcore/rt/test.rs | 17 ++- 4 files changed, 162 insertions(+), 81 deletions(-) diff --git a/src/libcore/rt/message_queue.rs b/src/libcore/rt/message_queue.rs index eaab9288ac8d0..21711bbe84c70 100644 --- a/src/libcore/rt/message_queue.rs +++ b/src/libcore/rt/message_queue.rs @@ -8,6 +8,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! A concurrent queue that supports multiple producers and a +//! single consumer. + use container::Container; use kinds::Owned; use vec::OwnedVector; diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 75036dcd28f8d..e23ad76a8c610 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -158,6 +158,7 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { let work_queue = WorkQueue::new(); let sleepers = SleeperList::new(); let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); + sched.no_sleep = true; let main_task = ~Coroutine::new(&mut sched.stack_pool, main); sched.enqueue_task(main_task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 2a99648fa0459..c6d6bb9f39e50 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -12,6 +12,7 @@ use option::*; use sys; use cast::transmute; use cell::Cell; +use clone::Clone; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; @@ -19,9 +20,10 @@ use super::stack::{StackPool, StackSegment}; use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::Task; +use super::message_queue::MessageQueue; use rt::local_ptr; use rt::local::Local; -use rt::rtio::IoFactoryObject; +use rt::rtio::{IoFactoryObject, RemoteCallback}; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -31,9 +33,23 @@ pub struct Scheduler { /// A queue of available work. Under a work-stealing policy there /// is one per Scheduler. priv work_queue: WorkQueue<~Coroutine>, + /// The queue of incoming messages from other schedulers. + /// These are enqueued by SchedHandles after which a remote callback + /// is triggered to handle the message. + priv message_queue: MessageQueue, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. priv sleeper_list: SleeperList, + /// Indicates that we have previously pushed a handle onto the + /// SleeperList but have not yet received the Wake message. + /// Being `true` does not necessarily mean that the scheduler is + /// not active since there are multiple event sources that may + /// wake the scheduler. It just prevents the scheduler from pushing + /// multiple handles onto the sleeper list. + priv sleepy: bool, + /// A flag to indicate we've received the shutdown message and should + /// no longer try to go to sleep, but exit instead. + no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, @@ -47,6 +63,11 @@ pub struct Scheduler { priv cleanup_job: Option } +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject, + priv queue: MessageQueue +} + pub struct Coroutine { /// The segment of stack on which the task is currently running or, /// if the task is blocked, on which the task will resume execution @@ -58,8 +79,9 @@ pub struct Coroutine { task: ~Task } -pub struct SchedHandle { - priv remote: ~RemoteCallbackObject +pub enum SchedMessage { + Wake, + Shutdown } enum CleanupJob { @@ -81,12 +103,15 @@ pub impl Scheduler { Scheduler { sleeper_list: sleeper_list, + message_queue: MessageQueue::new(), + sleepy: false, + no_sleep: false, event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, - cleanup_job: None + cleanup_job: None, } } @@ -116,17 +141,51 @@ pub impl Scheduler { return sched; } + fn run_sched_once() { + + let sched = Local::take::(); + if sched.interpret_message_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. + let mut sched = Local::take::(); + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + let sched = Local::take::(); + if sched.resume_task_from_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. + let mut sched = Local::take::(); + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + // If we got here then there was no work to do. + // Generate a SchedHandle and push it to the sleeper list so + // somebody can wake us up later. + rtdebug!("no work to do"); + let mut sched = Local::take::(); + if !sched.sleepy && !sched.no_sleep { + rtdebug!("sleeping"); + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping"); + } + Local::put(sched); + } + fn make_handle(&mut self) -> SchedHandle { - let remote = self.event_loop.remote_callback(wake_up); + let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); return SchedHandle { - remote: remote + remote: remote, + queue: self.message_queue.clone() }; - - fn wake_up() { - let sched = Local::take::(); - sched.resume_task_from_queue(); - } } /// Schedule a task to be executed later. @@ -136,17 +195,63 @@ pub impl Scheduler { /// directly. fn enqueue_task(&mut self, task: ~Coroutine) { self.work_queue.push(task); - self.event_loop.callback(resume_task_from_queue); + self.event_loop.callback(Scheduler::run_sched_once); - fn resume_task_from_queue() { - let sched = Local::take::(); - sched.resume_task_from_queue(); + // We've made work available. Notify a sleeping scheduler. + match self.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake) + } + None => (/* pass */) } } // * Scheduler-context operations - fn resume_task_from_queue(~self) { + fn interpret_message_queue(~self) -> bool { + assert!(!self.in_task_context()); + + rtdebug!("looking for scheduler messages"); + + let mut this = self; + match this.message_queue.pop() { + Some(Wake) => { + rtdebug!("recv Wake message"); + this.sleepy = false; + Local::put(this); + return true; + } + Some(Shutdown) => { + rtdebug!("recv Shutdown message"); + if this.sleepy { + // There may be an outstanding handle on the sleeper list. + // Pop them all to make sure that's not the case. + loop { + match this.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake); + } + None => (/* pass */) + } + } + } + // No more sleeping. After there are no outstanding event loop + // references we will shut down. + this.no_sleep = true; + this.sleepy = false; + Local::put(this); + return true; + } + None => { + Local::put(this); + return false; + } + } + } + + fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); rtdebug!("looking in work queue for task to schedule"); @@ -156,10 +261,12 @@ pub impl Scheduler { Some(task) => { rtdebug!("resuming task from work queue"); this.resume_task_immediately(task); + return true; } None => { rtdebug!("no tasks in queue"); Local::put(this); + return false; } } } @@ -363,6 +470,13 @@ pub impl Scheduler { } } +impl SchedHandle { + pub fn send(&mut self, msg: SchedMessage) { + self.queue.push(msg); + self.remote.fire(); + } +} + pub impl Coroutine { fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { Coroutine::with_task(stack_pool, ~Task::new(), start) @@ -621,71 +735,25 @@ mod test { #[test] fn multithreading() { - use clone::Clone; - use iter::Times; - use rt::work_queue::WorkQueue; use rt::comm::*; - use container::Container; + use iter::Times; use vec::OwnedVector; - use rt::rtio::RemoteCallback; - use rt::sleeper_list::SleeperList; - - do run_in_bare_thread { - let sleepers1 = SleeperList::new(); - let work_queue1 = WorkQueue::new(); - - let sleepers2 = sleepers1.clone(); - let work_queue2 = work_queue1.clone(); - - let loop1 = ~UvEventLoop::new(); - let mut sched1 = ~Scheduler::new(loop1, work_queue1.clone(), sleepers1); - let handle1 = sched1.make_handle(); - let sched1_cell = Cell(sched1); - let handle1_cell = Cell(handle1); - - let loop2 = ~UvEventLoop::new(); - let mut sched2 = ~Scheduler::new(loop2, work_queue2.clone(), sleepers2); - let handle2 = sched2.make_handle(); - let sched2_cell = Cell(sched2); - let handle2_cell = Cell(handle2); - - let _thread1 = do Thread::start { - let mut sched1 = sched1_cell.take(); - sched1.run(); - }; - - let _thread2 = do Thread::start { - let mut sched2 = sched2_cell.take(); - let handle1_cell = Cell(handle1_cell.take()); - let handle2_cell = Cell(handle2_cell.take()); - - let task = ~do Coroutine::new(&mut sched2.stack_pool) { - // Hold handles to keep the schedulers alive - let mut handle1 = handle1_cell.take(); - let mut handle2 = handle2_cell.take(); - - let mut ports = ~[]; - for 10.times { - let (port, chan) = oneshot(); - let chan_cell = Cell(chan); - do spawntask_later { - chan_cell.take().send(()); - } - ports.push(port); - - // Make sure the other scheduler is awake - handle1.remote.fire(); - handle2.remote.fire(); - } + use container::Container; - while !ports.is_empty() { - ports.pop().recv(); - } - }; + do run_in_mt_newsched_task { + let mut ports = ~[]; + for 10.times { + let (port, chan) = oneshot(); + let chan_cell = Cell(chan); + do spawntask_later { + chan_cell.take().send(()); + } + ports.push(port); + } - sched2.enqueue_task(task); - sched2.run(); - }; + while !ports.is_empty() { + ports.pop().recv(); + } } } } diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index a66e4f09fe72b..1bbfe8d473db3 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -13,6 +13,7 @@ use option::{Option, Some, None}; use cell::Cell; use clone::Clone; use container::Container; +use old_iter::MutableIter; use vec::OwnedVector; use result::{Result, Ok, Err}; use unstable::run_in_bare_thread; @@ -29,7 +30,10 @@ pub fn new_test_uv_sched() -> Scheduler { use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; - Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()) + let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message + sched.no_sleep = true; + return sched; } /// Creates a new scheduler in a new thread and runs a task in it, @@ -57,6 +61,7 @@ pub fn run_in_newsched_task(f: ~fn()) { /// until the function `f` returns. pub fn run_in_mt_newsched_task(f: ~fn()) { use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; let f_cell = Cell(f); @@ -78,11 +83,15 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } let f_cell = Cell(f_cell.take()); - let handles = handles; // Freeze + let handles = Cell(handles); let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { f_cell.take()(); - // Hold on to handles until the function exits. This keeps the schedulers alive. - let _captured_handles = &handles; + + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.each_mut |handle| { + handle.send(Shutdown); + } }; scheds[0].enqueue_task(main_task); From f343e6172b7132545c72e3e09e6afccc06fdcee7 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 17:25:29 -0700 Subject: [PATCH 14/32] core::rt: Fix an infinite recursion bug --- src/libcore/rt/comm.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs index 576a402b70919..d108e20347a05 100644 --- a/src/libcore/rt/comm.rs +++ b/src/libcore/rt/comm.rs @@ -22,6 +22,7 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; +use rt::rtio::EventLoop; use unstable::intrinsics::{atomic_xchg, atomic_load}; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; @@ -172,9 +173,17 @@ impl PortOne { } STATE_ONE => { // Channel is closed. Switch back and check the data. + // NB: We have to drop back into the scheduler event loop here + // instead of switching immediately back or we could end up + // triggering infinite recursion on the scheduler's stack. let task: ~Coroutine = cast::transmute(task_as_state); - let sched = Local::take::(); - sched.resume_task_immediately(task); + let task = Cell(task); + let mut sched = Local::take::(); + do sched.event_loop.callback { + let sched = Local::take::(); + sched.resume_task_immediately(task.take()); + } + Local::put(sched); } _ => util::unreachable() } @@ -614,5 +623,15 @@ mod test { } } } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + do run_in_newsched_task { + let (port, chan) = stream(); + for 10000.times { chan.send(()) } + for 10000.times { port.recv() } + } + } } From 134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 17:52:00 -0700 Subject: [PATCH 15/32] core::rt: Change the signature of context switching methods to avoid infinite recursion --- src/libcore/rt/comm.rs | 4 +-- src/libcore/rt/mod.rs | 7 ++--- src/libcore/rt/sched.rs | 63 +++++++++++++++++++-------------------- src/libcore/rt/test.rs | 32 ++++++-------------- src/libcore/rt/tube.rs | 34 +++++++++------------ src/libcore/rt/uv/uvio.rs | 26 +++++++--------- 6 files changed, 66 insertions(+), 100 deletions(-) diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs index d108e20347a05..8ff3887f779c0 100644 --- a/src/libcore/rt/comm.rs +++ b/src/libcore/rt/comm.rs @@ -159,7 +159,7 @@ impl PortOne { // Switch to the scheduler to put the ~Task into the Packet state. let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { unsafe { // Atomically swap the task pointer into the Packet state, issuing // an acquire barrier to prevent reordering of the subsequent read @@ -178,12 +178,10 @@ impl PortOne { // triggering infinite recursion on the scheduler's stack. let task: ~Coroutine = cast::transmute(task_as_state); let task = Cell(task); - let mut sched = Local::take::(); do sched.event_loop.callback { let sched = Local::take::(); sched.resume_task_immediately(task.take()); } - Local::put(sched); } _ => util::unreachable() } diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index e23ad76a8c610..1113d7abe7dcb 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -238,12 +238,9 @@ fn test_context() { let task = ~do Coroutine::new(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { assert_eq!(context(), SchedulerContext); - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + sched.enqueue_task(task); } }; sched.enqueue_task(task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index c6d6bb9f39e50..089c95cd7cd53 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -280,11 +280,9 @@ pub impl Scheduler { rtdebug!("ending running task"); - do self.deschedule_running_task_and_then |dead_task| { + do self.deschedule_running_task_and_then |sched, dead_task| { let dead_task = Cell(dead_task); - do Local::borrow:: |sched| { - dead_task.take().recycle(&mut sched.stack_pool); - } + dead_task.take().recycle(&mut sched.stack_pool); } abort!("control reached end of task"); @@ -293,22 +291,18 @@ pub impl Scheduler { fn schedule_new_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow:: |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } fn schedule_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow:: |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } @@ -352,7 +346,11 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) { + /// + /// This passes a Scheduler pointer to the fn after the context switch + /// in order to prevent that fn from performing further scheduling operations. + /// Doing further scheduling could easily result in infinite recursion. + fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); @@ -360,7 +358,8 @@ pub impl Scheduler { unsafe { let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f); + let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f); let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); } @@ -382,14 +381,18 @@ pub impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. - fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) { + fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, + f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); rtdebug!("switching tasks"); let old_running_task = this.current_task.swap_unwrap(); - let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) }; + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f) + }; let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); this.current_task = Some(next_task); @@ -426,7 +429,7 @@ pub impl Scheduler { let cleanup_job = self.cleanup_job.swap_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => (f.to_fn())(task) + GiveTask(task, f) => (f.to_fn())(self, task) } } @@ -535,12 +538,12 @@ pub impl Coroutine { // complaining type UnsafeTaskReceiver = sys::Closure; trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); + fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } } } #[cfg(test)] @@ -604,11 +607,9 @@ mod test { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |task1| { + do sched.switch_running_tasks_and_then(task2) |sched, task1| { let task1 = Cell(task1); - do Local::borrow:: |sched| { - sched.enqueue_task(task1.take()); - } + sched.enqueue_task(task1.take()); } unsafe { *count_ptr = *count_ptr + 1; } }; @@ -658,12 +659,10 @@ mod test { let task = ~do Coroutine::new(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { let task = Cell(task); - do Local::borrow:: |sched| { - assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); - } + assert!(!sched.in_task_context()); + sched.enqueue_task(task.take()); } }; sched.enqueue_task(task); @@ -680,8 +679,7 @@ mod test { do run_in_newsched_task { do spawn { let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { - let mut sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { let task = Cell(task); do sched.event_loop.callback_ms(10) { rtdebug!("in callback"); @@ -689,7 +687,6 @@ mod test { sched.enqueue_task(task.take()); Local::put(sched); } - Local::put(sched); } } } diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index 1bbfe8d473db3..16b0aef5e266b 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -122,11 +122,7 @@ pub fn spawntask(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - let sched = Local::take::(); - sched.schedule_new_task(task.take()); - } + sched.schedule_new_task(task); } /// Create a new task and run it right now. Aborts on failure @@ -137,11 +133,8 @@ pub fn spawntask_immediately(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } @@ -172,11 +165,8 @@ pub fn spawntask_random(f: ~fn()) { f); if run_now { - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } else { sched.enqueue_task(task); @@ -199,10 +189,9 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Switch to the scheduler let f = Cell(Cell(f)); let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |old_task| { + do sched.deschedule_running_task_and_then() |sched, old_task| { let old_task = Cell(old_task); let f = f.take(); - let mut sched = Local::take::(); let new_task = ~do Coroutine::new(&mut sched.stack_pool) { do (|| { (f.take())() @@ -210,16 +199,13 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Check for failure then resume the parent task unsafe { *failed_ptr = task::failing(); } let sched = Local::take::(); - do sched.switch_running_tasks_and_then(old_task.take()) |new_task| { - let new_task = Cell(new_task); - do Local::borrow:: |sched| { - sched.enqueue_task(new_task.take()); - } + do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| { + sched.enqueue_task(new_task); } } }; - sched.resume_task_immediately(new_task); + sched.enqueue_task(new_task); } if !failed { Ok(()) } else { Err(()) } diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs index b2f475a696605..4482a92d916aa 100644 --- a/src/libcore/rt/tube.rs +++ b/src/libcore/rt/tube.rs @@ -72,7 +72,7 @@ impl Tube { assert!(self.p.refcount() > 1); // There better be somebody to wake us up assert!((*state).blocked_task.is_none()); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |_, task| { (*state).blocked_task = Some(task); } rtdebug!("waking after tube recv"); @@ -107,11 +107,10 @@ mod test { let tube_clone = tube.clone(); let tube_clone_cell = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { let mut tube_clone = tube_clone_cell.take(); tube_clone.send(1); - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -123,21 +122,17 @@ mod test { do run_in_newsched_task { let mut tube: Tube = Tube::new(); let tube_clone = tube.clone(); - let tube_clone = Cell(Cell(Cell(tube_clone))); + let tube_clone = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { - let tube_clone = tube_clone.take(); - do Local::borrow:: |sched| { - let tube_clone = tube_clone.take(); - do sched.event_loop.callback { - let mut tube_clone = tube_clone.take(); - // The task should be blocked on this now and - // sending will wake it up. - tube_clone.send(1); - } + do sched.deschedule_running_task_and_then |sched, task| { + let tube_clone = Cell(tube_clone.take()); + do sched.event_loop.callback { + let mut tube_clone = tube_clone.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube_clone.send(1); } - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -153,7 +148,7 @@ mod test { let tube_clone = tube.clone(); let tube_clone = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { callback_send(tube_clone.take(), 0); fn callback_send(tube: Tube, i: int) { @@ -172,8 +167,7 @@ mod test { } } - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } for int::range(0, MAX) |i| { diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index e25b6140abbfd..1ee6504d11fc5 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -205,12 +205,10 @@ impl IoFactory for UvIoFactory { assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("connect: entered scheduler context"); - do Local::borrow:: |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell(task); @@ -250,7 +248,7 @@ impl IoFactory for UvIoFactory { Ok(_) => Ok(~UvTcpListener::new(watcher)), Err(uverr) => { let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.as_stream().close { let scheduler = Local::take::(); @@ -286,7 +284,7 @@ impl Drop for UvTcpListener { fn finalize(&self) { let watcher = self.watcher(); let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.as_stream().close { let scheduler = Local::take::(); @@ -348,7 +346,7 @@ impl Drop for UvTcpStream { rtdebug!("closing tcp stream"); let watcher = self.watcher(); let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.close { let scheduler = Local::take::(); @@ -367,11 +365,9 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("read: entered scheduler context"); - do Local::borrow:: |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut watcher = watcher; let task_cell = Cell(task); // XXX: We shouldn't reallocate these callbacks every @@ -413,7 +409,7 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let mut watcher = watcher; let task_cell = Cell(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; @@ -507,11 +503,9 @@ fn test_read_and_block() { // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { let task = Cell(task); - do Local::borrow:: |scheduler| { - scheduler.enqueue_task(task.take()); - } + sched.enqueue_task(task.take()); } } From ca2eebd5dd8ceea1da77b6a6f4fb8c68462a400b Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 21:03:21 -0700 Subject: [PATCH 16/32] core::rt: Add some notes about optimizations --- src/libstd/rt/sched.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 089c95cd7cd53..75b5306644116 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -29,6 +29,9 @@ use rt::rtio::{IoFactoryObject, RemoteCallback}; /// on a single thread. When the scheduler is running it is owned by /// thread local storage and the running task is owned by the /// scheduler. +/// +/// XXX: This creates too many callbacks to run_sched_once, resulting +/// in too much allocation and too many events. pub struct Scheduler { /// A queue of available work. Under a work-stealing policy there /// is one per Scheduler. @@ -143,6 +146,10 @@ pub impl Scheduler { fn run_sched_once() { + // First, check the message queue for instructions. + // XXX: perf. Check for messages without atomics. + // It's ok if we miss messages occasionally, as long as + // we sync and check again before sleeping. let sched = Local::take::(); if sched.interpret_message_queue() { // We performed a scheduling action. There may be other work @@ -153,6 +160,7 @@ pub impl Scheduler { return; } + // Now, look in the work queue for tasks to run let sched = Local::take::(); if sched.resume_task_from_queue() { // We performed a scheduling action. There may be other work @@ -198,6 +206,12 @@ pub impl Scheduler { self.event_loop.callback(Scheduler::run_sched_once); // We've made work available. Notify a sleeping scheduler. + // XXX: perf. Check for a sleeper without synchronizing memory. + // It's not critical that we always find it. + // XXX: perf. If there's a sleeper then we might as well just send + // it the task directly instead of pushing it to the + // queue. That is essentially the intent here and it is less + // work. match self.sleeper_list.pop() { Some(handle) => { let mut handle = handle; From 8eb358bb00f161f9e289de6cad8cfecc4c6eb681 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 29 May 2013 22:38:15 -0700 Subject: [PATCH 17/32] core::rt: Begin recording scheduler metrics --- src/libstd/rt/comm.rs | 23 ++++++++--- src/libstd/rt/metrics.rs | 88 ++++++++++++++++++++++++++++++++++++++++ src/libstd/rt/mod.rs | 2 + src/libstd/rt/sched.rs | 18 +++++++- 4 files changed, 123 insertions(+), 8 deletions(-) create mode 100644 src/libstd/rt/metrics.rs diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index ebfa9e263ef80..19fb809d4378e 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -119,8 +119,16 @@ impl ChanOne { match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do + do Local::borrow:: |sched| { + rtdebug!("non-rendezvous send"); + sched.metrics.non_rendezvous_sends += 1; + } } STATE_ONE => { + do Local::borrow:: |sched| { + rtdebug!("rendezvous send"); + sched.metrics.rendezvous_sends += 1; + } // Port has closed. Need to clean up. let _packet: ~Packet = cast::transmute(this.inner.void_packet); recvr_active = false; @@ -128,7 +136,9 @@ impl ChanOne { task_as_state => { // Port is blocked. Wake it up. let recvr: ~Coroutine = cast::transmute(task_as_state); - let sched = Local::take::(); + let mut sched = Local::take::(); + rtdebug!("rendezvous send"); + sched.metrics.rendezvous_sends += 1; sched.schedule_task(recvr); } } @@ -170,18 +180,19 @@ impl PortOne { match oldstate { STATE_BOTH => { // Data has not been sent. Now we're blocked. + rtdebug!("non-rendezvous recv"); + sched.metrics.non_rendezvous_recvs += 1; } STATE_ONE => { + rtdebug!("rendezvous recv"); + sched.metrics.rendezvous_recvs += 1; + // Channel is closed. Switch back and check the data. // NB: We have to drop back into the scheduler event loop here // instead of switching immediately back or we could end up // triggering infinite recursion on the scheduler's stack. let task: ~Coroutine = cast::transmute(task_as_state); - let task = Cell(task); - do sched.event_loop.callback { - let sched = Local::take::(); - sched.resume_task_immediately(task.take()); - } + sched.enqueue_task(task); } _ => util::unreachable() } diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs new file mode 100644 index 0000000000000..70e347fdfb6ac --- /dev/null +++ b/src/libstd/rt/metrics.rs @@ -0,0 +1,88 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use to_str::ToStr; + +pub struct SchedMetrics { + // The number of times executing `run_sched_once`. + turns: uint, + // The number of turns that received a message. + messages_received: uint, + // The number of turns that ran a task from the queue. + tasks_resumed_from_queue: uint, + // The number of turns that found no work to perform. + wasted_turns: uint, + // The number of times the scheduler went to sleep. + sleepy_times: uint, + // Context switches from the scheduler into a task. + context_switches_sched_to_task: uint, + // Context switches from a task into the scheduler. + context_switches_task_to_sched: uint, + // Context switches from a task to a task. + context_switches_task_to_task: uint, + // Message sends that unblock the receiver + rendezvous_sends: uint, + // Message sends that do not unblock the receiver + non_rendezvous_sends: uint, + // Message receives that do not block the receiver + rendezvous_recvs: uint, + // Message receives that block the receiver + non_rendezvous_recvs: uint +} + +impl SchedMetrics { + pub fn new() -> SchedMetrics { + SchedMetrics { + turns: 0, + messages_received: 0, + tasks_resumed_from_queue: 0, + wasted_turns: 0, + sleepy_times: 0, + context_switches_sched_to_task: 0, + context_switches_task_to_sched: 0, + context_switches_task_to_task: 0, + rendezvous_sends: 0, + non_rendezvous_sends: 0, + rendezvous_recvs: 0, + non_rendezvous_recvs: 0 + } + } +} + +impl ToStr for SchedMetrics { + fn to_str(&self) -> ~str { + fmt!("turns: %u\n\ + messages_received: %u\n\ + tasks_resumed_from_queue: %u\n\ + wasted_turns: %u\n\ + sleepy_times: %u\n\ + context_switches_sched_to_task: %u\n\ + context_switches_task_to_sched: %u\n\ + context_switches_task_to_task: %u\n\ + rendezvous_sends: %u\n\ + non_rendezvous_sends: %u\n\ + rendezvous_recvs: %u\n\ + non_rendezvous_recvs: %u\n\ + ", + self.turns, + self.messages_received, + self.tasks_resumed_from_queue, + self.wasted_turns, + self.sleepy_times, + self.context_switches_sched_to_task, + self.context_switches_task_to_sched, + self.context_switches_task_to_task, + self.rendezvous_sends, + self.non_rendezvous_sends, + self.rendezvous_recvs, + self.non_rendezvous_recvs + ) + } +} \ No newline at end of file diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 1113d7abe7dcb..23dc757800222 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -130,6 +130,8 @@ pub mod local_ptr; /// Bindings to pthread/windows thread-local storage. pub mod thread_local_storage; +pub mod metrics; + /// Set up a default runtime configuration, given compiler-supplied arguments. /// diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 75b5306644116..b5b8bb732e7fd 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -13,6 +13,7 @@ use sys; use cast::transmute; use cell::Cell; use clone::Clone; +use to_str::ToStr; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; @@ -24,6 +25,7 @@ use super::message_queue::MessageQueue; use rt::local_ptr; use rt::local::Local; use rt::rtio::{IoFactoryObject, RemoteCallback}; +use rt::metrics::SchedMetrics; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -63,7 +65,8 @@ pub struct Scheduler { current_task: Option<~Coroutine>, /// An action performed after a context switch on behalf of the /// code running before the context switch - priv cleanup_job: Option + priv cleanup_job: Option, + metrics: SchedMetrics } pub struct SchedHandle { @@ -115,6 +118,7 @@ pub impl Scheduler { saved_context: Context::empty(), current_task: None, cleanup_job: None, + metrics: SchedMetrics::new() } } @@ -141,20 +145,24 @@ pub impl Scheduler { let sched = Local::take::(); assert!(sched.work_queue.is_empty()); + rtdebug!("scheduler metrics: %s\n", sched.metrics.to_str()); return sched; } fn run_sched_once() { + let mut sched = Local::take::(); + sched.metrics.turns += 1; + // First, check the message queue for instructions. // XXX: perf. Check for messages without atomics. // It's ok if we miss messages occasionally, as long as // we sync and check again before sleeping. - let sched = Local::take::(); if sched.interpret_message_queue() { // We performed a scheduling action. There may be other work // to do yet, so let's try again later. let mut sched = Local::take::(); + sched.metrics.messages_received += 1; sched.event_loop.callback(Scheduler::run_sched_once); Local::put(sched); return; @@ -166,6 +174,7 @@ pub impl Scheduler { // We performed a scheduling action. There may be other work // to do yet, so let's try again later. let mut sched = Local::take::(); + sched.metrics.tasks_resumed_from_queue += 1; sched.event_loop.callback(Scheduler::run_sched_once); Local::put(sched); return; @@ -176,8 +185,10 @@ pub impl Scheduler { // somebody can wake us up later. rtdebug!("no work to do"); let mut sched = Local::take::(); + sched.metrics.wasted_turns += 1; if !sched.sleepy && !sched.no_sleep { rtdebug!("sleeping"); + sched.metrics.sleepy_times += 1; sched.sleepy = true; let handle = sched.make_handle(); sched.sleeper_list.push(handle); @@ -327,6 +338,7 @@ pub impl Scheduler { assert!(!this.in_task_context()); rtdebug!("scheduling a task"); + this.metrics.context_switches_sched_to_task += 1; // Store the task in the scheduler so it can be grabbed later this.current_task = Some(task); @@ -369,6 +381,7 @@ pub impl Scheduler { assert!(this.in_task_context()); rtdebug!("blocking task"); + this.metrics.context_switches_task_to_sched += 1; unsafe { let blocked_task = this.current_task.swap_unwrap(); @@ -401,6 +414,7 @@ pub impl Scheduler { assert!(this.in_task_context()); rtdebug!("switching tasks"); + this.metrics.context_switches_task_to_task += 1; let old_running_task = this.current_task.swap_unwrap(); let f_fake_region = unsafe { From 053b38e7e1cba8f7bb649a5fc8d82b0448d33c55 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 30 May 2013 00:18:49 -0700 Subject: [PATCH 18/32] core::rt: Fix two multithreading bugs and add a threadring test This properly distributes the load now --- src/libstd/rt/sched.rs | 66 +++++++++++++++++++++++++++++++++++++++++- src/libstd/rt/test.rs | 2 +- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index b5b8bb732e7fd..a57a87ffba774 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -131,6 +131,11 @@ pub impl Scheduler { let mut self_sched = self; + // Always run through the scheduler loop at least once so that + // we enter the sleep state and can then be woken up by other + // schedulers. + self_sched.event_loop.callback(Scheduler::run_sched_once); + unsafe { let event_loop: *mut ~EventLoopObject = { let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; @@ -258,7 +263,7 @@ pub impl Scheduler { let mut handle = handle; handle.send(Wake); } - None => (/* pass */) + None => break } } } @@ -781,4 +786,63 @@ mod test { } } } + + #[test] + fn thread_ring() { + use rt::comm::*; + use iter::Times; + use vec::OwnedVector; + use container::Container; + use comm::{GenericPort, GenericChan}; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = oneshot(); + + let n_tasks = 10; + let token = 2000; + + let mut (p, ch1) = stream(); + ch1.send((token, end_chan)); + let mut i = 2; + while i <= n_tasks { + let (next_p, ch) = stream(); + let imm_i = i; + let imm_p = p; + do spawntask_random { + roundtrip(imm_i, n_tasks, &imm_p, &ch); + }; + p = next_p; + i += 1; + } + let imm_p = p; + let imm_ch = ch1; + do spawntask_random { + roundtrip(1, n_tasks, &imm_p, &imm_ch); + } + + end_port.recv(); + } + + fn roundtrip(id: int, n_tasks: int, + p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) { + while (true) { + match p.recv() { + (1, end_chan) => { + debug!("%d\n", id); + end_chan.send(()); + return; + } + (token, end_chan) => { + debug!("thread: %d got token: %d", id, token); + ch.send((token - 1, end_chan)); + if token <= n_tasks { + return; + } + } + } + } + } + + } + } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 16b0aef5e266b..e05e2004e0b21 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -66,7 +66,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let f_cell = Cell(f); do run_in_bare_thread { - static N: uint = 2; + static N: uint = 4; let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); From ea633b42aeadf807a10036a87bf2903123250152 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 30 May 2013 13:20:17 -0700 Subject: [PATCH 19/32] core::rt: deny(unused_imports, unused_mut, unused_variable) --- src/libstd/rt/comm.rs | 1 - src/libstd/rt/local.rs | 1 - src/libstd/rt/mod.rs | 5 +++-- src/libstd/rt/sched.rs | 11 +++-------- src/libstd/rt/sleeper_list.rs | 2 +- src/libstd/rt/test.rs | 8 +++----- src/libstd/rt/uv/uvio.rs | 5 +---- 7 files changed, 11 insertions(+), 22 deletions(-) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 19fb809d4378e..26d02fb6640ac 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -22,7 +22,6 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; -use rt::rtio::EventLoop; use unstable::intrinsics::{atomic_xchg, atomic_load}; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index ffff54f00bbe7..e6988c538881a 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -87,7 +87,6 @@ impl Local for IoFactoryObject { mod test { use rt::test::*; use rt::sched::Scheduler; - use rt::uv::uvio::UvEventLoop; use super::*; #[test] diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 23dc757800222..caf3e15e535af 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -55,6 +55,9 @@ Several modules in `core` are clients of `rt`: */ #[doc(hidden)]; +#[deny(unused_imports)]; +#[deny(unused_mut)]; +#[deny(unused_variable)]; use ptr::Ptr; @@ -228,8 +231,6 @@ pub fn context() -> RuntimeContext { fn test_context() { use unstable::run_in_bare_thread; use self::sched::{Scheduler, Coroutine}; - use rt::uv::uvio::UvEventLoop; - use cell::Cell; use rt::local::Local; use rt::test::new_test_uv_sched; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index a57a87ffba774..b0080a010140d 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -13,7 +13,6 @@ use sys; use cast::transmute; use cell::Cell; use clone::Clone; -use to_str::ToStr; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; @@ -24,7 +23,7 @@ use super::task::Task; use super::message_queue::MessageQueue; use rt::local_ptr; use rt::local::Local; -use rt::rtio::{IoFactoryObject, RemoteCallback}; +use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; /// The Scheduler is responsible for coordinating execution of Coroutines @@ -583,7 +582,6 @@ impl ClosureConverter for UnsafeTaskReceiver { mod test { use int; use cell::Cell; - use rt::uv::uvio::UvEventLoop; use unstable::run_in_bare_thread; use task::spawn; use rt::local::Local; @@ -751,13 +749,13 @@ mod test { let sched1_cell = Cell(sched1); let _thread1 = do Thread::start { - let mut sched1 = sched1_cell.take(); + let sched1 = sched1_cell.take(); sched1.run(); }; let sched2_cell = Cell(sched2); let _thread2 = do Thread::start { - let mut sched2 = sched2_cell.take(); + let sched2 = sched2_cell.take(); sched2.run(); }; } @@ -790,9 +788,6 @@ mod test { #[test] fn thread_ring() { use rt::comm::*; - use iter::Times; - use vec::OwnedVector; - use container::Container; use comm::{GenericPort, GenericChan}; do run_in_mt_newsched_task { diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs index dfcac8eb088f7..e2873e78d805f 100644 --- a/src/libstd/rt/sleeper_list.rs +++ b/src/libstd/rt/sleeper_list.rs @@ -16,7 +16,7 @@ use vec::OwnedVector; use option::{Option, Some, None}; use cell::Cell; use unstable::sync::{Exclusive, exclusive}; -use rt::sched::{Scheduler, SchedHandle}; +use rt::sched::SchedHandle; use clone::Clone; pub struct SleeperList { diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index e05e2004e0b21..907d289fb0748 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -9,7 +9,7 @@ // except according to those terms. use uint; -use option::{Option, Some, None}; +use option::{Some, None}; use cell::Cell; use clone::Clone; use container::Container; @@ -42,7 +42,6 @@ pub fn new_test_uv_sched() -> Scheduler { pub fn run_in_newsched_task(f: ~fn()) { use super::sched::*; use unstable::run_in_bare_thread; - use rt::uv::uvio::UvEventLoop; let f = Cell(f); @@ -74,7 +73,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let mut handles = ~[]; let mut scheds = ~[]; - for uint::range(0, N) |i| { + for uint::range(0, N) |_| { let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); let handle = sched.make_handle(); @@ -102,7 +101,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let sched = scheds.pop(); let sched_cell = Cell(sched); let thread = do Thread::start { - let mut sched = sched_cell.take(); + let sched = sched_cell.take(); sched.run(); }; @@ -214,7 +213,6 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { use rt::sched::*; - use rt::uv::uvio::UvEventLoop; let f = Cell(f); let thread = do Thread::start { diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 1ee6504d11fc5..0d9530239a3d9 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -24,9 +24,7 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; -use rt::work_queue::WorkQueue; use unstable::sync::{UnsafeAtomicRcBox, AtomicInt}; -use unstable::intrinsics; #[cfg(test)] use container::Container; #[cfg(test)] use uint; @@ -140,7 +138,7 @@ impl RemoteCallback for UvRemoteCallback { impl Drop for UvRemoteCallback { fn finalize(&self) { unsafe { - let mut this: &mut UvRemoteCallback = cast::transmute_mut(self); + let this: &mut UvRemoteCallback = cast::transmute_mut(self); let exit_flag_ptr = this.exit_flag.get(); (*exit_flag_ptr).store(1); this.async.send(); @@ -150,7 +148,6 @@ impl Drop for UvRemoteCallback { #[cfg(test)] mod test_remote { - use super::*; use cell; use cell::Cell; use rt::test::*; From e2bedb1b868a634885df9f8a277bec1915c98fc2 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 1 Jun 2013 02:11:38 -0700 Subject: [PATCH 20/32] core: Make atomic methods public --- src/libstd/unstable/atomics.rs | 62 +++++++++++++++++----------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/libstd/unstable/atomics.rs b/src/libstd/unstable/atomics.rs index ab2b5d8ea2b3b..58d0c01f990d0 100644 --- a/src/libstd/unstable/atomics.rs +++ b/src/libstd/unstable/atomics.rs @@ -75,7 +75,7 @@ pub enum Ordering { impl AtomicFlag { - fn new() -> AtomicFlag { + pub fn new() -> AtomicFlag { AtomicFlag { v: 0 } } @@ -83,7 +83,7 @@ impl AtomicFlag { * Clears the atomic flag */ #[inline(always)] - fn clear(&mut self, order: Ordering) { + pub fn clear(&mut self, order: Ordering) { unsafe {atomic_store(&mut self.v, 0, order)} } @@ -92,37 +92,37 @@ impl AtomicFlag { * flag. */ #[inline(always)] - fn test_and_set(&mut self, order: Ordering) -> bool { + pub fn test_and_set(&mut self, order: Ordering) -> bool { unsafe {atomic_compare_and_swap(&mut self.v, 0, 1, order) > 0} } } impl AtomicBool { - fn new(v: bool) -> AtomicBool { + pub fn new(v: bool) -> AtomicBool { AtomicBool { v: if v { 1 } else { 0 } } } #[inline(always)] - fn load(&self, order: Ordering) -> bool { + pub fn load(&self, order: Ordering) -> bool { unsafe { atomic_load(&self.v, order) > 0 } } #[inline(always)] - fn store(&mut self, val: bool, order: Ordering) { + pub fn store(&mut self, val: bool, order: Ordering) { let val = if val { 1 } else { 0 }; unsafe { atomic_store(&mut self.v, val, order); } } #[inline(always)] - fn swap(&mut self, val: bool, order: Ordering) -> bool { + pub fn swap(&mut self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; unsafe { atomic_swap(&mut self.v, val, order) > 0} } #[inline(always)] - fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool { + pub fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool { let old = if old { 1 } else { 0 }; let new = if new { 1 } else { 0 }; @@ -131,105 +131,105 @@ impl AtomicBool { } impl AtomicInt { - fn new(v: int) -> AtomicInt { + pub fn new(v: int) -> AtomicInt { AtomicInt { v:v } } #[inline(always)] - fn load(&self, order: Ordering) -> int { + pub fn load(&self, order: Ordering) -> int { unsafe { atomic_load(&self.v, order) } } #[inline(always)] - fn store(&mut self, val: int, order: Ordering) { + pub fn store(&mut self, val: int, order: Ordering) { unsafe { atomic_store(&mut self.v, val, order); } } #[inline(always)] - fn swap(&mut self, val: int, order: Ordering) -> int { + pub fn swap(&mut self, val: int, order: Ordering) -> int { unsafe { atomic_swap(&mut self.v, val, order) } } #[inline(always)] - fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int { + pub fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int { unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } } #[inline(always)] - fn fetch_add(&mut self, val: int, order: Ordering) -> int { + pub fn fetch_add(&mut self, val: int, order: Ordering) -> int { unsafe { atomic_add(&mut self.v, val, order) } } #[inline(always)] - fn fetch_sub(&mut self, val: int, order: Ordering) -> int { + pub fn fetch_sub(&mut self, val: int, order: Ordering) -> int { unsafe { atomic_sub(&mut self.v, val, order) } } } impl AtomicUint { - fn new(v: uint) -> AtomicUint { + pub fn new(v: uint) -> AtomicUint { AtomicUint { v:v } } #[inline(always)] - fn load(&self, order: Ordering) -> uint { + pub fn load(&self, order: Ordering) -> uint { unsafe { atomic_load(&self.v, order) } } #[inline(always)] - fn store(&mut self, val: uint, order: Ordering) { + pub fn store(&mut self, val: uint, order: Ordering) { unsafe { atomic_store(&mut self.v, val, order); } } #[inline(always)] - fn swap(&mut self, val: uint, order: Ordering) -> uint { + pub fn swap(&mut self, val: uint, order: Ordering) -> uint { unsafe { atomic_swap(&mut self.v, val, order) } } #[inline(always)] - fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint { + pub fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint { unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } } #[inline(always)] - fn fetch_add(&mut self, val: uint, order: Ordering) -> uint { + pub fn fetch_add(&mut self, val: uint, order: Ordering) -> uint { unsafe { atomic_add(&mut self.v, val, order) } } #[inline(always)] - fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint { + pub fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint { unsafe { atomic_sub(&mut self.v, val, order) } } } impl AtomicPtr { - fn new(p: *mut T) -> AtomicPtr { + pub fn new(p: *mut T) -> AtomicPtr { AtomicPtr { p:p } } #[inline(always)] - fn load(&self, order: Ordering) -> *mut T { + pub fn load(&self, order: Ordering) -> *mut T { unsafe { atomic_load(&self.p, order) } } #[inline(always)] - fn store(&mut self, ptr: *mut T, order: Ordering) { + pub fn store(&mut self, ptr: *mut T, order: Ordering) { unsafe { atomic_store(&mut self.p, ptr, order); } } #[inline(always)] - fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T { + pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T { unsafe { atomic_swap(&mut self.p, ptr, order) } } #[inline(always)] - fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { + pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { unsafe { atomic_compare_and_swap(&mut self.p, old, new, order) } } } impl AtomicOption { - fn new(p: ~T) -> AtomicOption { + pub fn new(p: ~T) -> AtomicOption { unsafe { AtomicOption { p: cast::transmute(p) @@ -237,7 +237,7 @@ impl AtomicOption { } } - fn empty() -> AtomicOption { + pub fn empty() -> AtomicOption { unsafe { AtomicOption { p: cast::transmute(0) @@ -246,7 +246,7 @@ impl AtomicOption { } #[inline(always)] - fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> { + pub fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> { unsafe { let val = cast::transmute(val); @@ -262,7 +262,7 @@ impl AtomicOption { } #[inline(always)] - fn take(&mut self, order: Ordering) -> Option<~T> { + pub fn take(&mut self, order: Ordering) -> Option<~T> { unsafe { self.swap(cast::transmute(0), order) } From 2e6d51f9cea14ff271223855454034b27ced4ce9 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 2 Jun 2013 02:13:24 -0700 Subject: [PATCH 21/32] std::rt: Use AtomicUint instead of intrinsics in comm --- src/libstd/rt/comm.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 26d02fb6640ac..7f93dae00b7c6 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -22,7 +22,7 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; -use unstable::intrinsics::{atomic_xchg, atomic_load}; +use unstable::atomics::{AtomicUint, SeqCst}; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; @@ -34,14 +34,14 @@ use cell::Cell; /// * 2 - both endpoints are alive /// * 1 - either the sender or the receiver is dead, determined by context /// * - A pointer to a blocked Task that can be transmuted to ~Task -type State = int; +type State = uint; static STATE_BOTH: State = 2; static STATE_ONE: State = 1; /// The heap-allocated structure shared between two endpoints. struct Packet { - state: State, + state: AtomicUint, payload: Option, } @@ -70,7 +70,7 @@ pub struct PortOneHack { pub fn oneshot() -> (PortOne, ChanOne) { let packet: ~Packet = ~Packet { - state: STATE_BOTH, + state: AtomicUint::new(STATE_BOTH), payload: None }; @@ -114,7 +114,7 @@ impl ChanOne { // reordering of the payload write. This also issues an // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. - let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE); + let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do @@ -175,7 +175,7 @@ impl PortOne { // of the payload. Also issues a release barrier to prevent reordering // of any previous writes to the task structure. let task_as_state: State = cast::transmute(task); - let oldstate = atomic_xchg(&mut (*packet).state, task_as_state); + let oldstate = (*packet).state.swap(task_as_state, SeqCst); match oldstate { STATE_BOTH => { // Data has not been sent. Now we're blocked. @@ -227,7 +227,7 @@ impl Peekable for PortOne { fn peek(&self) -> bool { unsafe { let packet: *mut Packet = self.inner.packet(); - let oldstate = atomic_load(&mut (*packet).state); + let oldstate = (*packet).state.load(SeqCst); match oldstate { STATE_BOTH => false, STATE_ONE => (*packet).payload.is_some(), @@ -244,7 +244,7 @@ impl Drop for ChanOneHack { unsafe { let this = cast::transmute_mut(self); - let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); + let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port still active. It will destroy the Packet. @@ -271,7 +271,7 @@ impl Drop for PortOneHack { unsafe { let this = cast::transmute_mut(self); - let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); + let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Chan still active. It will destroy the packet. From f7e242ab8a4ceffd87ec339086b7f8510e94aef1 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 3 Jun 2013 18:58:26 -0700 Subject: [PATCH 22/32] std::rt: Destroy the task start closure while in task context --- src/libstd/rt/sched.rs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index b0080a010140d..1d1c3aae1f1a3 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -536,6 +536,7 @@ pub impl Coroutine { priv fn build_start_wrapper(start: ~fn()) -> ~fn() { // XXX: The old code didn't have this extra allocation + let start_cell = Cell(start); let wrapper: ~fn() = || { // This is the first code to execute after the initial // context switch to the task. The previous context may @@ -547,7 +548,19 @@ pub impl Coroutine { let sched = Local::unsafe_borrow::(); let task = (*sched).current_task.get_mut_ref(); // FIXME #6141: shouldn't neet to put `start()` in another closure - task.task.run(||start()); + let start_cell = Cell(start_cell.take()); + do task.task.run { + // N.B. Removing `start` from the start wrapper closure + // by emptying a cell is critical for correctness. The ~Task + // pointer, and in turn the closure used to initialize the first + // call frame, is destroyed in scheduler context, not task context. + // So any captured closures must not contain user-definable dtors + // that expect to be in task context. By moving `start` out of + // the closure, all the user code goes out of scope while + // the task is still running. + let start = start_cell.take(); + start(); + }; } let sched = Local::take::(); @@ -840,4 +853,26 @@ mod test { } + #[test] + fn start_closure_dtor() { + use ops::Drop; + + // Regression test that the `start` task entrypoint can contain dtors + // that use task resources + do run_in_newsched_task { + struct S { field: () } + + impl Drop for S { + fn finalize(&self) { + let _foo = @0; + } + } + + let s = S { field: () }; + + do spawntask { + let _ss = &s; + } + } + } } From 1507df87ccc93091ef5d918dc2c660f2e6f5a928 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 3 Jun 2013 19:15:45 -0700 Subject: [PATCH 23/32] std::rt: Remove in incorrect assert --- src/libstd/rt/sched.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 1d1c3aae1f1a3..df231f6d88aec 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -148,7 +148,9 @@ pub impl Scheduler { } let sched = Local::take::(); - assert!(sched.work_queue.is_empty()); + // XXX: Reenable this once we're using a per-task queue. With a shared + // queue this is not true + //assert!(sched.work_queue.is_empty()); rtdebug!("scheduler metrics: %s\n", sched.metrics.to_str()); return sched; } From 422f663a988370a93a6ae21b92215e49750c2e87 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 1 Jun 2013 02:11:59 -0700 Subject: [PATCH 24/32] core::rt: Implement SharedChan --- src/libstd/rt/comm.rs | 67 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 7f93dae00b7c6..b97a4df224576 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -22,10 +22,12 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; -use unstable::atomics::{AtomicUint, SeqCst}; +use unstable::atomics::{AtomicUint, AtomicOption, SeqCst}; +use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; +use clone::Clone; /// A combined refcount / ~Task pointer. /// @@ -312,16 +314,19 @@ struct StreamPayload { next: PortOne> } +type StreamChanOne = ChanOne>; +type StreamPortOne = PortOne>; + /// A channel with unbounded size. pub struct Chan { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell>> + next: Cell> } /// An port with unbounded size. pub struct Port { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell>> + next: Cell> } pub fn stream() -> (Port, Chan) { @@ -374,6 +379,43 @@ impl Peekable for Port { } } +pub struct SharedChan { + // Just like Chan, but a shared AtomicOption instead of Cell + priv next: UnsafeAtomicRcBox>> +} + +impl SharedChan { + pub fn new(chan: Chan) -> SharedChan { + let next = chan.next.take(); + let next = AtomicOption::new(~next); + SharedChan { next: UnsafeAtomicRcBox::new(next) } + } +} + +impl GenericChan for SharedChan { + fn send(&self, val: T) { + self.try_send(val); + } +} + +impl GenericSmartChan for SharedChan { + fn try_send(&self, val: T) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) + } + } +} + +impl Clone for SharedChan { + fn clone(&self) -> SharedChan { + SharedChan { + next: self.next.clone() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -641,5 +683,24 @@ mod test { for 10000.times { port.recv() } } } + + #[test] + fn shared_chan_stress() { + do run_in_mt_newsched_task { + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let total = stress_factor() + 100; + for total.times { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + + for total.times { + port.recv(); + } + } + } } From 51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 1 Jun 2013 13:34:05 -0700 Subject: [PATCH 25/32] core::rt: Add SharedPort --- src/libstd/rt/comm.rs | 132 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index b97a4df224576..4772a8596bfb6 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -416,6 +416,61 @@ impl Clone for SharedChan { } } +pub struct SharedPort { + // The next port on which we will receive the next port on which we will receive T + priv next_link: UnsafeAtomicRcBox>>> +} + +impl SharedPort { + pub fn new(port: Port) -> SharedPort { + // Put the data port into a new link pipe + let next_data_port = port.next.take(); + let (next_link_port, next_link_chan) = oneshot(); + next_link_chan.send(next_data_port); + let next_link = AtomicOption::new(~next_link_port); + SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) } + } +} + +impl GenericPort for SharedPort { + fn recv(&self) -> T { + match self.try_recv() { + Some(val) => val, + None => { + fail!("receiving on a closed channel"); + } + } + } + + fn try_recv(&self) -> Option { + unsafe { + let (next_link_port, next_link_chan) = oneshot(); + let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst); + let link_port = link_port.unwrap(); + let data_port = link_port.recv(); + let (next_data_port, res) = match data_port.try_recv() { + Some(StreamPayload { val, next }) => { + (next, Some(val)) + } + None => { + let (next_data_port, _) = oneshot(); + (next_data_port, None) + } + }; + next_link_chan.send(next_data_port); + return res; + } + } +} + +impl Clone for SharedPort { + fn clone(&self) -> SharedPort { + SharedPort { + next_link: self.next_link.clone() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -702,5 +757,82 @@ mod test { } } } + + #[test] + fn shared_port_stress() { + do run_in_mt_newsched_task { + // XXX: Removing these type annotations causes an ICE + let (end_port, end_chan) = stream::<()>(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let total = stress_factor() + 100; + for total.times { + let end_chan_clone = end_chan.clone(); + let port_clone = port.clone(); + do spawntask_random { + port_clone.recv(); + end_chan_clone.send(()); + } + } + + for total.times { + chan.send(()); + } + + for total.times { + end_port.recv(); + } + } + } + + #[test] + fn shared_port_close_simple() { + do run_in_mt_newsched_task { + let (port, chan) = stream::<()>(); + let port = SharedPort::new(port); + { let _chan = chan; } + assert!(port.try_recv().is_none()); + } + } + + #[test] + fn shared_port_close() { + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let chan = SharedChan::new(chan); + let send_total = 10; + let recv_total = 20; + do spawntask_random { + for send_total.times { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + } + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for recv_total.times { + let port_clone = port.clone(); + let end_chan_clone = end_chan_clone.clone(); + do spawntask_random { + let recvd = port_clone.try_recv().is_some(); + end_chan_clone.send(recvd); + } + } + } + + let mut recvd = 0; + for recv_total.times { + recvd += if end_port.recv() { 1 } else { 0 }; + } + + assert!(recvd == send_total); + } + } } From ece38b3c7e16be1bedb45e552a127fe75bdb726a Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 1 Jun 2013 14:03:38 -0700 Subject: [PATCH 26/32] core::rt: Add `MegaPipe`, an unbounded, multiple producer/consumer, lock-free queue --- src/libstd/rt/comm.rs | 71 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 4772a8596bfb6..ef2091f789c08 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -471,6 +471,44 @@ impl Clone for SharedPort { } } +// XXX: Need better name +type MegaPipe = (SharedPort, SharedChan); + +pub fn megapipe() -> MegaPipe { + let (port, chan) = stream(); + (SharedPort::new(port), SharedChan::new(chan)) +} + +impl GenericChan for MegaPipe { + fn send(&self, val: T) { + match *self { + (_, ref c) => c.send(val) + } + } +} + +impl GenericSmartChan for MegaPipe { + fn try_send(&self, val: T) -> bool { + match *self { + (_, ref c) => c.try_send(val) + } + } +} + +impl GenericPort for MegaPipe { + fn recv(&self) -> T { + match *self { + (ref p, _) => p.recv() + } + } + + fn try_recv(&self) -> Option { + match *self { + (ref p, _) => p.try_recv() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -834,5 +872,38 @@ mod test { assert!(recvd == send_total); } } + + #[test] + fn megapipe_stress() { + use rand; + use rand::RngUtil; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let pipe = megapipe(); + let total = stress_factor() + 10; + let mut rng = rand::rng(); + for total.times { + let msgs = rng.gen_uint_range(0, 10); + let pipe_clone = pipe.clone(); + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for msgs.times { + pipe_clone.send(()); + } + for msgs.times { + pipe_clone.recv(); + } + } + + end_chan_clone.send(()); + } + + for total.times { + end_port.recv(); + } + } + } } From 80849e78a847f7834f71b36a66251ba0ea37a982 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 1 Jun 2013 20:31:33 -0700 Subject: [PATCH 27/32] std: Fix stage0 build Conflicts: src/libstd/rt/comm.rs --- src/libstd/rt/comm.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index ef2091f789c08..449ac9e14a41b 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -399,6 +399,12 @@ impl GenericChan for SharedChan { } impl GenericSmartChan for SharedChan { + #[cfg(stage0)] // odd type checking errors + fn try_send(&self, _val: T) -> bool { + fail!() + } + + #[cfg(not(stage0))] fn try_send(&self, val: T) -> bool { unsafe { let (next_pone, next_cone) = oneshot(); @@ -442,6 +448,12 @@ impl GenericPort for SharedPort { } } + #[cfg(stage0)] // odd type checking errors + fn try_recv(&self) -> Option { + fail!() + } + + #[cfg(not(stage0))] fn try_recv(&self) -> Option { unsafe { let (next_link_port, next_link_chan) = oneshot(); From f9a5005f52d528797d6b98a3bee73ab2d71b9aa3 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 5 Jun 2013 22:34:35 -0700 Subject: [PATCH 28/32] rt: Add rust_get_num_cpus --- src/rt/rust_builtin.cpp | 7 +++++++ src/rt/rust_env.cpp | 6 +++--- src/rt/rustrt.def.in | 1 + 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 5e7357c9b7b25..fe4e75fb8d21f 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -930,6 +930,13 @@ rust_begin_unwind(uintptr_t token) { #endif } +extern int get_num_cpus(); + +extern "C" CDECL uintptr_t +rust_get_num_cpus() { + return get_num_cpus(); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_env.cpp b/src/rt/rust_env.cpp index ed38be3550f74..c3d38851e7bb2 100644 --- a/src/rt/rust_env.cpp +++ b/src/rt/rust_env.cpp @@ -40,7 +40,7 @@ rust_drop_env_lock() { } #if defined(__WIN32__) -static int +int get_num_cpus() { SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); @@ -48,7 +48,7 @@ get_num_cpus() { return (int) sysinfo.dwNumberOfProcessors; } #elif defined(__BSD__) -static int +int get_num_cpus() { /* swiped from http://stackoverflow.com/questions/150355/ programmatically-find-the-number-of-cores-on-a-machine */ @@ -75,7 +75,7 @@ get_num_cpus() { return numCPU; } #elif defined(__GNUC__) -static int +int get_num_cpus() { return sysconf(_SC_NPROCESSORS_ONLN); } diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index e3e522aa7ceec..9b49583519eca 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -239,3 +239,4 @@ rust_valgrind_stack_deregister rust_take_env_lock rust_drop_env_lock rust_update_log_settings +rust_get_num_cpus \ No newline at end of file From 8afec77cb07394c5f2d54dcc0ebe075fc304efb7 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Wed, 5 Jun 2013 22:35:23 -0700 Subject: [PATCH 29/32] std::rt: Configure test threads with RUST_TEST_THREADS. Default is ncores x2 --- src/libstd/rt/test.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 907d289fb0748..c8df3a6120338 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -59,13 +59,24 @@ pub fn run_in_newsched_task(f: ~fn()) { /// in one of the schedulers. The schedulers will stay alive /// until the function `f` returns. pub fn run_in_mt_newsched_task(f: ~fn()) { + use libc; + use os; + use from_str::FromStr; use rt::uv::uvio::UvEventLoop; use rt::sched::Shutdown; let f_cell = Cell(f); do run_in_bare_thread { - static N: uint = 4; + let nthreads = match os::getenv("RUST_TEST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code + // to force the OS to preempt them frequently. + // Assuming that this help stress test concurrent types. + rust_get_num_cpus() * 2 + } + }; let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); @@ -73,7 +84,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let mut handles = ~[]; let mut scheds = ~[]; - for uint::range(0, N) |_| { + for uint::range(0, nthreads) |_| { let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); let handle = sched.make_handle(); @@ -111,6 +122,10 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { // Wait for schedulers let _threads = threads; } + + extern { + fn rust_get_num_cpus() -> libc::uintptr_t; + } } /// Test tasks will abort on failure instead of unwinding From d6ccc6bc99386ae20ac03b68e7ec504a16068242 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 6 Jun 2013 00:01:22 -0700 Subject: [PATCH 30/32] std::rt: Fix stream test to be parallel --- src/libstd/rt/comm.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 449ac9e14a41b..b00df78f433d7 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -748,7 +748,7 @@ mod test { #[test] fn stream_send_recv_stress() { for stress_factor().times { - do run_in_newsched_task { + do run_in_mt_newsched_task { let (port, chan) = stream::<~int>(); send(chan, 0); @@ -758,18 +758,18 @@ mod test { if i == 10 { return } let chan_cell = Cell(chan); - let _thread = do spawntask_thread { + do spawntask_random { let chan = chan_cell.take(); chan.send(~i); send(chan, i + 1); - }; + } } fn recv(port: Port<~int>, i: int) { if i == 10 { return } let port_cell = Cell(port); - let _thread = do spawntask_thread { + do spawntask_random { let port = port_cell.take(); assert!(port.recv() == ~i); recv(port, i + 1); From d4de99aa6c53b0eb0d5be2ccfc62e2c89b2cd2df Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 6 Jun 2013 17:46:45 -0700 Subject: [PATCH 31/32] std::rt: Fix a race in the UvRemoteCallback dtor --- src/libstd/rt/uv/uvio.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 0d9530239a3d9..0f98ab11513d6 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -24,7 +24,7 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; -use unstable::sync::{UnsafeAtomicRcBox, AtomicInt}; +use unstable::sync::{Exclusive, exclusive}; #[cfg(test)] use container::Container; #[cfg(test)] use uint; @@ -105,21 +105,20 @@ fn test_callback_run_once() { pub struct UvRemoteCallback { // The uv async handle for triggering the callback async: AsyncWatcher, - // An atomic flag to tell the callback to exit, - // set from the dtor. - exit_flag: UnsafeAtomicRcBox + // A flag to tell the callback to exit, set from the dtor. This is + // almost never contested - only in rare races with the dtor. + exit_flag: Exclusive } impl UvRemoteCallback { pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback { - let exit_flag = UnsafeAtomicRcBox::new(AtomicInt::new(0)); + let exit_flag = exclusive(false); let exit_flag_clone = exit_flag.clone(); let async = do AsyncWatcher::new(loop_) |watcher, status| { assert!(status.is_none()); f(); - let exit_flag_ptr = exit_flag_clone.get(); - unsafe { - if (*exit_flag_ptr).load() == 1 { + do exit_flag_clone.with_imm |&should_exit| { + if should_exit { watcher.close(||()); } } @@ -139,9 +138,14 @@ impl Drop for UvRemoteCallback { fn finalize(&self) { unsafe { let this: &mut UvRemoteCallback = cast::transmute_mut(self); - let exit_flag_ptr = this.exit_flag.get(); - (*exit_flag_ptr).store(1); - this.async.send(); + do this.exit_flag.with |should_exit| { + // NB: These two things need to happen atomically. Otherwise + // the event handler could wake up due to a *previous* + // signal and see the exit flag, destroying the handle + // before the final send. + *should_exit = true; + this.async.send(); + } } } } From 84d269592168b2e8ca9784ada5d86ea6cdb9de9f Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 10 Jun 2013 17:46:49 -0700 Subject: [PATCH 32/32] std::rt: Work around a dynamic borrowck bug --- src/libstd/rt/io/extensions.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/libstd/rt/io/extensions.rs b/src/libstd/rt/io/extensions.rs index fcbf31e87f2c0..ad9658e48ba1f 100644 --- a/src/libstd/rt/io/extensions.rs +++ b/src/libstd/rt/io/extensions.rs @@ -749,8 +749,6 @@ mod test { #[should_fail] #[ignore(cfg(windows))] fn push_bytes_fail_reset_len() { - use unstable::finally::Finally; - // push_bytes unsafely sets the vector length. This is testing that // upon failure the length is reset correctly. let mut reader = MockReader::new(); @@ -772,7 +770,8 @@ mod test { reader.push_bytes(&mut *buf, 4); }).finally { // NB: Using rtassert here to trigger abort on failure since this is a should_fail test - rtassert!(*buf == ~[8, 9, 10]); + // FIXME: #7049 This fails because buf is still borrowed + //rtassert!(*buf == ~[8, 9, 10]); } }