diff --git a/Cargo.toml b/Cargo.toml
index 3bf64e859..9613cceaa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,13 +1,13 @@
 [package]
-name = "rayon"
+name = "rustc-rayon"
 # Reminder to update html_root_url in lib.rs when updating version
-version = "1.0.3"
+version = "0.1.3"
 authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
-description = "Simple work-stealing parallelism for Rust"
+description = "Simple work-stealing parallelism for Rust - fork for rustc"
 license = "Apache-2.0/MIT"
-repository = "https://github.com/rayon-rs/rayon"
-documentation = "https://docs.rs/rayon/"
+repository = "https://github.com/rust-lang/rustc-rayon"
+documentation = "https://docs.rs/rustc-rayon/"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
 categories = ["concurrency"]
@@ -15,11 +15,11 @@ build = "build.rs"
 exclude = ["/ci/*", "/scripts/*", "/.travis.yml", "/appveyor.yml", "/bors.toml"]
 
 [workspace]
-members = ["rayon-demo", "rayon-core", "rayon-futures"]
+members = ["rayon-core"]
 exclude = ["ci"]
 
 [dependencies]
-rayon-core = { version = "1.4", path = "rayon-core" }
+rustc-rayon-core = { version = "0.1", path = "rayon-core" }
 crossbeam-deque = "0.2.0"
 
 # This is a public dependency!
diff --git a/README.md b/README.md
index 9d820d914..6a0d54651 100644
--- a/README.md
+++ b/README.md
@@ -1,130 +1,10 @@
-# Rayon
+# rustc-rayon
 
-[![Rayon crate](https://img.shields.io/crates/v/rayon.svg)](https://crates.io/crates/rayon)
-[![Rayon documentation](https://docs.rs/rayon/badge.svg)](https://docs.rs/rayon)
-[![Travis Status](https://travis-ci.org/rayon-rs/rayon.svg?branch=master)](https://travis-ci.org/rayon-rs/rayon)
-[![Appveyor status](https://ci.appveyor.com/api/projects/status/wre5dkx08gayy8hc/branch/master?svg=true)](https://ci.appveyor.com/project/cuviper/rayon/branch/master)
-[![Join the chat at https://gitter.im/rayon-rs/Lobby](https://badges.gitter.im/rayon-rs/Lobby.svg)](https://gitter.im/rayon-rs/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
-
-Rayon is a data-parallelism library for Rust. It is extremely
-lightweight and makes it easy to convert a sequential computation into
-a parallel one. It also guarantees data-race freedom. (You may also
-enjoy [this blog post][blog] about Rayon, which gives more background
-and details about how it works, or [this video][video], from the Rust
-Belt Rust conference.) Rayon is
-[available on crates.io](https://crates.io/crates/rayon), and
-[API Documentation is available on docs.rs](https://docs.rs/rayon/).
-
-[blog]: http://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/
-[video]: https://www.youtube.com/watch?v=gof_OEv71Aw
-
-## Parallel iterators and more
-
-Rayon makes it drop-dead simple to convert sequential iterators into
-parallel ones: usually, you just change your `foo.iter()` call into
-`foo.par_iter()`, and Rayon does the rest:
-
-```rust
-use rayon::prelude::*;
-fn sum_of_squares(input: &[i32]) -> i32 {
-    input.par_iter() // <-- just change that!
-         .map(|&i| i * i)
-         .sum()
-}
-```
-
-[Parallel iterators] take care of deciding how to divide your data
-into tasks; they will dynamically adapt for maximum performance. If you
-need more flexibility than that, Rayon also offers the [join] and
-[scope] functions, which let you create parallel tasks on your own.
-For even more control, you can create [custom threadpools] rather than
-using Rayon's default, global threadpool.
-
-[Parallel iterators]: https://docs.rs/rayon/*/rayon/iter/index.html
-[join]: https://docs.rs/rayon/*/rayon/fn.join.html
-[scope]: https://docs.rs/rayon/*/rayon/fn.scope.html
-[custom threadpools]: https://docs.rs/rayon/*/rayon/struct.ThreadPool.html
-
-## No data races
-
-You may have heard that parallel execution can produce all kinds of
-crazy bugs. Well, rest easy. Rayon's APIs all guarantee **data-race
-freedom**, which generally rules out most parallel bugs (though not
-all). In other words, **if your code compiles**, it typically does the
-same thing it did before.
-
-For the most part, parallel iterators in particular are guaranteed to
-produce the same results as their sequential counterparts. One caveat:
-If your iterator has side effects (for example, sending messages to
-other threads through a [Rust channel] or writing to disk), those side
-effects may occur in a different order. Note also that, in some cases,
-parallel iterators offer alternative versions of the sequential
-iterator methods that can have higher performance.
-
-[Rust channel]: https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html
-
-## Using Rayon
-
-[Rayon is available on crates.io](https://crates.io/crates/rayon). The
-recommended way to use it is to add a line into your Cargo.toml such
-as:
-
-```toml
-[dependencies]
-rayon = "1.0"
-```
-
-and then add the following to your `lib.rs`:
-
-```rust
-extern crate rayon;
-```
-
-To use the Parallel Iterator APIs, a number of traits have to be in
-scope. The easiest way to bring those things into scope is to use the
-[Rayon prelude](https://docs.rs/rayon/*/rayon/prelude/index.html). In
-each module where you would like to use the parallel iterator APIs,
-just add:
-
-```rust
-use rayon::prelude::*;
-```
-
-Rayon currently requires `rustc 1.13.0` or greater.
-
-## Contribution
-
-Rayon is an open source project! If you'd like to contribute to Rayon,
-check out [the list of "help wanted" issues](https://github.com/rayon-rs/rayon/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22).
-These are all (or should be) issues that are suitable for getting
-started, and they generally include a detailed set of instructions for
-what to do. Please ask questions if anything is unclear! Also, check
-out the [Guide to Development](https://github.com/rayon-rs/rayon/wiki/Guide-to-Development)
-page on the wiki. Note that all code submitted in PRs to Rayon is
-assumed to [be licensed under Rayon's dual MIT/Apache2 licensing](https://github.com/rayon-rs/rayon/blob/master/README.md#license).
-
-## Quick demo
-
-To see Rayon in action, check out the `rayon-demo` directory, which
-includes a number of demos of code using Rayon. For example, run this
-command to get a visualization of an nbody simulation. To see the
-effect of using Rayon, press `s` to run sequentially and `p` to run in
-parallel.
-
-```
-> cd rayon-demo
-> cargo run --release -- nbody visualize
-```
-
-For more information on demos, try:
-
-```
-> cd rayon-demo
-> cargo run --release -- --help
-```
-
-## Other questions?
-
-See [the Rayon FAQ][faq].
-
-[faq]: https://github.com/rayon-rs/rayon/blob/master/FAQ.md
+rustc-rayon is a fork of [the Rayon crate](https://github.com/rayon-rs/rayon/). It
+adds a few "in progress" features that rustc is using, mostly around deadlock
+detection. These features are not stable and should not be used by others --
+though they may find their way into rayon proper at some point. In general, if
+you are not rustc, you should be using the real rayon crate, not rustc-rayon. =)
 
 ## License
 
-Rayon is distributed under the terms of both the MIT license and the
+rustc-rayon is a fork of rayon. rayon is distributed under the terms of both the MIT license and the
 Apache License (Version 2.0). See [LICENSE-APACHE](LICENSE-APACHE) and
 [LICENSE-MIT](LICENSE-MIT) for details. Opening a pull request is
 assumed to signal agreement with these licensing terms.
diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml
index 14230aa2b..7b90c3388 100644
--- a/rayon-core/Cargo.toml
+++ b/rayon-core/Cargo.toml
@@ -1,13 +1,12 @@
 [package]
-name = "rayon-core"
-version = "1.4.1" # reminder to update html_root_url attribute
+name = "rustc-rayon-core"
+version = "0.1.3" # reminder to update html_root_url attribute
 authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
-description = "Core APIs for Rayon"
+description = "Core APIs for Rayon - fork for rustc"
 license = "Apache-2.0/MIT"
-repository = "https://github.com/rayon-rs/rayon"
-documentation = "https://docs.rs/rayon/"
-links = "rayon-core"
+repository = "https://github.com/rust-lang/rustc-rayon"
+documentation = "https://docs.rs/rustc-rayon-core/"
 build = "build.rs"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
diff --git a/rayon-core/README.md b/rayon-core/README.md
index 8267d954a..0d709b8c9 100644
--- a/rayon-core/README.md
+++ b/rayon-core/README.md
@@ -1,3 +1,5 @@
+Note: This is an unstable fork made for use in rustc
+
 Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool.
 
 Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirrored in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join.
diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs
index 8ddc3371c..4f71be98c 100644
--- a/rayon-core/src/job.rs
+++ b/rayon-core/src/job.rs
@@ -3,6 +3,7 @@ use std::any::Any;
 use std::cell::UnsafeCell;
 use std::mem;
 use unwind;
+use tlv;
 
 pub enum JobResult<T> {
     None,
@@ -75,6 +76,7 @@ where
     pub latch: L,
     func: UnsafeCell<Option<F>>,
     result: UnsafeCell<JobResult<R>>,
+    tlv: usize,
 }
 
 impl<L, F, R> StackJob<L, F, R>
@@ -83,11 +85,12 @@ where
     F: FnOnce(bool) -> R + Send,
     R: Send,
 {
-    pub fn new(func: F, latch: L) -> StackJob<L, F, R> {
+    pub fn new(tlv: usize, func: F, latch: L) -> StackJob<L, F, R> {
         StackJob {
             latch: latch,
             func: UnsafeCell::new(Some(func)),
             result: UnsafeCell::new(JobResult::None),
+            tlv,
         }
     }
 
@@ -112,6 +115,7 @@ where
 {
     unsafe fn execute(this: *const Self) {
         let this = &*this;
+        tlv::set(this.tlv);
         let abort = unwind::AbortIfPanic;
         let func = (*this.func.get()).take().unwrap();
         (*this.result.get()) = match unwind::halt_unwinding(|| func(true)) {
@@ -134,15 +138,17 @@ where
     BODY: FnOnce() + Send,
 {
     job: UnsafeCell<Option<BODY>>,
+    tlv: usize,
 }
 
 impl<BODY> HeapJob<BODY>
 where
     BODY: FnOnce() + Send,
 {
-    pub fn new(func: BODY) -> Self {
+    pub fn new(tlv: usize, func: BODY) -> Self {
         HeapJob {
             job: UnsafeCell::new(Some(func)),
+            tlv,
         }
     }
 
@@ -161,6 +167,7 @@ where
 {
     unsafe fn execute(this: *const Self) {
         let this: Box<Self> = mem::transmute(this);
+        tlv::set(this.tlv);
         let job = (*this.job.get()).take().unwrap();
         job();
     }
diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs
index 4498bdea3..f73da0eb2 100644
--- a/rayon-core/src/join/mod.rs
+++ b/rayon-core/src/join/mod.rs
@@ -4,6 +4,7 @@ use log::Event::*;
 use registry::{self, WorkerThread};
 use std::any::Any;
 use unwind;
+use tlv;
 
 use FnContext;
 
@@ -120,10 +121,12 @@ where
             worker: worker_thread.index()
         });
 
+        let tlv = tlv::get();
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
         let job_b = StackJob::new(
+            tlv,
             |migrated| oper_b(FnContext::new(migrated)),
             SpinLatch::new(),
         );
@@ -134,7 +137,7 @@ where
         let status_a = unwind::halt_unwinding(move || oper_a(FnContext::new(injected)));
         let result_a = match status_a {
             Ok(v) => v,
-            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
+            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv),
         };
 
         // Now that task A has finished, try to pop job B from the
@@ -151,7 +154,11 @@ where
                 log!(PoppedRhs {
                     worker: worker_thread.index()
                 });
+
+                // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+                tlv::set(tlv);
+
                 let result_b = job_b.run_inline(injected);
+
                 return (result_a, result_b);
             } else {
                 log!(PoppedJob {
@@ -171,6 +178,9 @@ where
             }
         }
 
+        // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+        tlv::set(tlv);
+
         return (result_a, job_b.into_result());
     })
 }
@@ -183,7 +193,12 @@ unsafe fn join_recover_from_panic(
     worker_thread: &WorkerThread,
     job_b_latch: &SpinLatch,
     err: Box<Any + Send>,
+    tlv: usize,
 ) -> ! {
     worker_thread.wait_until(job_b_latch);
+
+    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+    tlv::set(tlv);
+
     unwind::resume_unwinding(err)
 }
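The effect of this plumbing is easiest to see from the caller's side: `join` snapshots the TLV before building the stack job for task B, and both closures observe that value no matter which worker ends up running them. A minimal sketch, assuming the fork is imported as `rustc_rayon_core` (as `src/lib.rs` does at the end of this patch) and that the caller is already on a pool thread via `install`:

```rust
extern crate rustc_rayon_core as rayon_core;

use rayon_core::{join, tlv, ThreadPoolBuilder};

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    pool.install(|| {
        // Set the TLV on this worker for the duration of the closure.
        tlv::with(7, || {
            // Task A runs inline; task B may be stolen by another worker,
            // in which case `execute` restores the captured TLV there.
            let (a, b) = join(|| tlv::get(), || tlv::get());
            assert_eq!((a, b), (7, 7));
        });
        // `tlv::with` restored the previous value (0) on the way out.
        assert_eq!(tlv::get(), 0);
    });
}
```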
diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 3c713080d..b7d8ed758 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -20,8 +20,6 @@
 //! succeed.
 
 #![doc(html_root_url = "https://docs.rs/rayon-core/1.4")]
-#![deny(missing_debug_implementations)]
-#![deny(missing_docs)]
 
 use std::any::Any;
 use std::env;
@@ -50,6 +48,7 @@ mod registry;
 mod scope;
 mod sleep;
 mod spawn;
+mod worker_local;
 mod thread_pool;
 mod unwind;
 mod util;
@@ -57,11 +56,16 @@ mod util;
 mod compile_fail;
 mod test;
 
+pub mod tlv;
+
 #[cfg(rayon_unstable)]
 pub mod internal;
 pub use join::{join, join_context};
-pub use scope::{scope, Scope};
+pub use scope::{scope, Scope, ScopeBuilder};
+pub use registry::{Registry, mark_blocked, mark_unblocked};
 pub use spawn::spawn;
+pub use worker_local::WorkerLocal;
+
 pub use thread_pool::current_thread_has_pending_tasks;
 pub use thread_pool::current_thread_index;
 pub use thread_pool::ThreadPool;
@@ -135,12 +139,24 @@ pub struct ThreadPoolBuilder {
     /// The stack size for the created worker threads
     stack_size: Option<usize>,
 
+    /// Closure invoked on deadlock.
+    deadlock_handler: Option<Box<DeadlockHandler>>,
+
     /// Closure invoked on worker thread start.
     start_handler: Option<Box<StartHandler>>,
 
     /// Closure invoked on worker thread exit.
     exit_handler: Option<Box<ExitHandler>>,
 
+    /// Closure invoked to wrap each worker thread's main loop.
+    main_handler: Option<Box<MainHandler>>,
+
+    /// Closure invoked when starting computations in a thread.
+    acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
+
+    /// Closure invoked when blocking in a thread.
+    release_thread_handler: Option<Box<ReleaseThreadHandler>>,
+
     /// If false, worker threads will execute spawned jobs in a
     /// "depth-first" fashion. If true, they will do a "breadth-first"
     /// fashion. Depth-first is the default.
@@ -160,6 +176,9 @@ pub struct Configuration {
 /// may be invoked multiple times in parallel.
 type PanicHandler = Fn(Box<Any + Send>) + Send + Sync;
 
+/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
+type DeadlockHandler = Fn() + Send + Sync;
+
 /// The type for a closure that gets invoked when a thread starts. The
 /// closure is passed the index of the thread on which it is invoked.
 /// Note that this same closure may be invoked multiple times in parallel.
@@ -170,6 +189,20 @@ type StartHandler = Fn(usize) + Send + Sync;
 /// Note that this same closure may be invoked multiple times in parallel.
 type ExitHandler = Fn(usize) + Send + Sync;
 
+/// The type for a closure that gets invoked with a
+/// function which runs rayon tasks.
+/// The closure is passed the index of the thread on which it is invoked.
+/// Note that this same closure may be invoked multiple times in parallel.
+type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;
+
+/// The type for a closure that gets invoked before starting computations in a thread.
+/// Note that this same closure may be invoked multiple times in parallel.
+type AcquireThreadHandler = Fn() + Send + Sync;
+
+/// The type for a closure that gets invoked before blocking in a thread.
+/// Note that this same closure may be invoked multiple times in parallel.
+type ReleaseThreadHandler = Fn() + Send + Sync;
+
 impl ThreadPoolBuilder {
     /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
     pub fn new() -> ThreadPoolBuilder {
@@ -343,6 +376,45 @@ impl ThreadPoolBuilder {
         self.breadth_first
     }
 
+    /// Takes the current acquire thread callback, leaving `None`.
+    fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
+        self.acquire_thread_handler.take()
+    }
+
+    /// Set a callback to be invoked when starting computations in a thread.
+    pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> ThreadPoolBuilder
+        where H: Fn() + Send + Sync + 'static
+    {
+        self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
+        self
+    }
+
+    /// Takes the current release thread callback, leaving `None`.
+    fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
+        self.release_thread_handler.take()
+    }
+
+    /// Set a callback to be invoked when blocking in a thread.
+    pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> ThreadPoolBuilder
+        where H: Fn() + Send + Sync + 'static
+    {
+        self.release_thread_handler = Some(Box::new(release_thread_handler));
+        self
+    }
+
+    /// Takes the current deadlock callback, leaving `None`.
+    fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
+        self.deadlock_handler.take()
+    }
+
+    /// Set a callback to be invoked when a deadlock is detected.
+    pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> ThreadPoolBuilder
+        where H: Fn() + Send + Sync + 'static
+    {
+        self.deadlock_handler = Some(Box::new(deadlock_handler));
+        self
+    }
+
     /// Takes the current thread start callback, leaving `None`.
     fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
         self.start_handler.take()
@@ -380,6 +452,23 @@ impl ThreadPoolBuilder {
         self.exit_handler = Some(Box::new(exit_handler));
         self
     }
+
+    /// Takes the current thread main callback, leaving `None`.
+    fn take_main_handler(&mut self) -> Option<Box<MainHandler>> {
+        self.main_handler.take()
+    }
+
+    /// Set a callback to wrap each worker thread's main loop.
+    ///
+    /// The closure is passed the index of the thread on which it is invoked.
+    /// Note that this same closure may be invoked multiple times in parallel.
+    /// If this closure panics, the panic will be passed to the panic handler.
+    pub fn main_handler<H>(mut self, main_handler: H) -> ThreadPoolBuilder
+        where H: Fn(usize, &mut FnMut()) + Send + Sync + 'static
+    {
+        self.main_handler = Some(Box::new(main_handler));
+        self
+    }
 }
 
 #[allow(deprecated)]
@@ -496,8 +585,12 @@ impl fmt::Debug for ThreadPoolBuilder {
             ref get_thread_name,
             ref panic_handler,
             ref stack_size,
+            ref deadlock_handler,
             ref start_handler,
+            ref main_handler,
             ref exit_handler,
+            ref acquire_thread_handler,
+            ref release_thread_handler,
             ref breadth_first,
         } = *self;
 
@@ -511,16 +604,24 @@ impl fmt::Debug for ThreadPoolBuilder {
         }
         let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
         let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
+        let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
         let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
         let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
+        let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);
+        let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
+        let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
 
         f.debug_struct("ThreadPoolBuilder")
             .field("num_threads", num_threads)
             .field("get_thread_name", &get_thread_name)
             .field("panic_handler", &panic_handler)
             .field("stack_size", &stack_size)
+            .field("deadlock_handler", &deadlock_handler)
             .field("start_handler", &start_handler)
             .field("exit_handler", &exit_handler)
+            .field("main_handler", &main_handler)
+            .field("acquire_thread_handler", &acquire_thread_handler)
+            .field("release_thread_handler", &release_thread_handler)
             .field("breadth_first", &breadth_first)
             .finish()
     }
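Taken together, the new hooks let an embedder wire a pool into its own scheduling. The sketch below shows how they compose on the builder; the handler bodies are placeholders, not rustc's real jobserver or query-recovery integration:

```rust
extern crate rustc_rayon_core as rayon_core;

use rayon_core::ThreadPoolBuilder;

fn main() {
    let pool = ThreadPoolBuilder::new()
        .num_threads(2)
        // Called while the sleep mutex is held; must hand the problem off to
        // a non-Rayon thread instead of calling back into the pool.
        .deadlock_handler(|| eprintln!("no active workers left: deadlock"))
        // Bracketing calls around periods where a worker actually runs work
        // (rustc uses these to acquire/release a jobserver token).
        .acquire_thread_handler(|| {})
        .release_thread_handler(|| {})
        // Wraps each worker's main loop; `worker()` returns once the pool
        // is terminated.
        .main_handler(|index: usize, worker: &mut FnMut()| {
            eprintln!("worker {} starting", index);
            worker();
            eprintln!("worker {} exiting", index);
        })
        .build()
        .unwrap();

    pool.install(|| println!("built a pool with all hooks installed"));
}
```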
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index a233768fb..e8f400f40 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -18,7 +18,8 @@ use std::thread;
 use std::usize;
 use unwind;
 use util::leak;
-use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder};
+use {ErrorKind, ExitHandler, PanicHandler, DeadlockHandler, StartHandler, AcquireThreadHandler,
+    MainHandler, ThreadPoolBuildError, ThreadPoolBuilder, ReleaseThreadHandler};
 
 pub struct Registry {
     thread_infos: Vec<ThreadInfo>,
@@ -26,8 +27,12 @@ pub struct Registry {
     sleep: Sleep,
     job_uninjector: Stealer<JobRef>,
     panic_handler: Option<Box<PanicHandler>>,
+    pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
     start_handler: Option<Box<StartHandler>>,
     exit_handler: Option<Box<ExitHandler>>,
+    main_handler: Option<Box<MainHandler>>,
+    pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
+    pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
 
     // When this latch reaches 0, it means that all work on this
     // registry must be complete. This is ensured in the following ways:
@@ -112,12 +117,16 @@ impl Registry {
         let registry = Arc::new(Registry {
             thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
             state: Mutex::new(RegistryState::new(inj_worker)),
-            sleep: Sleep::new(),
+            sleep: Sleep::new(n_threads),
             job_uninjector: inj_stealer,
             terminate_latch: CountLatch::new(),
             panic_handler: builder.take_panic_handler(),
+            deadlock_handler: builder.take_deadlock_handler(),
             start_handler: builder.take_start_handler(),
+            main_handler: builder.take_main_handler(),
             exit_handler: builder.take_exit_handler(),
+            acquire_thread_handler: builder.take_acquire_thread_handler(),
+            release_thread_handler: builder.take_release_thread_handler(),
         });
 
         // If we return early or panic, make sure to terminate existing threads.
@@ -216,11 +225,24 @@ impl Registry {
 
     /// Waits for the worker threads to stop. This is used for testing
     /// -- so we can check that termination actually works.
-    #[cfg(test)]
-    pub fn wait_until_stopped(&self) {
+    pub(crate) fn wait_until_stopped(&self) {
+        self.release_thread();
         for info in &self.thread_infos {
             info.stopped.wait();
         }
+        self.acquire_thread();
+    }
+
+    pub(crate) fn acquire_thread(&self) {
+        if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
+            acquire_thread_handler();
+        }
+    }
+
+    pub(crate) fn release_thread(&self) {
+        if let Some(ref release_thread_handler) = self.release_thread_handler {
+            release_thread_handler();
+        }
     }
 
     /// ////////////////////////////////////////////////////////////////////////
@@ -365,16 +387,15 @@ impl Registry {
     {
         // This thread isn't a member of *any* thread pool, so just block.
         debug_assert!(WorkerThread::current().is_null());
-        let job = StackJob::new(
-            |injected| {
-                let worker_thread = WorkerThread::current();
-                assert!(injected && !worker_thread.is_null());
-                op(&*worker_thread, true)
-            },
-            LockLatch::new(),
-        );
+        let job = StackJob::new(0, |injected| {
+            let worker_thread = WorkerThread::current();
+            assert!(injected && !worker_thread.is_null());
+            op(&*worker_thread, true)
+        }, LockLatch::new());
         self.inject(&[job.as_job_ref()]);
+        self.release_thread();
         job.latch.wait();
+        self.acquire_thread();
         job.into_result()
     }
 
@@ -389,6 +410,7 @@ impl Registry {
         debug_assert!(current_thread.registry().id() != self.id());
         let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
         let job = StackJob::new(
+            0,
             |injected| {
                 let worker_thread = WorkerThread::current();
                 assert!(injected && !worker_thread.is_null());
@@ -434,6 +456,24 @@ impl Registry {
     }
 }
 
+/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
+/// if no other worker thread is active
+#[inline]
+pub fn mark_blocked() {
+    let worker_thread = WorkerThread::current();
+    assert!(!worker_thread.is_null());
+    unsafe {
+        let registry = &(*worker_thread).registry;
+        registry.sleep.mark_blocked(&registry.deadlock_handler)
+    }
+}
+
+/// Mark a previously blocked Rayon worker thread as unblocked
+#[inline]
+pub fn mark_unblocked(registry: &Registry) {
+    registry.sleep.mark_unblocked()
+}
+
 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub struct RegistryId {
     addr: usize,
@@ -478,7 +518,7 @@ pub struct WorkerThread {
     /// the "worker" half of our local deque
     worker: Deque<JobRef>,
 
-    index: usize,
+    pub(crate) index: usize,
 
     /// are these workers configured to steal breadth-first or not?
     breadth_first: bool,
@@ -486,7 +526,7 @@ pub struct WorkerThread {
     /// A weak random number generator.
     rng: XorShift64Star,
 
-    registry: Arc<Registry>,
+    pub(crate) registry: Arc<Registry>,
 }
 
 // This is a bit sketchy, but basically: the WorkerThread is
@@ -592,7 +632,11 @@ impl WorkerThread {
                 yields = self.registry.sleep.work_found(self.index, yields);
                 self.execute(job);
             } else {
-                yields = self.registry.sleep.no_work_found(self.index, yields);
+                yields = self.registry.sleep.no_work_found(
+                    self.index,
+                    yields,
+                    &self.registry
+                );
             }
         }
 
@@ -689,7 +733,23 @@ unsafe fn main_loop(
         }
     }
 
-    worker_thread.wait_until(&registry.terminate_latch);
+    let mut work = || {
+        worker_thread.wait_until(&registry.terminate_latch);
+    };
+
+    registry.acquire_thread();
+
+    if let Some(ref handler) = registry.main_handler {
+        match unwind::halt_unwinding(|| handler(index, &mut work)) {
+            Ok(()) => {
+            }
+            Err(err) => {
+                registry.handle_panic(err);
+            }
+        }
+    } else {
+        work();
+    }
 
     // Should not be any work left in our queue.
     debug_assert!(worker_thread.take_local_job().is_none());
@@ -711,6 +771,8 @@ unsafe fn main_loop(
         }
         // We're already exiting the thread, there's nothing else to do.
     }
+
+    registry.release_thread();
 }
 
 /// If already in a worker-thread, just execute `op`. Otherwise,
diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs
index 47ced9f91..d7050ebd6 100644
--- a/rayon-core/src/scope/mod.rs
+++ b/rayon-core/src/scope/mod.rs
@@ -16,11 +16,51 @@ use std::ptr;
 use std::sync::atomic::{AtomicPtr, Ordering};
 use std::sync::Arc;
 use unwind;
+use tlv;
 
 mod internal;
 #[cfg(test)]
 mod test;
 
+pub struct ScopeBuilder<'scope> {
+    scope: Option<Scope<'scope>>,
+}
+
+impl<'scope> ScopeBuilder<'scope> {
+    pub fn new() -> Self {
+        Self {
+            scope: None,
+        }
+    }
+
+    // FIXME: `&'scope Scope<'scope>` means that we can spawn scopes during `scope`,
+    // and before and after it.
+    // FIXME: Scopes spawned outside this function must not have access to `tlv`
+    pub fn scope<OP, R>(&'scope mut self, op: OP) -> R
+    where
+        OP: FnOnce(&'scope Scope<'scope>) -> R + 'scope,
+    {
+        unsafe {
+            self.scope = Some(Scope {
+                owner_thread_index: 0,
+                registry: Registry::current(),
+                panic: AtomicPtr::new(ptr::null_mut()),
+                job_completed_latch: CountLatch::new(),
+                marker: PhantomData,
+                tlv: tlv::get(),
+            });
+            let scope = self.scope.as_ref().unwrap();
+            let result = scope.execute_job_closure(move |_| op(scope));
+            in_worker(move |owner_thread, _| {
+                scope.steal_till_jobs_complete(owner_thread);
+            });
+            // Restore the TLV if we ran some jobs while waiting
+            tlv::set(scope.tlv);
+            result.unwrap() // only None if `op` panicked, and that would have been propagated
+        }
+    }
+}
+
 /// Represents a fork-join scope which can be used to spawn any number of tasks. See [`scope()`] for more information.
 ///
 ///[`scope()`]: fn.scope.html
@@ -40,11 +80,12 @@ pub struct Scope<'scope> {
     /// latch to set when the counter drops to zero (and hence this scope is complete)
     job_completed_latch: CountLatch,
 
-    /// You can think of a scope as containing a list of closures to execute,
-    /// all of which outlive `'scope`. They're not actually required to be
-    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
-    /// the closures are only *moved* across threads to be executed.
-    marker: PhantomData<Box<FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
+    /// This ensures that `'scope` is invariant, which is required for safety,
+    /// since otherwise you could shrink it and spawn closures which reference shorter-lived data.
+    marker: PhantomData<&'scope mut &'scope ()>,
+
+    /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
+    tlv: usize,
 }
 
 /// Create a "fork-join" scope `s` and invokes the closure with a
@@ -269,9 +310,12 @@ where
                 panic: AtomicPtr::new(ptr::null_mut()),
                 job_completed_latch: CountLatch::new(),
                 marker: PhantomData,
+                tlv: tlv::get(),
             };
             let result = scope.execute_job_closure(op);
             scope.steal_till_jobs_complete(owner_thread);
+            // Restore the TLV if we ran some jobs while waiting
+            tlv::set(scope.tlv);
             result.unwrap() // only None if `op` panicked, and that would have been propagated
         }
     })
@@ -336,7 +380,10 @@ impl<'scope> Scope<'scope> {
     {
         unsafe {
             self.job_completed_latch.increment();
-            let job_ref = Box::new(HeapJob::new(move || self.execute_job(body))).as_job_ref();
+            let job_ref = Box::new(HeapJob::new(
+                self.tlv,
+                move || self.execute_job(body),
+            )).as_job_ref();
 
             // Since `Scope` implements `Sync`, we can't be sure
             // that we're still in a thread of this pool, so we
@@ -418,6 +465,8 @@ impl<'scope> Scope<'scope> {
             log!(ScopeCompletePanicked {
                 owner_thread: owner_thread.index()
             });
+            // Restore the TLV if we ran some jobs while waiting
+            tlv::set(self.tlv);
             let value: Box<Box<Any + Send>> = mem::transmute(panic);
             unwind::resume_unwinding(*value);
         } else {
diff --git a/rayon-core/src/sleep/README.md b/rayon-core/src/sleep/README.md
index bc2af869f..d889de5a5 100644
--- a/rayon-core/src/sleep/README.md
+++ b/rayon-core/src/sleep/README.md
@@ -386,3 +386,36 @@ some of them were hit hard:
 - 8-10% overhead on nbody-parreduce
 - 35% overhead on increment-all
 - 245% overhead on join-recursively
+
+# Deadlock detection
+
+This module tracks a number of variables in order to detect deadlocks due to user code blocking.
+These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
+It contains the following fields:
+- `worker_count` - The number of threads in the thread pool.
+- `active_threads` - The number of threads in the thread pool which are running
+  and aren't blocked in user code or sleeping.
+- `blocked_threads` - The number of threads which are blocked in user code.
+  This doesn't include threads blocked by Rayon.
+
+User code can indicate blocking by calling `mark_blocked` before blocking and
+calling `mark_unblocked` before unblocking a thread.
+This will adjust `active_threads` and `blocked_threads` accordingly.
+
+When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
+`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not threads blocked
+by user code.
+
+A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
+If we ignored `blocked_threads` we would have a deadlock
+immediately when creating the thread pool.
+We would also deadlock once the thread pool ran out of work.
+It is not possible for Rayon itself to deadlock.
+Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
+
+We check for the deadlock condition when threads are marked as blocked in `mark_blocked`
+and when they fall asleep in `Sleep::sleep`.
+If a deadlock is detected, we call the user-provided deadlock handler while we hold the
+lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
+`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
+Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.
\ No newline at end of file
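In code, the protocol reads roughly like rustc's query latch: a waiter on a worker thread is flagged with `mark_blocked` while it sleeps, and whoever sets the latch flags each waiter back with `mark_unblocked` before waking it. A sketch under two assumptions: the fork is imported as `rustc_rayon_core`, and the caller already holds an `Arc<Registry>` handle for the pool (the `Registry` type is re-exported for this purpose; how the handle is obtained is left out here):

```rust
extern crate rustc_rayon_core as rayon_core;

use std::sync::{Arc, Condvar, Mutex};
use rayon_core::{mark_blocked, mark_unblocked, Registry};

/// A one-shot latch whose waiters follow the blocking protocol above.
struct BlockingLatch {
    // (set, number of waiters currently marked as blocked)
    state: Mutex<(bool, usize)>,
    cond: Condvar,
    registry: Arc<Registry>, // handle to the pool the waiters belong to
}

impl BlockingLatch {
    /// Must be called on a worker thread of `self.registry`.
    fn wait(&self) {
        let mut state = self.state.lock().unwrap();
        if state.0 {
            return;
        }
        // Under the latch lock, so `set` cannot miss this waiter:
        // this thread stops counting as active before it sleeps.
        state.1 += 1;
        mark_blocked();
        while !state.0 {
            state = self.cond.wait(state).unwrap();
        }
    }

    /// May be called from any thread, including non-Rayon ones.
    fn set(&self) {
        let mut state = self.state.lock().unwrap();
        state.0 = true;
        // Reactivate every blocked waiter *before* waking them, so the
        // active-thread count never reports a spurious deadlock.
        for _ in 0..state.1 {
            mark_unblocked(&self.registry);
        }
        state.1 = 0;
        self.cond.notify_all();
    }
}
```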
diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs
index 7582f98f7..4fda9bd8c 100644
--- a/rayon-core/src/sleep/mod.rs
+++ b/rayon-core/src/sleep/mod.rs
@@ -1,15 +1,40 @@
 //! Code that decides when workers should go to sleep. See README.md
 //! for an overview.
 
+use DeadlockHandler;
 use log::Event::*;
+use registry::Registry;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Condvar, Mutex};
 use std::thread;
 use std::usize;
 
+struct SleepData {
+    /// The number of threads in the thread pool.
+    worker_count: usize,
+
+    /// The number of threads in the thread pool which are running and
+    /// aren't blocked in user code or sleeping.
+    active_threads: usize,
+
+    /// The number of threads which are blocked in user code.
+    /// This doesn't include threads blocked by this module.
+    blocked_threads: usize,
+}
+
+impl SleepData {
+    /// Checks if the conditions for a deadlock hold and if so calls the deadlock handler
+    #[inline]
+    pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
+        if self.active_threads == 0 && self.blocked_threads > 0 {
+            (deadlock_handler.as_ref().unwrap())();
+        }
+    }
+}
+
 pub struct Sleep {
     state: AtomicUsize,
-    data: Mutex<()>,
+    data: Mutex<SleepData>,
     tickle: Condvar,
 }
 
@@ -20,14 +45,42 @@
 const ROUNDS_UNTIL_SLEEPY: usize = 32;
 const ROUNDS_UNTIL_ASLEEP: usize = 64;
 
 impl Sleep {
-    pub fn new() -> Sleep {
+    pub fn new(worker_count: usize) -> Sleep {
         Sleep {
             state: AtomicUsize::new(AWAKE),
-            data: Mutex::new(()),
+            data: Mutex::new(SleepData {
+                worker_count,
+                active_threads: worker_count,
+                blocked_threads: 0,
+            }),
             tickle: Condvar::new(),
         }
     }
 
+    /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
+    /// if no other worker thread is active
+    #[inline]
+    pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
+        let mut data = self.data.lock().unwrap();
+        debug_assert!(data.active_threads > 0);
+        debug_assert!(data.blocked_threads < data.worker_count);
+        data.active_threads -= 1;
+        data.blocked_threads += 1;
+
+        data.deadlock_check(deadlock_handler);
+    }
+
+    /// Mark a previously blocked Rayon worker thread as unblocked
+    #[inline]
+    pub fn mark_unblocked(&self) {
+        let mut data = self.data.lock().unwrap();
+        debug_assert!(data.active_threads < data.worker_count);
+        debug_assert!(data.blocked_threads > 0);
+        data.active_threads += 1;
+        data.blocked_threads -= 1;
+    }
+
     fn anyone_sleeping(&self, state: usize) -> bool {
         state & SLEEPING != 0
     }
@@ -61,7 +114,12 @@ impl Sleep {
     }
 
     #[inline]
-    pub fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
+    pub fn no_work_found(
+        &self,
+        worker_index: usize,
+        yields: usize,
+        registry: &Registry,
+    ) -> usize {
         log!(DidNotFindWork {
             worker: worker_index,
             yields: yields,
@@ -88,7 +146,7 @@ impl Sleep {
             }
         } else {
             debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
-            self.sleep(worker_index);
+            self.sleep(worker_index, registry);
             0
         }
     }
@@ -122,7 +180,10 @@ impl Sleep {
             old_state: old_state,
         });
         if self.anyone_sleeping(old_state) {
-            let _data = self.data.lock().unwrap();
+            let mut data = self.data.lock().unwrap();
+            // Set the active threads to the number of workers,
+            // excluding threads blocked by the user since we won't wake those up
+            data.active_threads = data.worker_count - data.blocked_threads;
             self.tickle.notify_all();
         }
     }
@@ -188,7 +249,11 @@ impl Sleep {
         self.worker_is_sleepy(state, worker_index)
     }
 
-    fn sleep(&self, worker_index: usize) {
+    fn sleep(
+        &self,
+        worker_index: usize,
+        registry: &Registry,
+    ) {
         loop {
             // Acquire here suffices. If we observe that the current worker is still
             // sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -235,7 +300,7 @@ impl Sleep {
                 // reason for the `compare_exchange` to fail is if an
                 // awaken comes, in which case the next cycle around
                 // the loop will just return.
-                let data = self.data.lock().unwrap();
+                let mut data = self.data.lock().unwrap();
 
                 // This must be SeqCst on success because we want to
                 // ensure:
@@ -264,10 +329,18 @@ impl Sleep {
                     log!(FellAsleep {
                         worker: worker_index
                     });
+
+                    // Decrement the number of active threads and check for a deadlock
+                    data.active_threads -= 1;
+                    data.deadlock_check(&registry.deadlock_handler);
+
+                    registry.release_thread();
+
                     let _ = self.tickle.wait(data).unwrap();
                     log!(GotAwoken {
                         worker: worker_index
                     });
+                    registry.acquire_thread();
                     return;
                 }
             } else {
diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs
index 8dcd26693..72434d7f3 100644
--- a/rayon-core/src/spawn/mod.rs
+++ b/rayon-core/src/spawn/mod.rs
@@ -68,7 +68,7 @@ where
     // executed. This ref is decremented at the (*) below.
     registry.increment_terminate_count();
 
-    let async_job = Box::new(HeapJob::new({
+    let async_job = Box::new(HeapJob::new(0, {
         let registry = registry.clone();
         move || {
             match unwind::halt_unwinding(func) {
diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs
index d13f4e5f9..690052a90 100644
--- a/rayon-core/src/thread_pool/mod.rs
+++ b/rayon-core/src/thread_pool/mod.rs
@@ -86,6 +86,40 @@ impl ThreadPool {
         &DEFAULT_THREAD_POOL
     }
 
+    /// Creates a scoped thread pool
+    pub fn scoped_pool<F, R, H>(builder: ThreadPoolBuilder,
+                                main_handler: H,
+                                with_pool: F) -> Result<R, ThreadPoolBuildError>
+        where F: FnOnce(&ThreadPool) -> R,
+              H: Fn(&mut FnMut()) + Send + Sync
+    {
+        struct Handler(*const ());
+        unsafe impl Send for Handler {}
+        unsafe impl Sync for Handler {}
+
+        let handler = Handler(&main_handler as *const _ as *const ());
+
+        let builder = builder.main_handler(move |_, worker| {
+            let handler = unsafe { &*(handler.0 as *const H) };
+            handler(worker);
+        });
+
+        let pool = builder.build()?;
+
+        struct JoinRegistry(Arc<Registry>);
+
+        impl Drop for JoinRegistry {
+            fn drop(&mut self) {
+                self.0.terminate();
+                self.0.wait_until_stopped();
+            }
+        }
+
+        let _join_registry = JoinRegistry(pool.registry.clone());
+
+        Ok(with_pool(&pool))
+    }
+
     /// Executes `op` within the threadpool. Any attempts to use
     /// `join`, `scope`, or parallel iterators will then operate
     /// within that threadpool.
diff --git a/rayon-core/src/tlv.rs b/rayon-core/src/tlv.rs
new file mode 100644
index 000000000..f035d12d9
--- /dev/null
+++ b/rayon-core/src/tlv.rs
@@ -0,0 +1,30 @@
+//! Allows access to Rayon's thread-local value,
+//! which is preserved when moving jobs across threads
+
+use std::cell::Cell;
+
+thread_local!(pub(crate) static TLV: Cell<usize> = Cell::new(0));
+
+/// Sets the current thread-local value to `value` inside the closure.
+/// The old value is restored when the closure ends
+pub fn with<F: FnOnce() -> R, R>(value: usize, f: F) -> R {
+    struct Reset(usize);
+    impl Drop for Reset {
+        fn drop(&mut self) {
+            TLV.with(|tlv| tlv.set(self.0));
+        }
+    }
+    let _reset = Reset(get());
+    TLV.with(|tlv| tlv.set(value));
+    f()
+}
+
+/// Sets the current thread-local value
+pub fn set(value: usize) {
+    TLV.with(|tlv| tlv.set(value));
+}
+
+/// Returns the current thread-local value
+pub fn get() -> usize {
+    TLV.with(|tlv| tlv.get())
+}
diff --git a/rayon-core/src/worker_local.rs b/rayon-core/src/worker_local.rs
new file mode 100644
index 000000000..4b92bd939
--- /dev/null
+++ b/rayon-core/src/worker_local.rs
@@ -0,0 +1,74 @@
+use registry::{Registry, WorkerThread};
+use std::fmt;
+use std::ops::Deref;
+use std::sync::Arc;
+
+#[repr(align(64))]
+#[derive(Debug)]
+struct CacheAligned<T>(T);
+
+/// Holds worker-local values for each thread in a thread pool.
+/// You can only access the worker-local value through the Deref impl
+/// on the thread pool it was constructed on. It will panic otherwise
+pub struct WorkerLocal<T> {
+    locals: Vec<CacheAligned<T>>,
+    registry: Arc<Registry>,
+}
+
+unsafe impl<T> Send for WorkerLocal<T> {}
+unsafe impl<T> Sync for WorkerLocal<T> {}
+
+impl<T> WorkerLocal<T> {
+    /// Creates a new worker local where the `initial` closure computes the
+    /// value this worker local should take for each thread in the thread pool.
+    #[inline]
+    pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> {
+        let registry = Registry::current();
+        WorkerLocal {
+            locals: (0..registry.num_threads())
+                .map(|i| CacheAligned(initial(i)))
+                .collect(),
+            registry,
+        }
+    }
+
+    /// Returns the worker-local value for each thread
+    #[inline]
+    pub fn into_inner(self) -> Vec<T> {
+        self.locals.into_iter().map(|c| c.0).collect()
+    }
+
+    fn current(&self) -> &T {
+        unsafe {
+            let worker_thread = WorkerThread::current();
+            if worker_thread.is_null()
+                || &*(*worker_thread).registry as *const _ != &*self.registry as *const _
+            {
+                panic!("WorkerLocal can only be used on the thread pool it was created on")
+            }
+            &self.locals[(*worker_thread).index].0
+        }
+    }
+}
+
+impl<T> WorkerLocal<Vec<T>> {
+    /// Joins the elements of all the worker locals into one Vec
+    pub fn join(self) -> Vec<T> {
+        self.into_inner().into_iter().flat_map(|v| v).collect()
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for WorkerLocal<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Debug::fmt(&self.locals, f)
+    }
+}
+
+impl<T> Deref for WorkerLocal<T> {
+    type Target = T;
+
+    #[inline(always)]
+    fn deref(&self) -> &T {
+        self.current()
+    }
+}
diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml
index 897b5c8a6..15a7697e0 100644
--- a/rayon-demo/Cargo.toml
+++ b/rayon-demo/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Niko Matsakis <niko@alum.mit.edu>"]
 publish = false
 
 [dependencies]
-rayon = { path = "../" }
+rustc-rayon = { path = "../" }
 cgmath = "0.16"
 docopt = "1"
 fixedbitset = "0.1.5"
diff --git a/rayon-futures/Cargo.toml b/rayon-futures/Cargo.toml
index 4522b2e0b..821f1a623 100644
--- a/rayon-futures/Cargo.toml
+++ b/rayon-futures/Cargo.toml
@@ -10,7 +10,8 @@ documentation = "https://docs.rs/rayon-futures/"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
 categories = ["concurrency"]
+publish = false
 
 [dependencies]
-rayon-core = { version = "1.3", path = "../rayon-core" }
+rustc-rayon-core = { version = "0.1", path = "../rayon-core" }
 futures = "0.1.16"
diff --git a/src/lib.rs b/src/lib.rs
index 1644ff8ed..e41ae0128 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -79,7 +79,7 @@
 extern crate crossbeam_deque;
 extern crate either;
 
-extern crate rayon_core;
+extern crate rustc_rayon_core as rayon_core;
 
 #[cfg(test)]
 extern crate rand;
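Finally, a sketch of the `WorkerLocal` API introduced above, assuming the fork is imported as `rustc_rayon_core`: each worker in the pool gets its own cache-aligned slot, `Deref` resolves to the slot of the worker running the current job, and the value can only be dereferenced from inside the owning pool. Constructing it outside a pool binds it to the global registry (an assumption about how `Registry::current()` behaves off-pool):

```rust
extern crate rustc_rayon_core as rayon_core;

use std::cell::Cell;
use rayon_core::{scope, WorkerLocal};

fn main() {
    // One counter per worker thread; the closure runs eagerly per index.
    let hits: WorkerLocal<Cell<usize>> = WorkerLocal::new(|_| Cell::new(0));

    scope(|s| {
        for _ in 0..100 {
            s.spawn(|_| {
                // `Deref` picks the slot of whichever worker runs this job,
                // so the `Cell` is never shared between threads.
                hits.set(hits.get() + 1);
            });
        }
    });

    // Collapse the per-worker counts; the spread depends on stealing.
    let total: usize = hits.into_inner().into_iter().map(Cell::into_inner).sum();
    assert_eq!(total, 100);
}
```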