From 729d7ccd2d25b5cf94d9d194a7210272db70a19c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Fri, 4 May 2018 06:55:35 +0200 Subject: [PATCH 01/13] Add thread-local values which are preserved with jobs --- rayon-core/Cargo.toml | 4 ++++ rayon-core/src/job.rs | 14 ++++++++++++++ rayon-core/src/lib.rs | 3 +++ rayon-core/src/tlv.rs | 30 ++++++++++++++++++++++++++++++ 4 files changed, 51 insertions(+) create mode 100644 rayon-core/src/tlv.rs diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 14230aa2b..209ffe7ef 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -13,6 +13,10 @@ readme = "README.md" keywords = ["parallel", "thread", "concurrency", "join", "performance"] categories = ["concurrency"] +[features] +default = ["tlv"] +tlv = [] + [dependencies] num_cpus = "1.2" libc = "0.2.16" diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index 8ddc3371c..3acce305c 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -3,6 +3,8 @@ use std::any::Any; use std::cell::UnsafeCell; use std::mem; use unwind; +#[cfg(feature = "tlv")] +use tlv; pub enum JobResult { None, @@ -75,6 +77,8 @@ where pub latch: L, func: UnsafeCell>, result: UnsafeCell>, + #[cfg(feature = "tlv")] + tlv: usize, } impl StackJob @@ -88,6 +92,8 @@ where latch: latch, func: UnsafeCell::new(Some(func)), result: UnsafeCell::new(JobResult::None), + #[cfg(feature = "tlv")] + tlv: tlv::get(), } } @@ -112,6 +118,8 @@ where { unsafe fn execute(this: *const Self) { let this = &*this; + #[cfg(feature = "tlv")] + tlv::set(this.tlv); let abort = unwind::AbortIfPanic; let func = (*this.func.get()).take().unwrap(); (*this.result.get()) = match unwind::halt_unwinding(|| func(true)) { @@ -134,6 +142,8 @@ where BODY: FnOnce() + Send, { job: UnsafeCell>, + #[cfg(feature = "tlv")] + tlv: usize, } impl HeapJob @@ -143,6 +153,8 @@ where pub fn new(func: BODY) -> Self { HeapJob { job: UnsafeCell::new(Some(func)), + #[cfg(feature = "tlv")] + tlv: 
tlv::get(), } } @@ -161,6 +173,8 @@ where { unsafe fn execute(this: *const Self) { let this: Box = mem::transmute(this); + #[cfg(feature = "tlv")] + tlv::set(this.tlv); let job = (*this.job.get()).take().unwrap(); job(); } diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 3c713080d..08b5f49c7 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -57,6 +57,9 @@ mod util; mod compile_fail; mod test; +#[cfg(feature = "tlv")] +pub mod tlv; + #[cfg(rayon_unstable)] pub mod internal; pub use join::{join, join_context}; diff --git a/rayon-core/src/tlv.rs b/rayon-core/src/tlv.rs new file mode 100644 index 000000000..f035d12d9 --- /dev/null +++ b/rayon-core/src/tlv.rs @@ -0,0 +1,30 @@ +//! Allows access to the Rayon's thread local value +//! which is preserved when moving jobs across threads + +use std::cell::Cell; + +thread_local!(pub(crate) static TLV: Cell = Cell::new(0)); + +/// Sets the current thread-local value to `value` inside the closure. +/// The old value is restored when the closure ends +pub fn with R, R>(value: usize, f: F) -> R { + struct Reset(usize); + impl Drop for Reset { + fn drop(&mut self) { + TLV.with(|tlv| tlv.set(self.0)); + } + } + let _reset = Reset(get()); + TLV.with(|tlv| tlv.set(value)); + f() +} + +/// Sets the current thread-local value +pub fn set(value: usize) { + TLV.with(|tlv| tlv.set(value)); +} + +/// Returns the current thread-local value +pub fn get() -> usize { + TLV.with(|tlv| tlv.get()) +} From e58b9c3e3ebe6a2b72079f9c451d25ebd5d7d672 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Fri, 4 May 2018 06:57:58 +0200 Subject: [PATCH 02/13] Add a main_handler which is passed an argument to run the proper main loop --- rayon-core/src/lib.rs | 29 +++++++++++++++++++++++++++++ rayon-core/src/registry.rs | 21 +++++++++++++++++++-- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 08b5f49c7..3020868a1 100644 --- 
a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -144,6 +144,9 @@ pub struct ThreadPoolBuilder { /// Closure invoked on worker thread exit. exit_handler: Option>, + /// Closure invoked on worker thread start. + main_handler: Option>, + /// If false, worker threads will execute spawned jobs in a /// "depth-first" fashion. If true, they will do a "breadth-first" /// fashion. Depth-first is the default. @@ -173,6 +176,12 @@ type StartHandler = Fn(usize) + Send + Sync; /// Note that this same closure may be invoked multiple times in parallel. type ExitHandler = Fn(usize) + Send + Sync; +/// The type for a closure that gets invoked with a +/// function which runs rayon tasks. +/// The closure is passed the index of the thread on which it is invoked. +/// Note that this same closure may be invoked multiple times in parallel. +type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync; + impl ThreadPoolBuilder { /// Creates and returns a valid rayon thread pool builder, but does not initialize it. pub fn new() -> ThreadPoolBuilder { @@ -383,6 +392,23 @@ impl ThreadPoolBuilder { self.exit_handler = Some(Box::new(exit_handler)); self } + + /// Takes the current thread main callback, leaving `None`. + fn take_main_handler(&mut self) -> Option> { + self.main_handler.take() + } + + /// Set a callback to be invoked on thread main. + /// + /// The closure is passed the index of the thread on which it is invoked. + /// Note that this same closure may be invoked multiple times in parallel. + /// If this closure panics, the panic will be passed to the panic handler. 
+ pub fn main_handler(mut self, main_handler: H) -> ThreadPoolBuilder + where H: Fn(usize, &mut FnMut()) + Send + Sync + 'static + { + self.main_handler = Some(Box::new(main_handler)); + self + } } #[allow(deprecated)] @@ -500,6 +526,7 @@ impl fmt::Debug for ThreadPoolBuilder { ref panic_handler, ref stack_size, ref start_handler, + ref main_handler, ref exit_handler, ref breadth_first, } = *self; @@ -516,6 +543,7 @@ impl fmt::Debug for ThreadPoolBuilder { let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder); let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); + let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder); f.debug_struct("ThreadPoolBuilder") .field("num_threads", num_threads) @@ -524,6 +552,7 @@ impl fmt::Debug for ThreadPoolBuilder { .field("stack_size", &stack_size) .field("start_handler", &start_handler) .field("exit_handler", &exit_handler) + .field("main_handler", &main_handler) .field("breadth_first", &breadth_first) .finish() } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index a233768fb..bfa7eee8c 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -18,7 +18,8 @@ use std::thread; use std::usize; use unwind; use util::leak; -use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder}; +use {ErrorKind, ExitHandler, PanicHandler, StartHandler, + MainHandler, ThreadPoolBuildError, ThreadPoolBuilder}; pub struct Registry { thread_infos: Vec, @@ -28,6 +29,7 @@ pub struct Registry { panic_handler: Option>, start_handler: Option>, exit_handler: Option>, + main_handler: Option>, // When this latch reaches 0, it means that all work on this // registry must be complete. 
This is ensured in the following ways: @@ -117,6 +119,7 @@ impl Registry { terminate_latch: CountLatch::new(), panic_handler: builder.take_panic_handler(), start_handler: builder.take_start_handler(), + main_handler: builder.take_main_handler(), exit_handler: builder.take_exit_handler(), }); @@ -689,7 +692,21 @@ unsafe fn main_loop( } } - worker_thread.wait_until(®istry.terminate_latch); + let mut work = || { + worker_thread.wait_until(®istry.terminate_latch); + }; + + if let Some(ref handler) = registry.main_handler { + match unwind::halt_unwinding(|| handler(index, &mut work)) { + Ok(()) => { + } + Err(err) => { + registry.handle_panic(err); + } + } + } else { + work(); + } // Should not be any work left in our queue. debug_assert!(worker_thread.take_local_job().is_none()); From 9815d97db161c0226e8eba2ab69505f4961d0fee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Fri, 4 May 2018 06:59:33 +0200 Subject: [PATCH 03/13] Add the ability to create scoped thread pools --- rayon-core/src/registry.rs | 3 +-- rayon-core/src/thread_pool/mod.rs | 34 +++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index bfa7eee8c..96b514655 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -219,8 +219,7 @@ impl Registry { /// Waits for the worker threads to stop. This is used for testing /// -- so we can check that termination actually works. 
- #[cfg(test)] - pub fn wait_until_stopped(&self) { + pub(crate) fn wait_until_stopped(&self) { for info in &self.thread_infos { info.stopped.wait(); } diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index d13f4e5f9..690052a90 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -86,6 +86,40 @@ impl ThreadPool { &DEFAULT_THREAD_POOL } + /// Creates a scoped thread pool + pub fn scoped_pool(builder: ThreadPoolBuilder, + main_handler: H, + with_pool: F) -> Result + where F: FnOnce(&ThreadPool) -> R, + H: Fn(&mut FnMut()) + Send + Sync + { + struct Handler(*const ()); + unsafe impl Send for Handler {} + unsafe impl Sync for Handler {} + + let handler = Handler(&main_handler as *const _ as *const ()); + + let builder = builder.main_handler(move |_, worker| { + let handler = unsafe { &*(handler.0 as *const H) }; + handler(worker); + }); + + let pool = builder.build()?; + + struct JoinRegistry(Arc); + + impl Drop for JoinRegistry { + fn drop(&mut self) { + self.0.terminate(); + self.0.wait_until_stopped(); + } + } + + let _join_registry = JoinRegistry(pool.registry.clone()); + + Ok(with_pool(&pool)) + } + /// Executes `op` within the threadpool. Any attempts to use /// `join`, `scope`, or parallel iterators will then operate /// within that threadpool. 
From 046336e36dbea98006b11457e360b5fc5a436cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Fri, 4 May 2018 07:00:29 +0200 Subject: [PATCH 04/13] Add a WorkerLocal type which allow you to hold a value per Rayon worker thread --- rayon-core/src/lib.rs | 3 ++ rayon-core/src/registry.rs | 4 +- rayon-core/src/worker_local.rs | 74 ++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 rayon-core/src/worker_local.rs diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 3020868a1..541325599 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -50,6 +50,7 @@ mod registry; mod scope; mod sleep; mod spawn; +mod worker_local; mod thread_pool; mod unwind; mod util; @@ -65,6 +66,8 @@ pub mod internal; pub use join::{join, join_context}; pub use scope::{scope, Scope}; pub use spawn::spawn; +pub use worker_local::WorkerLocal; + pub use thread_pool::current_thread_has_pending_tasks; pub use thread_pool::current_thread_index; pub use thread_pool::ThreadPool; diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 96b514655..795662e75 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -480,7 +480,7 @@ pub struct WorkerThread { /// the "worker" half of our local deque worker: Deque, - index: usize, + pub(crate) index: usize, /// are these workers configured to steal breadth-first or not? breadth_first: bool, @@ -488,7 +488,7 @@ pub struct WorkerThread { /// A weak random number generator. 
rng: XorShift64Star, - registry: Arc, + pub(crate) registry: Arc, } // This is a bit sketchy, but basically: the WorkerThread is diff --git a/rayon-core/src/worker_local.rs b/rayon-core/src/worker_local.rs new file mode 100644 index 000000000..4b92bd939 --- /dev/null +++ b/rayon-core/src/worker_local.rs @@ -0,0 +1,74 @@ +use registry::{Registry, WorkerThread}; +use std::fmt; +use std::ops::Deref; +use std::sync::Arc; + +#[repr(align(64))] +#[derive(Debug)] +struct CacheAligned(T); + +/// Holds worker-locals values for each thread in a thread pool. +/// You can only access the worker local value through the Deref impl +/// on the thread pool it was constructed on. It will panic otherwise +pub struct WorkerLocal { + locals: Vec>, + registry: Arc, +} + +unsafe impl Send for WorkerLocal {} +unsafe impl Sync for WorkerLocal {} + +impl WorkerLocal { + /// Creates a new worker local where the `initial` closure computes the + /// value this worker local should take for each thread in the thread pool. 
+ #[inline] + pub fn new T>(mut initial: F) -> WorkerLocal { + let registry = Registry::current(); + WorkerLocal { + locals: (0..registry.num_threads()) + .map(|i| CacheAligned(initial(i))) + .collect(), + registry, + } + } + + /// Returns the worker-local value for each thread + #[inline] + pub fn into_inner(self) -> Vec { + self.locals.into_iter().map(|c| c.0).collect() + } + + fn current(&self) -> &T { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() + || &*(*worker_thread).registry as *const _ != &*self.registry as *const _ + { + panic!("WorkerLocal can only be used on the thread pool it was created on") + } + &self.locals[(*worker_thread).index].0 + } + } +} + +impl WorkerLocal> { + /// Joins the elements of all the worker locals into one Vec + pub fn join(self) -> Vec { + self.into_inner().into_iter().flat_map(|v| v).collect() + } +} + +impl fmt::Debug for WorkerLocal { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.locals, f) + } +} + +impl Deref for WorkerLocal { + type Target = T; + + #[inline(always)] + fn deref(&self) -> &T { + self.current() + } +} From 0f55a9956583f9b372d24bf012b781b9a5d56f7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Mon, 14 May 2018 02:58:44 +0200 Subject: [PATCH 05/13] Add deadlock detection --- rayon-core/src/lib.rs | 25 ++++++++++- rayon-core/src/registry.rs | 43 ++++++++++++++----- rayon-core/src/sleep/README.md | 33 +++++++++++++++ rayon-core/src/sleep/mod.rs | 76 ++++++++++++++++++++++++++++++---- 4 files changed, 156 insertions(+), 21 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 541325599..ea39ce35a 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -20,8 +20,6 @@ //! succeed. 
#![doc(html_root_url = "https://docs.rs/rayon-core/1.4")] -#![deny(missing_debug_implementations)] -#![deny(missing_docs)] use std::any::Any; use std::env; @@ -65,6 +63,7 @@ pub mod tlv; pub mod internal; pub use join::{join, join_context}; pub use scope::{scope, Scope}; +pub use registry::{Registry, mark_blocked, mark_unblocked}; pub use spawn::spawn; pub use worker_local::WorkerLocal; @@ -141,6 +140,9 @@ pub struct ThreadPoolBuilder { /// The stack size for the created worker threads stack_size: Option, + /// Closure invoked on deadlock. + deadlock_handler: Option>, + /// Closure invoked on worker thread start. start_handler: Option>, @@ -169,6 +171,9 @@ pub struct Configuration { /// may be invoked multiple times in parallel. type PanicHandler = Fn(Box) + Send + Sync; +/// The type for a closure that gets invoked when the Rayon thread pool deadlocks +type DeadlockHandler = Fn() + Send + Sync; + /// The type for a closure that gets invoked when a thread starts. The /// closure is passed the index of the thread on which it is invoked. /// Note that this same closure may be invoked multiple times in parallel. @@ -358,6 +363,19 @@ impl ThreadPoolBuilder { self.breadth_first } + /// Takes the current deadlock callback, leaving `None`. + fn take_deadlock_handler(&mut self) -> Option> { + self.deadlock_handler.take() + } + + /// Set a callback to be invoked on current deadlock. + pub fn deadlock_handler(mut self, deadlock_handler: H) -> ThreadPoolBuilder + where H: Fn() + Send + Sync + 'static + { + self.deadlock_handler = Some(Box::new(deadlock_handler)); + self + } + /// Takes the current thread start callback, leaving `None`. 
fn take_start_handler(&mut self) -> Option> { self.start_handler.take() @@ -528,6 +546,7 @@ impl fmt::Debug for ThreadPoolBuilder { ref get_thread_name, ref panic_handler, ref stack_size, + ref deadlock_handler, ref start_handler, ref main_handler, ref exit_handler, @@ -544,6 +563,7 @@ impl fmt::Debug for ThreadPoolBuilder { } let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder); let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder); + let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder); let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder); @@ -553,6 +573,7 @@ impl fmt::Debug for ThreadPoolBuilder { .field("get_thread_name", &get_thread_name) .field("panic_handler", &panic_handler) .field("stack_size", &stack_size) + .field("deadlock_handler", &deadlock_handler) .field("start_handler", &start_handler) .field("exit_handler", &exit_handler) .field("main_handler", &main_handler) diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 795662e75..d8cb8cf74 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -18,7 +18,7 @@ use std::thread; use std::usize; use unwind; use util::leak; -use {ErrorKind, ExitHandler, PanicHandler, StartHandler, +use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler, MainHandler, ThreadPoolBuildError, ThreadPoolBuilder}; pub struct Registry { @@ -27,6 +27,7 @@ pub struct Registry { sleep: Sleep, job_uninjector: Stealer, panic_handler: Option>, + deadlock_handler: Option>, start_handler: Option>, exit_handler: Option>, main_handler: Option>, @@ -114,10 +115,11 @@ impl Registry { let registry = Arc::new(Registry { thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(), state: Mutex::new(RegistryState::new(inj_worker)), - sleep: Sleep::new(), + 
sleep: Sleep::new(n_threads), job_uninjector: inj_stealer, terminate_latch: CountLatch::new(), panic_handler: builder.take_panic_handler(), + deadlock_handler: builder.take_deadlock_handler(), start_handler: builder.take_start_handler(), main_handler: builder.take_main_handler(), exit_handler: builder.take_exit_handler(), @@ -367,14 +369,11 @@ impl Registry { { // This thread isn't a member of *any* thread pool, so just block. debug_assert!(WorkerThread::current().is_null()); - let job = StackJob::new( - |injected| { - let worker_thread = WorkerThread::current(); - assert!(injected && !worker_thread.is_null()); - op(&*worker_thread, true) - }, - LockLatch::new(), - ); + let job = StackJob::new(|injected| { + let worker_thread = WorkerThread::current(); + assert!(injected && !worker_thread.is_null()); + op(&*worker_thread, true) + }, LockLatch::new()); self.inject(&[job.as_job_ref()]); job.latch.wait(); job.into_result() @@ -436,6 +435,24 @@ impl Registry { } } +/// Mark a Rayon worker thread as blocked. 
This triggers the deadlock handler +/// if no other worker thread is active +#[inline] +pub fn mark_blocked() { + let worker_thread = WorkerThread::current(); + assert!(!worker_thread.is_null()); + unsafe { + let registry = &(*worker_thread).registry; + registry.sleep.mark_blocked(®istry.deadlock_handler) + } +} + +/// Mark a previously blocked Rayon worker thread as unblocked +#[inline] +pub fn mark_unblocked(registry: &Registry) { + registry.sleep.mark_unblocked() +} + #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct RegistryId { addr: usize, @@ -594,7 +611,11 @@ impl WorkerThread { yields = self.registry.sleep.work_found(self.index, yields); self.execute(job); } else { - yields = self.registry.sleep.no_work_found(self.index, yields); + yields = self.registry.sleep.no_work_found( + self.index, + yields, + &self.registry.deadlock_handler + ); } } diff --git a/rayon-core/src/sleep/README.md b/rayon-core/src/sleep/README.md index bc2af869f..d889de5a5 100644 --- a/rayon-core/src/sleep/README.md +++ b/rayon-core/src/sleep/README.md @@ -386,3 +386,36 @@ some of them were hit hard: - 8-10% overhead on nbody-parreduce - 35% overhead on increment-all - 245% overhead on join-recursively + +# Deadlock detection + +This module tracks a number of variables in order to detect deadlocks due to user code blocking. +These variables are stored in the `SleepData` struct which itself is kept behind a mutex. +It contains the following fields: +- `worker_count` - The number of threads in the thread pool. +- `active_threads` - The number of threads in the thread pool which are running + and aren't blocked in user code or sleeping. +- `blocked_threads` - The number of threads which are blocked in user code. + This doesn't include threads blocked by Rayon. + +User code can indicate blocking by calling `mark_blocked` before blocking and +calling `mark_unblocked` before unblocking a thread. +This will adjust `active_threads` and `blocked_threads` accordingly. 
+ +When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to +`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not threads blocked +by user code. + +A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0. +If we ignored `blocked_threads` we would falsely detect a deadlock +immediately when creating the thread pool. +We would also falsely detect one once the thread pool ran out of work. +It is not possible for Rayon itself to deadlock. +Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks. + +We check for the deadlock condition when +threads are marked as blocked in `mark_blocked` and when they fall asleep in `Sleep::sleep`. +If there's a deadlock detected we call the user-provided deadlock handler while we hold the +lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and +`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread. +Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep. \ No newline at end of file diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs index 7582f98f7..5853b8a40 100644 --- a/rayon-core/src/sleep/mod.rs +++ b/rayon-core/src/sleep/mod.rs @@ -1,15 +1,39 @@ //! Code that decides when workers should go to sleep. See README.md //! for an overview. +use DeadlockHandler; use log::Event::*; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Condvar, Mutex}; use std::thread; use std::usize; +struct SleepData { + /// The number of threads in the thread pool. + worker_count: usize, + + /// The number of threads in the thread pool which are running and + /// aren't blocked in user code or sleeping. + active_threads: usize, + + /// The number of threads which are blocked in user code. + /// This doesn't include threads blocked by this module. 
+ blocked_threads: usize, +} + +impl SleepData { + /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler + #[inline] + pub fn deadlock_check(&self, deadlock_handler: &Option>) { + if self.active_threads == 0 && self.blocked_threads > 0 { + (deadlock_handler.as_ref().unwrap())(); + } + } +} + pub struct Sleep { state: AtomicUsize, - data: Mutex<()>, + data: Mutex, tickle: Condvar, } @@ -20,14 +44,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32; const ROUNDS_UNTIL_ASLEEP: usize = 64; impl Sleep { - pub fn new() -> Sleep { + pub fn new(worker_count: usize) -> Sleep { Sleep { state: AtomicUsize::new(AWAKE), - data: Mutex::new(()), + data: Mutex::new(SleepData { + worker_count, + active_threads: worker_count, + blocked_threads: 0, + }), tickle: Condvar::new(), } } + /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler + /// if no other worker thread is active + #[inline] + pub fn mark_blocked(&self, deadlock_handler: &Option>) { + let mut data = self.data.lock().unwrap(); + debug_assert!(data.active_threads > 0); + debug_assert!(data.blocked_threads < data.worker_count); + debug_assert!(data.active_threads > 0); + data.active_threads -= 1; + data.blocked_threads += 1; + + data.deadlock_check(deadlock_handler); + } + + /// Mark a previously blocked Rayon worker thread as unblocked + #[inline] + pub fn mark_unblocked(&self) { + let mut data = self.data.lock().unwrap(); + debug_assert!(data.active_threads < data.worker_count); + debug_assert!(data.blocked_threads > 0); + data.active_threads += 1; + data.blocked_threads -= 1; + } + fn anyone_sleeping(&self, state: usize) -> bool { state & SLEEPING != 0 } @@ -61,7 +113,7 @@ impl Sleep { } #[inline] - pub fn no_work_found(&self, worker_index: usize, yields: usize) -> usize { + pub fn no_work_found(&self, worker_index: usize, yields: usize, deadlock_handler: &Option>) -> usize { log!(DidNotFindWork { worker: worker_index, yields: yields, @@ -88,7 +140,7 @@ impl Sleep { } } 
else { debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP); - self.sleep(worker_index); + self.sleep(worker_index, deadlock_handler); 0 } } @@ -122,7 +174,10 @@ impl Sleep { old_state: old_state, }); if self.anyone_sleeping(old_state) { - let _data = self.data.lock().unwrap(); + let mut data = self.data.lock().unwrap(); + // Set the active threads to the number of workers, + // excluding threads blocked by the user since we won't wake those up + data.active_threads = data.worker_count - data.blocked_threads; self.tickle.notify_all(); } } @@ -188,7 +243,7 @@ impl Sleep { self.worker_is_sleepy(state, worker_index) } - fn sleep(&self, worker_index: usize) { + fn sleep(&self, worker_index: usize, deadlock_handler: &Option>) { loop { // Acquire here suffices. If we observe that the current worker is still // sleepy, then in fact we know that no writes have occurred, and anyhow @@ -235,7 +290,7 @@ impl Sleep { // reason for the `compare_exchange` to fail is if an // awaken comes, in which case the next cycle around // the loop will just return. 
- let data = self.data.lock().unwrap(); + let mut data = self.data.lock().unwrap(); // This must be SeqCst on success because we want to // ensure: @@ -264,6 +319,11 @@ impl Sleep { log!(FellAsleep { worker: worker_index }); + + // Decrement the number of active threads and check for a deadlock + data.active_threads -= 1; + data.deadlock_check(deadlock_handler); + let _ = self.tickle.wait(data).unwrap(); log!(GotAwoken { worker: worker_index From 86060c2a978b6828901068ebcf2d57aad3d462fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Tue, 18 Dec 2018 06:55:27 +0100 Subject: [PATCH 06/13] Add callbacks for when threads start and stop doing work --- rayon-core/src/lib.rs | 48 ++++++++++++++++++++++++++++++++++++- rayon-core/src/registry.rs | 32 +++++++++++++++++++++---- rayon-core/src/sleep/mod.rs | 21 ++++++++++++---- 3 files changed, 92 insertions(+), 9 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index ea39ce35a..f47fc83f9 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -152,6 +152,12 @@ pub struct ThreadPoolBuilder { /// Closure invoked on worker thread start. main_handler: Option>, + /// Closure invoked when starting computations in a thread. + acquire_thread_handler: Option>, + + /// Closure invoked when blocking in a thread. + release_thread_handler: Option>, + /// If false, worker threads will execute spawned jobs in a /// "depth-first" fashion. If true, they will do a "breadth-first" /// fashion. Depth-first is the default. @@ -190,6 +196,14 @@ type ExitHandler = Fn(usize) + Send + Sync; /// Note that this same closure may be invoked multiple times in parallel. type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync; +/// The type for a closure that gets invoked before starting computations in a thread. +/// Note that this same closure may be invoked multiple times in parallel. 
+type AcquireThreadHandler = Fn() + Send + Sync; + +/// The type for a closure that gets invoked before blocking in a thread. +/// Note that this same closure may be invoked multiple times in parallel. +type ReleaseThreadHandler = Fn() + Send + Sync; + impl ThreadPoolBuilder { /// Creates and returns a valid rayon thread pool builder, but does not initialize it. pub fn new() -> ThreadPoolBuilder { @@ -363,6 +377,32 @@ impl ThreadPoolBuilder { self.breadth_first } + /// Takes the current acquire thread callback, leaving `None`. + fn take_acquire_thread_handler(&mut self) -> Option> { + self.acquire_thread_handler.take() + } + + /// Set a callback to be invoked when starting computations in a thread. + pub fn acquire_thread_handler(mut self, acquire_thread_handler: H) -> ThreadPoolBuilder + where H: Fn() + Send + Sync + 'static + { + self.acquire_thread_handler = Some(Box::new(acquire_thread_handler)); + self + } + + /// Takes the current release thread callback, leaving `None`. + fn take_release_thread_handler(&mut self) -> Option> { + self.release_thread_handler.take() + } + + /// Set a callback to be invoked when blocking in thread. + pub fn release_thread_handler(mut self, release_thread_handler: H) -> ThreadPoolBuilder + where H: Fn() + Send + Sync + 'static + { + self.release_thread_handler = Some(Box::new(release_thread_handler)); + self + } + /// Takes the current deadlock callback, leaving `None`. 
fn take_deadlock_handler(&mut self) -> Option> { self.deadlock_handler.take() @@ -546,10 +586,12 @@ impl fmt::Debug for ThreadPoolBuilder { ref get_thread_name, ref panic_handler, ref stack_size, - ref deadlock_handler, + ref deadlock_handler, ref start_handler, ref main_handler, ref exit_handler, + ref acquire_thread_handler, + ref release_thread_handler, ref breadth_first, } = *self; @@ -567,6 +609,8 @@ impl fmt::Debug for ThreadPoolBuilder { let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder); + let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder); + let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder); f.debug_struct("ThreadPoolBuilder") .field("num_threads", num_threads) @@ -577,6 +621,8 @@ impl fmt::Debug for ThreadPoolBuilder { .field("start_handler", &start_handler) .field("exit_handler", &exit_handler) .field("main_handler", &main_handler) + .field("acquire_thread_handler", &acquire_thread_handler) + .field("release_thread_handler", &release_thread_handler) .field("breadth_first", &breadth_first) .finish() } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index d8cb8cf74..8d5d4e151 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -18,8 +18,8 @@ use std::thread; use std::usize; use unwind; use util::leak; -use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler, - MainHandler, ThreadPoolBuildError, ThreadPoolBuilder}; +use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler, AcquireThreadHandler, + MainHandler, ThreadPoolBuildError, ThreadPoolBuilder, ReleaseThreadHandler}; pub struct Registry { thread_infos: Vec, @@ -27,10 +27,12 @@ pub struct Registry { sleep: Sleep, job_uninjector: Stealer, panic_handler: Option>, - deadlock_handler: Option>, + 
pub(crate) deadlock_handler: Option>, start_handler: Option>, exit_handler: Option>, main_handler: Option>, + pub(crate) acquire_thread_handler: Option>, + pub(crate) release_thread_handler: Option>, // When this latch reaches 0, it means that all work on this // registry must be complete. This is ensured in the following ways: @@ -123,6 +125,8 @@ impl Registry { start_handler: builder.take_start_handler(), main_handler: builder.take_main_handler(), exit_handler: builder.take_exit_handler(), + acquire_thread_handler: builder.take_acquire_thread_handler(), + release_thread_handler: builder.take_release_thread_handler(), }); // If we return early or panic, make sure to terminate existing threads. @@ -222,9 +226,23 @@ impl Registry { /// Waits for the worker threads to stop. This is used for testing /// -- so we can check that termination actually works. pub(crate) fn wait_until_stopped(&self) { + self.release_thread(); for info in &self.thread_infos { info.stopped.wait(); } + self.acquire_thread(); + } + + pub(crate) fn acquire_thread(&self) { + if let Some(ref acquire_thread_handler) = self.acquire_thread_handler { + acquire_thread_handler(); + } + } + + pub(crate) fn release_thread(&self) { + if let Some(ref release_thread_handler) = self.release_thread_handler { + release_thread_handler(); + } } /// //////////////////////////////////////////////////////////////////////// @@ -375,7 +393,9 @@ impl Registry { op(&*worker_thread, true) }, LockLatch::new()); self.inject(&[job.as_job_ref()]); + self.release_thread(); job.latch.wait(); + self.acquire_thread(); job.into_result() } @@ -614,7 +634,7 @@ impl WorkerThread { yields = self.registry.sleep.no_work_found( self.index, yields, - &self.registry.deadlock_handler + &self.registry ); } } @@ -716,6 +736,8 @@ unsafe fn main_loop( worker_thread.wait_until(&registry.terminate_latch); }; + registry.acquire_thread(); + if let Some(ref handler) = registry.main_handler { match unwind::halt_unwinding(|| handler(index, &mut work)) {
Ok(()) => { @@ -748,6 +770,8 @@ unsafe fn main_loop( } // We're already exiting the thread, there's nothing else to do. } + + registry.release_thread(); } /// If already in a worker-thread, just execute `op`. Otherwise, diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs index 5853b8a40..4fda9bd8c 100644 --- a/rayon-core/src/sleep/mod.rs +++ b/rayon-core/src/sleep/mod.rs @@ -3,6 +3,7 @@ use DeadlockHandler; use log::Event::*; +use registry::Registry; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Condvar, Mutex}; use std::thread; @@ -113,7 +114,12 @@ impl Sleep { } #[inline] - pub fn no_work_found(&self, worker_index: usize, yields: usize, deadlock_handler: &Option>) -> usize { + pub fn no_work_found( + &self, + worker_index: usize, + yields: usize, + registry: &Registry, + ) -> usize { log!(DidNotFindWork { worker: worker_index, yields: yields, @@ -140,7 +146,7 @@ impl Sleep { } } else { debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP); - self.sleep(worker_index, deadlock_handler); + self.sleep(worker_index, registry); 0 } } @@ -243,7 +249,11 @@ impl Sleep { self.worker_is_sleepy(state, worker_index) } - fn sleep(&self, worker_index: usize, deadlock_handler: &Option>) { + fn sleep( + &self, + worker_index: usize, + registry: &Registry, + ) { loop { // Acquire here suffices. 
If we observe that the current worker is still // sleepy, then in fact we know that no writes have occurred, and anyhow @@ -322,12 +332,15 @@ impl Sleep { // Decrement the number of active threads and check for a deadlock data.active_threads -= 1; - data.deadlock_check(deadlock_handler); + data.deadlock_check(&registry.deadlock_handler); + + registry.release_thread(); let _ = self.tickle.wait(data).unwrap(); log!(GotAwoken { worker: worker_index }); + registry.acquire_thread(); return; } } else { From 51ee2fa7780573c4ad54a30da0bf88e16c75df14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Wed, 25 Apr 2018 23:34:16 +0200 Subject: [PATCH 07/13] Rename crates, update version and make clear this is not the real rayon crate --- Cargo.toml | 14 ++--- README.md | 126 +-------------------------------------- rayon-core/Cargo.toml | 11 ++-- rayon-core/README.md | 2 + rayon-demo/Cargo.toml | 2 +- rayon-futures/Cargo.toml | 3 +- src/lib.rs | 2 +- 7 files changed, 21 insertions(+), 139 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3bf64e859..1d8fc0e0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,13 @@ [package] -name = "rayon" +name = "rustc-rayon" # Reminder to update html_rool_url in lib.rs when updating version -version = "1.0.3" +version = "0.1.2" authors = ["Niko Matsakis ", "Josh Stone "] -description = "Simple work-stealing parallelism for Rust" +description = "Simple work-stealing parallelism for Rust - fork for rustc" license = "Apache-2.0/MIT" -repository = "https://github.com/rayon-rs/rayon" -documentation = "https://docs.rs/rayon/" +repository = "https://github.com/rust-lang/rustc-rayon" +documentation = "https://docs.rs/rustc-rayon/" readme = "README.md" keywords = ["parallel", "thread", "concurrency", "join", "performance"] categories = ["concurrency"] @@ -15,11 +15,11 @@ build = "build.rs" exclude = ["/ci/*", "/scripts/*", "/.travis.yml", "/appveyor.yml", "/bors.toml"] [workspace] -members = ["rayon-demo", "rayon-core",
"rayon-futures"] +members = ["rayon-core"] exclude = ["ci"] [dependencies] -rayon-core = { version = "1.4", path = "rayon-core" } +rustc-rayon-core = { version = "0.1", path = "rayon-core" } crossbeam-deque = "0.2.0" # This is a public dependency! diff --git a/README.md b/README.md index 9d820d914..6a0d54651 100644 --- a/README.md +++ b/README.md @@ -1,130 +1,10 @@ -# Rayon +# rustc-rayon -[![Rayon crate](https://img.shields.io/crates/v/rayon.svg)](https://crates.io/crates/rayon) -[![Rayon documentation](https://docs.rs/rayon/badge.svg)](https://docs.rs/rayon) -[![Travis Status](https://travis-ci.org/rayon-rs/rayon.svg?branch=master)](https://travis-ci.org/rayon-rs/rayon) -[![Appveyor status](https://ci.appveyor.com/api/projects/status/wre5dkx08gayy8hc/branch/master?svg=true)](https://ci.appveyor.com/project/cuviper/rayon/branch/master) -[![Join the chat at https://gitter.im/rayon-rs/Lobby](https://badges.gitter.im/rayon-rs/Lobby.svg)](https://gitter.im/rayon-rs/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) - -Rayon is a data-parallelism library for Rust. It is extremely -lightweight and makes it easy to convert a sequential computation into -a parallel one. It also guarantees data-race freedom. (You may also -enjoy [this blog post][blog] about Rayon, which gives more background -and details about how it works, or [this video][video], from the Rust -Belt Rust conference.) Rayon is -[available on crates.io](https://crates.io/crates/rayon), and -[API Documentation is available on docs.rs](https://docs.rs/rayon/). 
- -[blog]: http://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/ -[video]: https://www.youtube.com/watch?v=gof_OEv71Aw - -## Parallel iterators and more - -Rayon makes it drop-dead simple to convert sequential iterators into -parallel ones: usually, you just change your `foo.iter()` call into -`foo.par_iter()`, and Rayon does the rest: - -```rust -use rayon::prelude::*; -fn sum_of_squares(input: &[i32]) -> i32 { - input.par_iter() // <-- just change that! - .map(|&i| i * i) - .sum() -} -``` - -[Parallel iterators] take care of deciding how to divide your data -into tasks; it will dynamically adapt for maximum performance. If you -need more flexibility than that, Rayon also offers the [join] and -[scope] functions, which let you create parallel tasks on your own. -For even more control, you can create [custom threadpools] rather than -using Rayon's default, global threadpool. - -[Parallel iterators]: https://docs.rs/rayon/*/rayon/iter/index.html -[join]: https://docs.rs/rayon/*/rayon/fn.join.html -[scope]: https://docs.rs/rayon/*/rayon/fn.scope.html -[custom threadpools]: https://docs.rs/rayon/*/rayon/struct.ThreadPool.html - -## No data races - -You may have heard that parallel execution can produce all kinds of -crazy bugs. Well, rest easy. Rayon's APIs all guarantee **data-race -freedom**, which generally rules out most parallel bugs (though not -all). In other words, **if your code compiles**, it typically does the -same thing it did before. - -For the most, parallel iterators in particular are guaranteed to -produce the same results as their sequential counterparts. One caveat: -If your iterator has side effects (for example, sending methods to -other threads through a [Rust channel] or writing to disk), those side -effects may occur in a different order. Note also that, in some cases, -parallel iterators offer alternative versions of the sequential -iterator methods that can have higher performance. 
- -[Rust channel]: https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html - -## Using Rayon - -[Rayon is available on crates.io](https://crates.io/crates/rayon). The -recommended way to use it is to add a line into your Cargo.toml such -as: - -```toml -[dependencies] -rayon = "1.0" -``` - -and then add the following to to your `lib.rs`: - -```rust -extern crate rayon; -``` - -To use the Parallel Iterator APIs, a number of traits have to be in -scope. The easiest way to bring those things into scope is to use the -[Rayon prelude](https://docs.rs/rayon/*/rayon/prelude/index.html). In -each module where you would like to use the parallel iterator APIs, -just add: - -```rust -use rayon::prelude::*; -``` - -Rayon currently requires `rustc 1.13.0` or greater. - -## Contribution - -Rayon is an open source project! If you'd like to contribute to Rayon, check out [the list of "help wanted" issues](https://github.com/rayon-rs/rayon/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22). These are all (or should be) issues that are suitable for getting started, and they generally include a detailed set of instructions for what to do. Please ask questions if anything is unclear! Also, check out the [Guide to Development](https://github.com/rayon-rs/rayon/wiki/Guide-to-Development) page on the wiki. Note that all code submitted in PRs to Rayon is assumed to [be licensed under Rayon's dual MIT/Apache2 licensing](https://github.com/rayon-rs/rayon/blob/master/README.md#license). - -## Quick demo - -To see Rayon in action, check out the `rayon-demo` directory, which -includes a number of demos of code using Rayon. For example, run this -command to get a visualization of an nbody simulation. To see the -effect of using Rayon, press `s` to run sequentially and `p` to run in -parallel. - -``` -> cd rayon-demo -> cargo run --release -- nbody visualize -``` - -For more information on demos, try: - -``` -> cd rayon-demo -> cargo run --release -- --help -``` - -## Other questions? 
- -See [the Rayon FAQ][faq]. - -[faq]: https://github.com/rayon-rs/rayon/blob/master/FAQ.md +rustc-rayon is a fork of [the Rayon crate](https://github.com/rayon-rs/rayon/). It adds a few "in progress" features that rustc is using, mostly around deadlock detection. These features are not stable and should not be used by others -- though they may find their way into rayon proper at some point. In general, if you are not rustc, you should be using the real rayon crate, not rustc-rayon. =) ## License -Rayon is distributed under the terms of both the MIT license and the +rustc-rayon is a fork of rayon. rayon is distributed under the terms of both the MIT license and the Apache License (Version 2.0). See [LICENSE-APACHE](LICENSE-APACHE) and [LICENSE-MIT](LICENSE-MIT) for details. Opening a pull requests is assumed to signal agreement with these licensing terms. diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 209ffe7ef..3b4be430b 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -1,13 +1,12 @@ [package] -name = "rayon-core" -version = "1.4.1" # reminder to update html_root_url attribute +name = "rustc-rayon-core" +version = "0.1.2" # reminder to update html_root_url attribute authors = ["Niko Matsakis ", "Josh Stone "] -description = "Core APIs for Rayon" +description = "Core APIs for Rayon - fork for rustc" license = "Apache-2.0/MIT" -repository = "https://github.com/rayon-rs/rayon" -documentation = "https://docs.rs/rayon/" -links = "rayon-core" +repository = "https://github.com/rust-lang/rustc-rayon" +documentation = "https://docs.rs/rustc-rayon-core/" build = "build.rs" readme = "README.md" keywords = ["parallel", "thread", "concurrency", "join", "performance"] diff --git a/rayon-core/README.md b/rayon-core/README.md index 8267d954a..0d709b8c9 100644 --- a/rayon-core/README.md +++ b/rayon-core/README.md @@ -1,3 +1,5 @@ +Note: This is an unstable fork made for use in rustc + Rayon-core represents the "core, stable" APIs of Rayon: join, 
scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool. Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirror in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join. diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml index 897b5c8a6..15a7697e0 100644 --- a/rayon-demo/Cargo.toml +++ b/rayon-demo/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Niko Matsakis "] publish = false [dependencies] -rayon = { path = "../" } +rustc-rayon = { path = "../" } cgmath = "0.16" docopt = "1" fixedbitset = "0.1.5" diff --git a/rayon-futures/Cargo.toml b/rayon-futures/Cargo.toml index 4522b2e0b..821f1a623 100644 --- a/rayon-futures/Cargo.toml +++ b/rayon-futures/Cargo.toml @@ -10,7 +10,8 @@ documentation = "https://docs.rs/rayon-futures/" readme = "README.md" keywords = ["parallel", "thread", "concurrency", "join", "performance"] categories = ["concurrency"] +publish = false [dependencies] -rayon-core = { version = "1.3", path = "../rayon-core" } +rustc-rayon-core = { version = "0.1", path = "../rayon-core" } futures = "0.1.16" diff --git a/src/lib.rs b/src/lib.rs index 1644ff8ed..e41ae0128 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,7 @@ extern crate crossbeam_deque; extern crate either; -extern crate rayon_core; +extern crate rustc_rayon_core as rayon_core; #[cfg(test)] extern crate rand; From 99e87af4e584d56a2e51581cd7014d947beb1c85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Wed, 2 Jan 2019 14:35:12 +0100 Subject: [PATCH 08/13] Add a way to create scopes with a single lifetime --- rayon-core/src/lib.rs | 2 +- rayon-core/src/scope/mod.rs | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index f47fc83f9..ab8529354 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -62,7 +62,7 
@@ pub mod tlv; #[cfg(rayon_unstable)] pub mod internal; pub use join::{join, join_context}; -pub use scope::{scope, Scope}; +pub use scope::{scope, Scope, ScopeBuilder}; pub use registry::{Registry, mark_blocked, mark_unblocked}; pub use spawn::spawn; pub use worker_local::WorkerLocal; diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 47ced9f91..a2779a374 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -21,6 +21,40 @@ mod internal; #[cfg(test)] mod test; +pub struct ScopeBuilder<'scope> { + scope: Option>, +} + +impl<'scope> ScopeBuilder<'scope> { + pub fn new() -> Self { + Self { + scope: None, + } + } + + pub fn scope(&'scope mut self, op: OP) -> R + where + OP: FnOnce(&'scope Scope<'scope>) -> R + 'scope + Send, + R: Send, + { + in_worker(move |owner_thread, _| { + unsafe { + self.scope = Some(Scope { + owner_thread_index: owner_thread.index(), + registry: owner_thread.registry().clone(), + panic: AtomicPtr::new(ptr::null_mut()), + job_completed_latch: CountLatch::new(), + marker: PhantomData, + }); + let scope = self.scope.as_ref().unwrap(); + let result = scope.execute_job_closure(move |_| op(scope)); + scope.steal_till_jobs_complete(owner_thread); + result.unwrap() // only None if `op` panicked, and that would have been propagated + } + }) + } +} + ///Represents a fork-join scope which can be used to spawn any number of tasks. See [`scope()`] for more information. /// ///[`scope()`]: fn.scope.html @@ -44,7 +78,7 @@ pub struct Scope<'scope> { /// all of which outlive `'scope`. They're not actually required to be /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because /// the closures are only *moved* across threads to be executed. 
- marker: PhantomData) + Send + Sync + 'scope>>, + marker: PhantomData<&'scope ()>, } /// Create a "fork-join" scope `s` and invokes the closure with a From fdc63f85dfbe12849995ca11daf581d5357ae6f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Fri, 12 Apr 2019 08:36:09 +0200 Subject: [PATCH 09/13] Remove Send bounds from ScopeBuilder --- rayon-core/src/scope/mod.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index a2779a374..6cb7ec601 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -34,24 +34,23 @@ impl<'scope> ScopeBuilder<'scope> { pub fn scope(&'scope mut self, op: OP) -> R where - OP: FnOnce(&'scope Scope<'scope>) -> R + 'scope + Send, - R: Send, + OP: FnOnce(&'scope Scope<'scope>) -> R + 'scope, { - in_worker(move |owner_thread, _| { - unsafe { - self.scope = Some(Scope { - owner_thread_index: owner_thread.index(), - registry: owner_thread.registry().clone(), - panic: AtomicPtr::new(ptr::null_mut()), - job_completed_latch: CountLatch::new(), - marker: PhantomData, - }); - let scope = self.scope.as_ref().unwrap(); - let result = scope.execute_job_closure(move |_| op(scope)); + unsafe { + self.scope = Some(Scope { + owner_thread_index: 0, + registry: Registry::current(), + panic: AtomicPtr::new(ptr::null_mut()), + job_completed_latch: CountLatch::new(), + marker: PhantomData, + }); + let scope = self.scope.as_ref().unwrap(); + let result = scope.execute_job_closure(move |_| op(scope)); + in_worker(move |owner_thread, _| { scope.steal_till_jobs_complete(owner_thread); - result.unwrap() // only None if `op` panicked, and that would have been propagated - } - }) + }); + result.unwrap() // only None if `op` panicked, and that would have been propagated + } } } From 42f3f9e760297781aff3b18ce4bdf7d5c10527e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Sat, 13 Apr 
2019 09:53:03 +0200 Subject: [PATCH 10/13] Fix bug with TLV and scopes --- rayon-core/Cargo.toml | 4 ---- rayon-core/src/job.rs | 11 ++--------- rayon-core/src/lib.rs | 1 - rayon-core/src/scope/mod.rs | 11 ++++++++++- rayon-core/src/spawn/mod.rs | 2 +- 5 files changed, 13 insertions(+), 16 deletions(-) diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 3b4be430b..3c23d6970 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -12,10 +12,6 @@ readme = "README.md" keywords = ["parallel", "thread", "concurrency", "join", "performance"] categories = ["concurrency"] -[features] -default = ["tlv"] -tlv = [] - [dependencies] num_cpus = "1.2" libc = "0.2.16" diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index 3acce305c..260bdb865 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -3,7 +3,6 @@ use std::any::Any; use std::cell::UnsafeCell; use std::mem; use unwind; -#[cfg(feature = "tlv")] use tlv; pub enum JobResult { @@ -77,7 +76,6 @@ where pub latch: L, func: UnsafeCell>, result: UnsafeCell>, - #[cfg(feature = "tlv")] tlv: usize, } @@ -92,7 +90,6 @@ where latch: latch, func: UnsafeCell::new(Some(func)), result: UnsafeCell::new(JobResult::None), - #[cfg(feature = "tlv")] tlv: tlv::get(), } } @@ -118,7 +115,6 @@ where { unsafe fn execute(this: *const Self) { let this = &*this; - #[cfg(feature = "tlv")] tlv::set(this.tlv); let abort = unwind::AbortIfPanic; let func = (*this.func.get()).take().unwrap(); @@ -142,7 +138,6 @@ where BODY: FnOnce() + Send, { job: UnsafeCell>, - #[cfg(feature = "tlv")] tlv: usize, } @@ -150,11 +145,10 @@ impl HeapJob where BODY: FnOnce() + Send, { - pub fn new(func: BODY) -> Self { + pub fn new(tlv: usize, func: BODY) -> Self { HeapJob { job: UnsafeCell::new(Some(func)), - #[cfg(feature = "tlv")] - tlv: tlv::get(), + tlv, } } @@ -173,7 +167,6 @@ where { unsafe fn execute(this: *const Self) { let this: Box = mem::transmute(this); - #[cfg(feature = "tlv")] tlv::set(this.tlv); let job = 
(*this.job.get()).take().unwrap(); job(); diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index ab8529354..b7d8ed758 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -56,7 +56,6 @@ mod util; mod compile_fail; mod test; -#[cfg(feature = "tlv")] pub mod tlv; #[cfg(rayon_unstable)] diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 6cb7ec601..5e35fa5bb 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -16,6 +16,7 @@ use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; use unwind; +use tlv; mod internal; #[cfg(test)] @@ -43,6 +44,7 @@ impl<'scope> ScopeBuilder<'scope> { panic: AtomicPtr::new(ptr::null_mut()), job_completed_latch: CountLatch::new(), marker: PhantomData, + tlv: tlv::get(), }); let scope = self.scope.as_ref().unwrap(); let result = scope.execute_job_closure(move |_| op(scope)); @@ -78,6 +80,9 @@ pub struct Scope<'scope> { /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because /// the closures are only *moved* across threads to be executed. marker: PhantomData<&'scope ()>, + + /// The TLV at the scope's creation. Used to set the TLV for spawned jobs. 
+ tlv: usize, } /// Create a "fork-join" scope `s` and invokes the closure with a @@ -302,6 +307,7 @@ where panic: AtomicPtr::new(ptr::null_mut()), job_completed_latch: CountLatch::new(), marker: PhantomData, + tlv: tlv::get(), }; let result = scope.execute_job_closure(op); scope.steal_till_jobs_complete(owner_thread); @@ -369,7 +375,10 @@ impl<'scope> Scope<'scope> { { unsafe { self.job_completed_latch.increment(); - let job_ref = Box::new(HeapJob::new(move || self.execute_job(body))).as_job_ref(); + let job_ref = Box::new(HeapJob::new( + self.tlv, + move || self.execute_job(body), + )).as_job_ref(); // Since `Scope` implements `Sync`, we can't be sure // that we're still in a thread of this pool, so we diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs index 8dcd26693..72434d7f3 100644 --- a/rayon-core/src/spawn/mod.rs +++ b/rayon-core/src/spawn/mod.rs @@ -68,7 +68,7 @@ where // executed. This ref is decremented at the (*) below. registry.increment_terminate_count(); - let async_job = Box::new(HeapJob::new({ + let async_job = Box::new(HeapJob::new(0, { let registry = registry.clone(); move || { match unwind::halt_unwinding(func) { From 12b4cab3dc34543b2509b31c910c912939f5b36f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Sat, 13 Apr 2019 10:34:02 +0200 Subject: [PATCH 11/13] Restore TLV when needed --- rayon-core/src/job.rs | 4 ++-- rayon-core/src/join/mod.rs | 17 ++++++++++++++++- rayon-core/src/registry.rs | 3 ++- rayon-core/src/scope/mod.rs | 6 ++++++ 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index 260bdb865..4f71be98c 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -85,12 +85,12 @@ where F: FnOnce(bool) -> R + Send, R: Send, { - pub fn new(func: F, latch: L) -> StackJob { + pub fn new(tlv: usize, func: F, latch: L) -> StackJob { StackJob { latch: latch, func: UnsafeCell::new(Some(func)), result: 
UnsafeCell::new(JobResult::None), - tlv: tlv::get(), + tlv, } } diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs index 4498bdea3..f73da0eb2 100644 --- a/rayon-core/src/join/mod.rs +++ b/rayon-core/src/join/mod.rs @@ -4,6 +4,7 @@ use log::Event::*; use registry::{self, WorkerThread}; use std::any::Any; use unwind; +use tlv; use FnContext; @@ -120,10 +121,12 @@ where worker: worker_thread.index() }); + let tlv = tlv::get(); // Create virtual wrapper for task b; this all has to be // done here so that the stack frame can keep it all live // long enough. let job_b = StackJob::new( + tlv, |migrated| oper_b(FnContext::new(migrated)), SpinLatch::new(), ); @@ -134,7 +137,7 @@ where let status_a = unwind::halt_unwinding(move || oper_a(FnContext::new(injected))); let result_a = match status_a { Ok(v) => v, - Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err), + Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv), }; // Now that task A has finished, try to pop job B from the @@ -151,7 +154,11 @@ where log!(PoppedRhs { worker: worker_thread.index() }); + // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. + tlv::set(tlv); + let result_b = job_b.run_inline(injected); + return (result_a, result_b); } else { log!(PoppedJob { @@ -171,6 +178,9 @@ where } } + // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. + tlv::set(tlv); + return (result_a, job_b.into_result()); }) } @@ -183,7 +193,12 @@ unsafe fn join_recover_from_panic( worker_thread: &WorkerThread, job_b_latch: &SpinLatch, err: Box, + tlv: usize, ) -> ! { worker_thread.wait_until(job_b_latch); + + // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. 
+ tlv::set(tlv); + unwind::resume_unwinding(err) } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 8d5d4e151..e8f400f40 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -387,7 +387,7 @@ impl Registry { { // This thread isn't a member of *any* thread pool, so just block. debug_assert!(WorkerThread::current().is_null()); - let job = StackJob::new(|injected| { + let job = StackJob::new(0, |injected| { let worker_thread = WorkerThread::current(); assert!(injected && !worker_thread.is_null()); op(&*worker_thread, true) @@ -410,6 +410,7 @@ impl Registry { debug_assert!(current_thread.registry().id() != self.id()); let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep); let job = StackJob::new( + 0, |injected| { let worker_thread = WorkerThread::current(); assert!(injected && !worker_thread.is_null()); diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 5e35fa5bb..13089247e 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -51,6 +51,8 @@ impl<'scope> ScopeBuilder<'scope> { in_worker(move |owner_thread, _| { scope.steal_till_jobs_complete(owner_thread); }); + // Restore the TLV if we ran some jobs while waiting + tlv::set(scope.tlv); result.unwrap() // only None if `op` panicked, and that would have been propagated } } @@ -311,6 +313,8 @@ where }; let result = scope.execute_job_closure(op); scope.steal_till_jobs_complete(owner_thread); + // Restore the TLV if we ran some jobs while waiting + tlv::set(scope.tlv); result.unwrap() // only None if `op` panicked, and that would have been propagated } }) @@ -460,6 +464,8 @@ impl<'scope> Scope<'scope> { log!(ScopeCompletePanicked { owner_thread: owner_thread.index() }); + // Restore the TLV if we ran some jobs while waiting + tlv::set(self.tlv); let value: Box> = mem::transmute(panic); unwind::resume_unwinding(*value); } else { From b6f36bdb56d85799115f86c097a64c8ffc0873d9 Mon Sep 17 00:00:00 2001 From:
=?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Mon, 15 Apr 2019 21:28:39 +0200 Subject: [PATCH 12/13] Fix Scope lifetime variance --- rayon-core/src/scope/mod.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 13089247e..d7050ebd6 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -33,6 +33,9 @@ impl<'scope> ScopeBuilder<'scope> { } } + // FIXME: &'scope Scope<'scope> means that we can spawn scopes during `scope` and after it. + // and before it. + // FIXME: Scopes spawned outside this function must not have access to `tlv` pub fn scope(&'scope mut self, op: OP) -> R where OP: FnOnce(&'scope Scope<'scope>) -> R + 'scope, @@ -77,11 +80,9 @@ pub struct Scope<'scope> { /// latch to set when the counter drops to zero (and hence this scope is complete) job_completed_latch: CountLatch, - /// You can think of a scope as containing a list of closures to execute, - /// all of which outlive `'scope`. They're not actually required to be - /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because - /// the closures are only *moved* across threads to be executed. - marker: PhantomData<&'scope ()>, + /// This ensures 'scope is invariant, which is required for safety, + /// since otherwise you could shrink it as spawn closures which reference shorter data. + marker: PhantomData<&'scope mut &'scope ()>, /// The TLV at the scope's creation. Used to set the TLV for spawned jobs. 
tlv: usize, From bcccb0209f1177c4ab1fc5903d19ffbdcbf5cb38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Mon, 22 Apr 2019 15:26:05 +0200 Subject: [PATCH 13/13] Bump version --- Cargo.toml | 2 +- rayon-core/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1d8fc0e0f..9613cceaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "rustc-rayon" # Reminder to update html_rool_url in lib.rs when updating version -version = "0.1.2" +version = "0.1.3" authors = ["Niko Matsakis ", "Josh Stone "] description = "Simple work-stealing parallelism for Rust - fork for rustc" diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 3c23d6970..7b90c3388 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustc-rayon-core" -version = "0.1.2" # reminder to update html_root_url attribute +version = "0.1.3" # reminder to update html_root_url attribute authors = ["Niko Matsakis ", "Josh Stone "] description = "Core APIs for Rayon - fork for rustc"