diff --git a/Cargo.toml b/Cargo.toml index 3daa21ff8..3a3bb097e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,19 +39,24 @@ optional = true blas-sys = { version = "0.6.5", optional = true, default-features = false } matrixmultiply = { version = "0.1.13" } +rayon = { version = "0.6.0", optional = true } + [dependencies.serde] version = "0.8.20" optional = true +[dev-dependencies] +num_cpus = "1.2" + [features] blas = ["blas-sys"] # These features are used for testing blas-openblas-sys = ["blas"] -test = ["blas-openblas-sys"] +test = ["blas-openblas-sys", "rayon"] # This feature is used for docs -docs = ["rustc-serialize", "serde"] +docs = ["rustc-serialize", "serde", "rayon"] [profile.release] [profile.bench] diff --git a/README.rst b/README.rst index a771f307f..ff018ea78 100644 --- a/README.rst +++ b/README.rst @@ -69,6 +69,11 @@ your `Cargo.toml`. Uses ``blas-sys`` for pluggable backend, which needs to be configured separately. +- ``rayon`` + + - Optional, compatible with Rust stable + - Implement rayon 0.6 parallelization. + How to use with cargo:: [dependencies] diff --git a/benches/rayon.rs b/benches/rayon.rs new file mode 100644 index 000000000..f51435df2 --- /dev/null +++ b/benches/rayon.rs @@ -0,0 +1,114 @@ + +#![feature(test)] + +extern crate num_cpus; +extern crate test; +use test::Bencher; + +#[macro_use(s)] +extern crate ndarray; +use ndarray::prelude::*; + +extern crate rayon; +use rayon::prelude::*; + +const EXP_N: usize = 128; + +use std::cmp::max; + +fn set_threads() { + let n = max(1, num_cpus::get() / 2); + let cfg = rayon::Configuration::new().set_num_threads(n); + let _ = rayon::initialize(cfg); +} + +#[bench] +fn map_exp_regular(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((EXP_N, EXP_N)); + a.swap_axes(0, 1); + bench.iter(|| { + a.mapv_inplace(|x| x.exp()); + }); +} + +#[bench] +fn rayon_exp_regular(bench: &mut Bencher) +{ + set_threads(); + let mut a = Array2::::zeros((EXP_N, EXP_N)); + a.swap_axes(0, 1); + bench.iter(|| { + a.view_mut().into_par_iter().for_each(|x| *x = x.exp()); + }); +} + +const FASTEXP: usize = 800; + +#[inline] +fn fastexp(x: f64) -> f64 { + let x = 1. + x/1024.; + x.powi(1024) +} + +#[bench] +fn map_fastexp_regular(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + bench.iter(|| { + a.mapv_inplace(|x| fastexp(x)) + }); +} + +#[bench] +fn rayon_fastexp_regular(bench: &mut Bencher) +{ + set_threads(); + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + bench.iter(|| { + a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x)); + }); +} + +#[bench] +fn map_fastexp_cut(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + let mut a = a.slice_mut(s![.., ..-1]); + bench.iter(|| { + a.mapv_inplace(|x| fastexp(x)) + }); +} + +#[bench] +fn rayon_fastexp_cut(bench: &mut Bencher) +{ + set_threads(); + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + let mut a = a.slice_mut(s![.., ..-1]); + bench.iter(|| { + a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x)); + }); +} + +#[bench] +fn map_fastexp_by_axis(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + bench.iter(|| { + for mut sheet in a.axis_iter_mut(Axis(0)) { + sheet.mapv_inplace(fastexp) + } + }); +} + +#[bench] +fn rayon_fastexp_by_axis(bench: &mut Bencher) +{ + set_threads(); + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + bench.iter(|| { + a.axis_iter_mut(Axis(0)).into_par_iter() + .for_each(|mut sheet| sheet.mapv_inplace(fastexp)); + }); +} diff --git a/src/impl_methods.rs b/src/impl_methods.rs index 1d2171752..d571dc949 100644 --- a/src/impl_methods.rs +++ b/src/impl_methods.rs @@ -688,10 +688,6 @@ impl ArrayBase where S: Data, D: Dimension is_standard_layout(&self.dim, &self.strides) } - fn is_contiguous(&self) -> bool { - D::is_contiguous(&self.dim, &self.strides) - } - /// Return a pointer to the first element in the array. /// /// Raw access to array elements needs to follow the strided indexing diff --git a/src/impl_views.rs b/src/impl_views.rs index 35a355adb..d137dca0c 100644 --- a/src/impl_views.rs +++ b/src/impl_views.rs @@ -14,6 +14,62 @@ use error::ShapeError; use StrideShape; +pub trait ArrayViewPrivate { + type Item; + type Slice: IntoIterator; + unsafe fn into_slice_memory_order(self) -> Self::Slice; + + fn into_fold(self, acc: Acc, f: F) -> Acc + where F: FnMut(Acc, Self::Item) -> Acc; +} + +impl<'a, A, D> ArrayViewPrivate for ArrayView<'a, A, D> + where D: Dimension, +{ + type Item = &'a A; + type Slice = &'a [A]; + unsafe fn into_slice_memory_order(self) -> Self::Slice { + debug_assert!(self.is_contiguous()); + slice::from_raw_parts(self.ptr, self.len()) + } + + fn into_fold(self, acc: Acc, f: F) -> Acc + where F: FnMut(Acc, Self::Item) -> Acc + { + if self.is_contiguous() { + unsafe { + self.into_slice_memory_order().into_iter().fold(acc, f) + } + } else { + self.into_iter().fold(acc, f) + } + } +} + + +impl<'a, A, D> ArrayViewPrivate for ArrayViewMut<'a, A, D> + where D: Dimension, +{ + type Item = &'a mut A; + type Slice = &'a mut [A]; + unsafe fn into_slice_memory_order(self) -> Self::Slice { + debug_assert!(self.is_contiguous()); + slice::from_raw_parts_mut(self.ptr, self.len()) + } + + fn into_fold(self, acc: Acc, f: F) -> Acc + where F: FnMut(Acc, Self::Item) -> Acc + { + if self.is_contiguous() { + unsafe { + self.into_slice_memory_order().into_iter().fold(acc, f) + } + } else { + self.into_iter().fold(acc, f) + } + } +} + /// # Methods Specific to Array Views /// /// Methods for read-only array views `ArrayView<'a, A, D>` @@ -126,7 +182,6 @@ impl<'a, A, D> ArrayBase, D> None } } - } /// Methods for read-write array views `ArrayViewMut<'a, A, D>` @@ -237,6 +292,5 @@ impl<'a, A, D> ArrayBase, D> None } } - } diff --git a/src/iterators/mod.rs b/src/iterators/mod.rs index e379fb96f..4f91c4b61 100644 --- a/src/iterators/mod.rs +++ b/src/iterators/mod.rs @@ -21,6 +21,9 @@ use super::{ Axis, }; +#[cfg(feature = "rayon")] +pub mod par; + /// Base for array iterators /// /// Iterator element type is `&'a A`. diff --git a/src/iterators/par.rs b/src/iterators/par.rs new file mode 100644 index 000000000..c0ec90db1 --- /dev/null +++ b/src/iterators/par.rs @@ -0,0 +1,225 @@ + + +use rayon::par_iter::ParallelIterator; +use rayon::par_iter::IntoParallelIterator; +use rayon::par_iter::IndexedParallelIterator; +use rayon::par_iter::ExactParallelIterator; +use rayon::par_iter::BoundedParallelIterator; +use rayon::par_iter::internal::{Consumer, UnindexedConsumer}; +use rayon::par_iter::internal::bridge; +use rayon::par_iter::internal::bridge_unindexed; +use rayon::par_iter::internal::ProducerCallback; +use rayon::par_iter::internal::Producer; +use rayon::par_iter::internal::UnindexedProducer; +#[cfg(rayon_fold_with)] +use rayon::par_iter::internal::Folder; + +use super::AxisIter; +use super::AxisIterMut; +use imp_prelude::*; + +/// Iterator wrapper for parallelized implementations. +/// +/// **Requires crate feature `"rayon"`** +#[derive(Copy, Clone, Debug)] +pub struct Parallel { + iter: I, +} + +macro_rules! par_iter_wrapper { + // thread_bounds are either Sync or Send + Sync + ($iter_name:ident, [$($thread_bounds:tt)*]) => { + /// This iterator can be turned into a parallel iterator (rayon crate). + /// + /// **Requires crate feature `"rayon"`** + impl<'a, A, D> IntoParallelIterator for $iter_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + type Item = ::Item; + type Iter = Parallel; + fn into_par_iter(self) -> Self::Iter { + Parallel { + iter: self, + } + } + } + + impl<'a, A, D> ParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + type Item = <$iter_name<'a, A, D> as Iterator>::Item; + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + bridge(self, consumer) + } + + fn opt_len(&mut self) -> Option { + Some(self.iter.len()) + } + } + + impl<'a, A, D> IndexedParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + fn with_producer(self, callback: Cb) -> Cb::Output + where Cb: ProducerCallback + { + callback.callback(self.iter) + } + } + + impl<'a, A, D> ExactParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + fn len(&mut self) -> usize { + ExactSizeIterator::len(&self.iter) + } + } + + impl<'a, A, D> BoundedParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + fn upper_bound(&mut self) -> usize { + ExactSizeIterator::len(&self.iter) + } + + fn drive(self, consumer: C) -> C::Result + where C: Consumer + { + bridge(self, consumer) + } + } + + // This is the real magic, I guess + + impl<'a, A, D> Producer for $iter_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + fn cost(&mut self, len: usize) -> f64 { + // FIXME: No idea about what this is + len as f64 + } + + fn split_at(self, i: usize) -> (Self, Self) { + self.split_at(i) + } + } + } +} + +par_iter_wrapper!(AxisIter, [Sync]); +par_iter_wrapper!(AxisIterMut, [Send + Sync]); + +impl<'a, A, D> IntoParallelIterator for &'a Array + where D: Dimension, + A: Sync +{ + type Item = &'a A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view().into_par_iter() + } +} + +impl<'a, A, D> IntoParallelIterator for &'a RcArray + where D: Dimension, + A: Sync +{ + type Item = &'a A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view().into_par_iter() + } +} + +impl<'a, A, D> IntoParallelIterator for &'a mut Array + where D: Dimension, + A: Sync + Send +{ + type Item = &'a mut A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view_mut().into_par_iter() + } +} + +impl<'a, A, D> IntoParallelIterator for &'a mut RcArray + where D: Dimension, + A: Sync + Send + Clone, +{ + type Item = &'a mut A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view_mut().into_par_iter() + } +} + +macro_rules! par_iter_view_wrapper { + // thread_bounds are either Sync or Send + Sync + ($view_name:ident, [$($thread_bounds:tt)*]) => { + impl<'a, A, D> IntoParallelIterator for $view_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + type Item = ::Item; + type Iter = Parallel; + fn into_par_iter(self) -> Self::Iter { + Parallel { + iter: self, + } + } + } + + + impl<'a, A, D> ParallelIterator for Parallel<$view_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + type Item = <$view_name<'a, A, D> as IntoIterator>::Item; + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + bridge_unindexed(self.iter, consumer) + } + + fn opt_len(&mut self) -> Option { + Some(self.iter.len()) + } + } + + impl<'a, A, D> UnindexedProducer for $view_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + fn can_split(&self) -> bool { + self.len() > 1 + } + + fn split(self) -> (Self, Self) { + let max_axis = self.max_stride_axis(); + let mid = self.len_of(max_axis) / 2; + let (a, b) = self.split_at(max_axis, mid); + //println!("Split along axis {:?} at {}\nshapes {:?}, {:?}", max_axis, mid, a.shape(), b.shape()); + (a, b) + } + + #[cfg(rayon_fold_with)] + fn fold_with(self, folder: F) -> F + where F: Folder, + { + self.into_fold(folder, move |f, elt| f.consume(elt)) + } + } + + } +} + +par_iter_view_wrapper!(ArrayView, [Sync]); +par_iter_view_wrapper!(ArrayViewMut, [Sync + Send]); diff --git a/src/lib.rs b/src/lib.rs index 5e049b13e..4bfb88154 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,6 +62,9 @@ //! - Enable transparent BLAS support for matrix multiplication. //! Uses ``blas-sys`` for pluggable backend, which needs to be configured //! separately. +//! - `rayon` +//! - Optional. +//! - Implement rayon 0.6 parallelization. //! #[cfg(feature = "serde")] @@ -77,6 +80,8 @@ extern crate matrixmultiply; extern crate itertools; extern crate num_traits as libnum; extern crate num_complex; +#[cfg(feature = "rayon")] +extern crate rayon; use std::iter::Zip; use std::marker::PhantomData; @@ -124,6 +129,8 @@ mod array_serde; mod array_serialize; mod arrayformat; mod data_traits; +#[cfg(feature = "rayon")] +pub mod par; pub use aliases::*; @@ -580,6 +587,9 @@ impl ArrayBase } } + fn is_contiguous(&self) -> bool { + D::is_contiguous(&self.dim, &self.strides) + } /// Apply closure `f` to each element in the array, in whatever /// order is the fastest to visit. diff --git a/src/par/mod.rs b/src/par/mod.rs new file mode 100644 index 000000000..717e35775 --- /dev/null +++ b/src/par/mod.rs @@ -0,0 +1,6 @@ + +//! Parallelization features for ndarray. +//! +//! **Requires crate feature `"rayon"`** + +pub use iterators::par::Parallel; diff --git a/tests/rayon.rs b/tests/rayon.rs new file mode 100644 index 000000000..8ff19ae84 --- /dev/null +++ b/tests/rayon.rs @@ -0,0 +1,43 @@ +#![cfg(feature = "rayon")] + +extern crate rayon; +#[macro_use(s)] extern crate ndarray; + +use ndarray::prelude::*; + +use rayon::prelude::*; + +const M: usize = 1024 * 10; +const N: usize = 100; + +#[test] +fn test_axis_iter() { + let mut a = Array2::::zeros((M, N)); + for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() { + v.fill(i as _); + } + assert_eq!(a.axis_iter(Axis(0)).len(), M); + let s = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum(); + println!("{:?}", a.slice(s![..10, ..5])); + assert_eq!(s, a.scalar_sum()); +} + +#[test] +fn test_axis_iter_mut() { + let mut a = Array::linspace(0., 1.0f64, M * N).into_shape((M, N)).unwrap(); + let b = a.mapv(|x| x.exp()); + a.axis_iter_mut(Axis(0)).into_par_iter().for_each(|mut v| v.mapv_inplace(|x| x.exp())); + println!("{:?}", a.slice(s![..10, ..5])); + assert!(a.all_close(&b, 0.001)); +} + +#[test] +fn test_regular_iter() { + let mut a = Array2::::zeros((M, N)); + for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() { + v.fill(i as _); + } + let s = a.par_iter().map(|&x| x).sum(); + println!("{:?}", a.slice(s![..10, ..5])); + assert_eq!(s, a.scalar_sum()); +}