From 4e7a595e3139ed5dbc2fca9d2318ac89ddfff5dc Mon Sep 17 00:00:00 2001
From: Bill Wood
Date: Wed, 3 Nov 2021 12:24:20 -0400
Subject: [PATCH 1/3] update slice.rs to use merge sort for improved sort
 performance

add debug_assert(v.len() >= 2) in insert_end()
add debug_assert in swap_slices
---
 library/alloc/src/slice.rs | 272 +++++++++++++++++++------------------
 1 file changed, 140 insertions(+), 132 deletions(-)

diff --git a/library/alloc/src/slice.rs b/library/alloc/src/slice.rs
index 199b3c9d0290c..6bfd954cd7f77 100644
--- a/library/alloc/src/slice.rs
+++ b/library/alloc/src/slice.rs
@@ -249,10 +249,10 @@ impl<T> [T] {
     ///
     /// # Current implementation
     ///
-    /// The current algorithm is an adaptive, iterative merge sort inspired by
-    /// [timsort](https://en.wikipedia.org/wiki/Timsort).
-    /// It is designed to be very fast in cases where the slice is nearly sorted, or consists of
-    /// two or more sorted sequences concatenated one after another.
+    /// This is a stable two-stage merge sort with pre-sorted prefix optimization.
+    /// It is quite fast in cases where the slice is nearly sorted, or consists of
+    /// two or more sorted sequences concatenated one after another, while remaining very fast for
+    /// randomly sorted sequences.
     ///
     /// Also, it allocates temporary storage half the size of `self`, but for short slices a
     /// non-allocating insertion sort is used instead.
@@ -302,10 +302,10 @@ impl<T> [T] {
     ///
     /// # Current implementation
     ///
-    /// The current algorithm is an adaptive, iterative merge sort inspired by
-    /// [timsort](https://en.wikipedia.org/wiki/Timsort).
-    /// It is designed to be very fast in cases where the slice is nearly sorted, or consists of
-    /// two or more sorted sequences concatenated one after another.
+    /// This is a stable two-stage merge sort with pre-sorted prefix optimization.
+    /// It is quite fast in cases where the slice is nearly sorted, or consists of
+    /// two or more sorted sequences concatenated one after another, while remaining very fast for
+    /// randomly sorted sequences.
     ///
     /// Also, it allocates temporary storage half the size of `self`, but for short slices a
     /// non-allocating insertion sort is used instead.
@@ -879,34 +879,38 @@ impl<T: Clone> ToOwned for [T] {
 // Sorting
 ////////////////////////////////////////////////////////////////////////////////
 
-/// Inserts `v[0]` into pre-sorted sequence `v[1..]` so that whole `v[..]` becomes sorted.
+/// Inserts `v[v.len() - 1]` into pre-sorted sequence `v[..v.len() - 1]` so that whole `v[..]` becomes sorted.
 ///
 /// This is the integral subroutine of insertion sort.
 #[cfg(not(no_global_oom_handling))]
-fn insert_head<T, F>(v: &mut [T], is_less: &mut F)
+// benchmarking indicated that inlining makes a substantial improvement, yet only requires a couple of hundred bytes
+#[inline(always)]
+fn insert_end<T, F>(v: &mut [T], is_less: &mut F)
 where
     F: FnMut(&T, &T) -> bool,
 {
-    if v.len() >= 2 && is_less(&v[1], &v[0]) {
-        unsafe {
+    debug_assert!(v.len() >= 2);
+    let end = v.len() - 1;
+    unsafe {
+        if is_less(v.get_unchecked(end), v.get_unchecked(end - 1)) {
             // There are three ways to implement insertion here:
             //
-            // 1. Swap adjacent elements until the first one gets to its final destination.
+            // 1. Swap adjacent elements until the last one gets to its final destination.
             //    However, this way we copy data around more than is necessary. If elements are big
             //    structures (costly to copy), this method will be slow.
             //
-            // 2. Iterate until the right place for the first element is found. Then shift the
-            //    elements succeeding it to make room for it and finally place it into the
+            // 2. Iterate until the right place for the last element is found. Then shift the
+            //    elements preceding it to make room for it and finally place it into the
             //    remaining hole. This is a good method.
             //
-            // 3. Copy the first element into a temporary variable. Iterate until the right place
+            // 3. Copy the last element into a temporary variable. Iterate until the right place
             //    for it is found. As we go along, copy every traversed element into the slot
-            //    preceding it. Finally, copy data from the temporary variable into the remaining
+            //    succeeding it. Finally, copy data from the temporary variable into the remaining
             //    hole. This method is very good. Benchmarks demonstrated slightly better
             //    performance than with the 2nd method.
             //
             // All methods were benchmarked, and the 3rd showed best results. So we chose that one.
-            let tmp = mem::ManuallyDrop::new(ptr::read(&v[0]));
+            let tmp = mem::ManuallyDrop::new(ptr::read(v.get_unchecked(end)));
 
             // Intermediate state of the insertion process is always tracked by `hole`, which
             // serves two purposes:
@@ -918,15 +922,14 @@ where
             // If `is_less` panics at any point during the process, `hole` will get dropped and
             // fill the hole in `v` with `tmp`, thus ensuring that `v` still holds every object it
             // initially held exactly once.
-            let mut hole = InsertionHole { src: &*tmp, dest: &mut v[1] };
-            ptr::copy_nonoverlapping(&v[1], &mut v[0], 1);
-
-            for i in 2..v.len() {
-                if !is_less(&v[i], &*tmp) {
-                    break;
-                }
-                ptr::copy_nonoverlapping(&v[i], &mut v[i - 1], 1);
-                hole.dest = &mut v[i];
+            let mut hole = InsertionHole { src: &*tmp, dest: v.get_unchecked_mut(end - 1) };
+            ptr::copy_nonoverlapping(hole.dest, v.get_unchecked_mut(end), 1);
+
+            let mut i = end - 1;
+            while i > 0 && is_less(&*tmp, v.get_unchecked(i - 1)) {
+                hole.dest = v.get_unchecked_mut(i - 1);
+                ptr::copy_nonoverlapping(hole.dest, v.get_unchecked_mut(i), 1);
+                i -= 1;
             }
             // `hole` gets dropped and thus copies `tmp` into the remaining hole in `v`.
         }
@@ -955,7 +958,7 @@ where
 /// The two slices must be non-empty and `mid` must be in bounds. Buffer `buf` must be long enough
 /// to hold a copy of the shorter slice. Also, `T` must not be a zero-sized type.
 #[cfg(not(no_global_oom_handling))]
-unsafe fn merge<T, F>(v: &mut [T], mid: usize, buf: *mut T, is_less: &mut F)
+fn merge<T, F>(v: &mut [T], mid: usize, buf: *mut T, is_less: &mut F)
 where
     F: FnMut(&T, &T) -> bool,
 {
@@ -1034,13 +1037,15 @@ where
     // Finally, `hole` gets dropped. If the shorter run was not fully consumed, whatever remains of
     // it will now be copied into the hole in `v`.
 
-    unsafe fn get_and_increment<T>(ptr: &mut *mut T) -> *mut T {
+    #[inline(always)]
+    fn get_and_increment<T>(ptr: &mut *mut T) -> *mut T {
         let old = *ptr;
         *ptr = unsafe { ptr.offset(1) };
         old
     }
 
-    unsafe fn decrement_and_get<T>(ptr: &mut *mut T) -> *mut T {
+    #[inline(always)]
+    fn decrement_and_get<T>(ptr: &mut *mut T) -> *mut T {
         *ptr = unsafe { ptr.offset(-1) };
         *ptr
     }
@@ -1063,27 +1068,38 @@ where
     }
 }
 
-/// This merge sort borrows some (but not all) ideas from TimSort, which is described in detail
-/// [here](https://github.com/python/cpython/blob/main/Objects/listsort.txt).
-///
-/// The algorithm identifies strictly descending and non-descending subsequences, which are called
-/// natural runs. There is a stack of pending runs yet to be merged. Each newly found run is pushed
-/// onto the stack, and then some pairs of adjacent runs are merged until these two invariants are
-/// satisfied:
+/// This is a stable two-stage merge sort with pre-sorted prefix optimization. The two stages are:
 ///
-/// 1. for every `i` in `1..runs.len()`: `runs[i - 1].len > runs[i].len`
-/// 2. for every `i` in `2..runs.len()`: `runs[i - 2].len > runs[i - 1].len + runs[i].len`
+/// 1) Top-down recursive depth-first merge, which helps data locality
+/// 2) Small slices are sorted with a fast insertion sort, then merged
 ///
-/// The invariants ensure that the total running time is *O*(*n* \* log(*n*)) worst-case.
+/// The total running time is *O*(*n* \* log(*n*)) worst-case.
 #[cfg(not(no_global_oom_handling))]
 fn merge_sort<T, F>(v: &mut [T], mut is_less: F)
 where
     F: FnMut(&T, &T) -> bool,
 {
+    // The `gt!` macro centralizes and clarifies the comparison logic.
+    // Unchecked array access gives an approximate 20% performance improvement.
+    //
+    // # Safety
+    //
+    // `$left` and `$right` must be < `$v.len()`
+    macro_rules! gt {
+        ($v: ident, $left: expr, $right: expr, $is_less: ident) => {
+            // $is_less(&$v[$right], &$v[$left])
+            {
+                debug_assert!($left < $v.len() && $right < $v.len());
+                $is_less(unsafe { &$v.get_unchecked($right) }, unsafe { &$v.get_unchecked($left) })
+            }
+        };
+    }
+
+    // Benchmarking determined these are the best sizes.
+    // Recursive merge switches to insertion sort / merge when slice length is <= SMALL_SLICE_LEN*2.
+    const SMALL_SLICE_LEN: usize = 10;
     // Slices of up to this length get sorted using insertion sort.
     const MAX_INSERTION: usize = 20;
-    // Very short runs are extended using insertion sort to span at least this many elements.
-    const MIN_RUN: usize = 10;
 
     // Sorting has no meaningful behavior on zero-sized types.
     if size_of::<T>() == 0 {
@@ -1091,112 +1107,104 @@ where
     }
 
     let len = v.len();
-
     // Short arrays get sorted in-place via insertion sort to avoid allocations.
     if len <= MAX_INSERTION {
-        if len >= 2 {
-            for i in (0..len - 1).rev() {
-                insert_head(&mut v[i..], &mut is_less);
-            }
+        for i in 1..len {
+            insert_end(&mut v[..=i], &mut is_less);
         }
         return;
     }
 
     // Allocate a buffer to use as scratch memory. We keep the length 0 so we can keep in it
     // shallow copies of the contents of `v` without risking the dtors running on copies if
-    // `is_less` panics. When merging two sorted runs, this buffer holds a copy of the shorter run,
-    // which will always have length at most `len / 2`.
-    let mut buf = Vec::with_capacity(len / 2);
-
-    // In order to identify natural runs in `v`, we traverse it backwards. That might seem like a
-    // strange decision, but consider the fact that merges more often go in the opposite direction
-    // (forwards). According to benchmarks, merging forwards is slightly faster than merging
-    // backwards. To conclude, identifying runs by traversing backwards improves performance.
-    let mut runs = vec![];
-    let mut end = len;
-    while end > 0 {
-        // Find the next natural run, and reverse it if it's strictly descending.
-        let mut start = end - 1;
-        if start > 0 {
-            start -= 1;
-            unsafe {
-                if is_less(v.get_unchecked(start + 1), v.get_unchecked(start)) {
-                    while start > 0 && is_less(v.get_unchecked(start), v.get_unchecked(start - 1)) {
-                        start -= 1;
+    // `is_less` panics. When merging two slices, this buffer holds a copy of the right-hand slice,
+    // which will always have length at most `(len + 1) / 2`.
+    let mut buf = Vec::with_capacity((len + 1) / 2);
+    slice_merge_sort(v, 0, buf.as_mut_ptr(), &mut is_less);
+
+    // Do a recursive depth-first merge while the slice's length is greater than SMALL_SLICE_LEN*2.
+    // Below that length use a combination of insertion sort and merging.
+    // For optimization, `sorted` tracks how much of the slice's prefix is already sorted.
+    fn slice_merge_sort<T, F>(v: &mut [T], mut sorted: usize, buf_ptr: *mut T, is_less: &mut F)
+    where
+        F: FnMut(&T, &T) -> bool,
+    {
+        let len = v.len();
+        // find length of sorted prefix
+        if sorted == 0 {
+            sorted = if len <= 1 {
+                len
+            } else if gt!(v, 0, 1, is_less) {
+                // strictly descending
+                let mut i = 2;
+                while i < len && gt!(v, i - 1, i, is_less) {
+                    i += 1;
+                }
+                // Reverse the slice so we don't have to sort it later.
+                v[..i].reverse();
+                i
+            } else {
+                // ascending
+                let mut i = 2;
+                while i < len && !gt!(v, i - 1, i, is_less) {
+                    i += 1;
+                }
+                i
+            };
+        }
+
+        // Do merge sort, using `sorted` to avoid redundant sorting.
+        if sorted < len {
+            if len <= SMALL_SLICE_LEN + 2 {
+                for i in sorted..len {
+                    insert_end(&mut v[..=i], is_less);
+                }
+            } else {
+                let mid;
+                if len > SMALL_SLICE_LEN * 2 {
+                    mid = sorted.max(len / 2);
+                    if sorted < mid {
+                        slice_merge_sort(&mut v[..mid], sorted, buf_ptr, is_less);
+                    }
+                    slice_merge_sort(&mut v[mid..], 0, buf_ptr, is_less);
+                    if !gt!(v, mid - 1, mid, is_less) {
+                        return;
+                    } else if gt!(v, 0, len - 1, is_less) {
+                        // strictly reverse sorted
+                        swap_slices(v, mid, buf_ptr);
+                        return;
                     }
-                    v[start..end].reverse();
                 } else {
-                    while start > 0 && !is_less(v.get_unchecked(start), v.get_unchecked(start - 1))
-                    {
-                        start -= 1;
+                    for i in sorted..SMALL_SLICE_LEN {
+                        insert_end(&mut v[..=i], is_less);
                     }
+                    for i in SMALL_SLICE_LEN + 1..len {
+                        insert_end(&mut v[SMALL_SLICE_LEN..=i], is_less);
+                    }
+                    if !gt!(v, SMALL_SLICE_LEN - 1, SMALL_SLICE_LEN, is_less) {
+                        return;
+                    }
+                    mid = SMALL_SLICE_LEN;
                 }
+                merge(v, mid, buf_ptr, is_less);
             }
         }
-
-        // Insert some more elements into the run if it's too short. Insertion sort is faster than
-        // merge sort on short sequences, so this significantly improves performance.
-        while start > 0 && end - start < MIN_RUN {
-            start -= 1;
-            insert_head(&mut v[start..end], &mut is_less);
-        }
-
-        // Push this run onto the stack.
-        runs.push(Run { start, len: end - start });
-        end = start;
-
-        // Merge some pairs of adjacent runs to satisfy the invariants.
-        while let Some(r) = collapse(&runs) {
-            let left = runs[r + 1];
-            let right = runs[r];
-            unsafe {
-                merge(
-                    &mut v[left.start..right.start + right.len],
-                    left.len,
-                    buf.as_mut_ptr(),
-                    &mut is_less,
-                );
-            }
-            runs[r] = Run { start: left.start, len: left.len + right.len };
-            runs.remove(r + 1);
-        }
     }
 
-    // Finally, exactly one run must remain in the stack.
-    debug_assert!(runs.len() == 1 && runs[0].start == 0 && runs[0].len == len);
-
-    // Examines the stack of runs and identifies the next pair of runs to merge. More specifically,
-    // if `Some(r)` is returned, that means `runs[r]` and `runs[r + 1]` must be merged next. If the
-    // algorithm should continue building a new run instead, `None` is returned.
-    //
-    // TimSort is infamous for its buggy implementations, as described here:
-    // http://envisage-project.eu/timsort-specification-and-verification/
-    //
-    // The gist of the story is: we must enforce the invariants on the top four runs on the stack.
-    // Enforcing them on just top three is not sufficient to ensure that the invariants will still
-    // hold for *all* runs in the stack.
-    //
-    // This function correctly checks invariants for the top four runs. Additionally, if the top
-    // run starts at index 0, it will always demand a merge operation until the stack is fully
-    // collapsed, in order to complete the sort.
-    #[inline]
-    fn collapse(runs: &[Run]) -> Option<usize> {
-        let n = runs.len();
-        if n >= 2
-            && (runs[n - 1].start == 0
-                || runs[n - 2].len <= runs[n - 1].len
-                || (n >= 3 && runs[n - 3].len <= runs[n - 2].len + runs[n - 1].len)
-                || (n >= 4 && runs[n - 4].len <= runs[n - 3].len + runs[n - 2].len))
-        {
-            if n >= 3 && runs[n - 3].len < runs[n - 1].len { Some(n - 3) } else { Some(n - 2) }
-        } else {
-            None
+    /// swap contents of left-hand and right-hand slices divided at `mid`
+    ///
+    /// # Safety
+    ///
+    /// `buf_ptr` must point to a slice of `buf` which is long enough to hold `v[mid..]`
+    /// v.len() >= mid because `swap_slices` is only called when: sorted < len && mid == sorted.max(len / 2)
+    fn swap_slices<T>(v: &mut [T], mid: usize, buf_ptr: *mut T) {
+        debug_assert!(v.len() >= mid);
+        let rlen = v.len() - mid;
+        let v_ptr = v.as_mut_ptr();
+        unsafe {
+            ptr::copy_nonoverlapping(v_ptr.add(mid), buf_ptr, rlen);
+            ptr::copy(v_ptr, v_ptr.add(rlen), mid);
+            ptr::copy_nonoverlapping(buf_ptr, v_ptr, rlen);
         }
     }
-
-    #[derive(Clone, Copy)]
-    struct Run {
-        start: usize,
-        len: usize,
-    }
 }

From a7edde1315c124edc962e2b5f58be5383b869160 Mon Sep 17 00:00:00 2001
From: Bill Wood
Date: Thu, 11 Nov 2021 10:18:11 -0500
Subject: [PATCH 2/3] improve safety of merge sort per PR comments

correct formatting issue
---
 library/alloc/src/slice.rs | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/library/alloc/src/slice.rs b/library/alloc/src/slice.rs
index 6bfd954cd7f77..3e905aacf7ffa 100644
--- a/library/alloc/src/slice.rs
+++ b/library/alloc/src/slice.rs
@@ -889,10 +889,9 @@ fn insert_end<T, F>(v: &mut [T], is_less: &mut F)
 where
     F: FnMut(&T, &T) -> bool,
 {
-    debug_assert!(v.len() >= 2);
-    let end = v.len() - 1;
-    unsafe {
-        if is_less(v.get_unchecked(end), v.get_unchecked(end - 1)) {
+    let end = v.len().saturating_sub(1);
+    if end > 0 && is_less(&v[end], &v[end - 1]) {
+        unsafe {
             // There are three ways to implement insertion here:
             //
             // 1. Swap adjacent elements until the last one gets to its final destination.
@@ -957,8 +956,9 @@ where
 ///
 /// The two slices must be non-empty and `mid` must be in bounds. Buffer `buf` must be long enough
 /// to hold a copy of the shorter slice. Also, `T` must not be a zero-sized type.
+#[allow(unused_unsafe)]
 #[cfg(not(no_global_oom_handling))]
-fn merge<T, F>(v: &mut [T], mid: usize, buf: *mut T, is_less: &mut F)
+unsafe fn merge<T, F>(v: &mut [T], mid: usize, buf: *mut T, is_less: &mut F)
 where
     F: FnMut(&T, &T) -> bool,
 {
@@ -1038,14 +1038,14 @@ where
     // it will now be copied into the hole in `v`.
 
     #[inline(always)]
-    fn get_and_increment<T>(ptr: &mut *mut T) -> *mut T {
+    unsafe fn get_and_increment<T>(ptr: &mut *mut T) -> *mut T {
         let old = *ptr;
         *ptr = unsafe { ptr.offset(1) };
         old
     }
 
     #[inline(always)]
-    fn decrement_and_get<T>(ptr: &mut *mut T) -> *mut T {
+    unsafe fn decrement_and_get<T>(ptr: &mut *mut T) -> *mut T {
         *ptr = unsafe { ptr.offset(-1) };
         *ptr
     }
@@ -1186,7 +1186,9 @@ where
                     }
                     mid = SMALL_SLICE_LEN;
                 }
-                merge(v, mid, buf_ptr, is_less);
+                unsafe {
+                    merge(v, mid, buf_ptr, is_less);
+                }
             }
         }
     }
@@ -1196,9 +1198,11 @@ where
     /// # Safety
     ///
     /// `buf_ptr` must point to a slice of `buf` which is long enough to hold `v[mid..]`
-    /// v.len() >= mid because `swap_slices` is only called when: sorted < len && mid == sorted.max(len / 2)
+    /// `v[mid..].len() <= buf.len()` because `buf` is sized as `(v.len() + 1) / 2` and `mid == sorted.max(v.len() / 2)`
+    ///
+    /// `mid` must be <= `v.len()`
+    /// `mid <= v.len()` because `swap_slices` is only called when: `sorted < len && mid == sorted.max(len / 2)`
     fn swap_slices<T>(v: &mut [T], mid: usize, buf_ptr: *mut T) {
-        debug_assert!(v.len() >= mid);
         let rlen = v.len() - mid;
         let v_ptr = v.as_mut_ptr();
         unsafe {

From 68c6da75581c9cbdb350caf71092b2dea568aa61 Mon Sep 17 00:00:00 2001
From: Bill Wood
Date: Wed, 17 Nov 2021 17:40:51 -0500
Subject: [PATCH 3/3] clarify safety of swap_slices, remove debug_asserts to
 see if performance improves on regression test

---
 library/alloc/src/slice.rs | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/library/alloc/src/slice.rs b/library/alloc/src/slice.rs
index 3e905aacf7ffa..b8dda9a9b108a 100644
--- a/library/alloc/src/slice.rs
+++ b/library/alloc/src/slice.rs
@@ -955,7 +955,7 @@ where
 /// # Safety
 ///
 /// The two slices must be non-empty and `mid` must be in bounds. Buffer `buf` must be long enough
-/// to hold a copy of the shorter slice. Also, `T` must not be a zero-sized type.
+/// to hold a copy of the shorter slice.
 #[allow(unused_unsafe)]
 #[cfg(not(no_global_oom_handling))]
 unsafe fn merge<T, F>(v: &mut [T], mid: usize, buf: *mut T, is_less: &mut F)
@@ -1088,10 +1088,7 @@ where
     macro_rules! gt {
         ($v: ident, $left: expr, $right: expr, $is_less: ident) => {
             // $is_less(&$v[$right], &$v[$left])
-            {
-                debug_assert!($left < $v.len() && $right < $v.len());
-                $is_less(unsafe { &$v.get_unchecked($right) }, unsafe { &$v.get_unchecked($left) })
-            }
+            $is_less(unsafe { &$v.get_unchecked($right) }, unsafe { &$v.get_unchecked($left) })
         };
     }
 
@@ -1171,7 +1168,9 @@ where
                         return;
                     } else if gt!(v, 0, len - 1, is_less) {
                         // strictly reverse sorted
-                        swap_slices(v, mid, buf_ptr);
+                        unsafe {
+                            swap_slices(v, mid, buf_ptr);
+                        }
                         return;
                     }
                 } else {
@@ -1202,7 +1201,8 @@ where
     ///
     /// `mid` must be <= `v.len()`
     /// `mid <= v.len()` because `swap_slices` is only called when: `sorted < len && mid == sorted.max(len / 2)`
-    fn swap_slices<T>(v: &mut [T], mid: usize, buf_ptr: *mut T) {
+    #[allow(unused_unsafe)]
+    unsafe fn swap_slices<T>(v: &mut [T], mid: usize, buf_ptr: *mut T) {
         let rlen = v.len() - mid;
         let v_ptr = v.as_mut_ptr();
         unsafe {
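
Reviewer note: the control flow of `slice_merge_sort` above is easier to follow without
the pointer work. Below is a minimal safe Rust sketch of the same two-stage scheme:
detect the already-sorted prefix, insertion-sort small slices, otherwise split at the
larger of the sorted prefix and the midpoint, recurse depth-first, then merge. All names
here (`two_stage_sort`, `sorted_prefix_len`, `SMALL`) are illustrative only, not
identifiers from the patch, and the clone-based merge stands in for the patch's in-place
unsafe merge through a single scratch buffer.

fn sorted_prefix_len<T: Ord>(v: &[T]) -> usize {
    if v.is_empty() {
        return 0;
    }
    let mut i = 1;
    while i < v.len() && v[i - 1] <= v[i] {
        i += 1;
    }
    i
}

// Stable: equal elements are never swapped past each other.
fn insertion_sort<T: Ord>(v: &mut [T]) {
    for i in 1..v.len() {
        let mut j = i;
        while j > 0 && v[j] < v[j - 1] {
            v.swap(j, j - 1);
            j -= 1;
        }
    }
}

fn two_stage_sort<T: Ord + Clone>(v: &mut [T]) {
    const SMALL: usize = 20; // illustrative threshold, not the patch's tuned constants

    let sorted = sorted_prefix_len(v);
    if sorted == v.len() {
        return; // pre-sorted prefix optimization: sorted input costs one O(n) scan
    }
    if v.len() <= SMALL {
        insertion_sort(v);
        return;
    }
    // Splitting at max(sorted, len / 2) guarantees the sorted prefix is never re-sorted.
    let mid = sorted.max(v.len() / 2);
    two_stage_sort(&mut v[..mid]);
    two_stage_sort(&mut v[mid..]);
    // Merge only when the two sorted halves actually interleave.
    if v[mid - 1] > v[mid] {
        let mut out = Vec::with_capacity(v.len());
        {
            let (a, b) = v.split_at(mid);
            let (mut i, mut j) = (0, 0);
            while i < a.len() && j < b.len() {
                // Strict `<` takes equal elements from the left half first: stability.
                if b[j] < a[i] {
                    out.push(b[j].clone());
                    j += 1;
                } else {
                    out.push(a[i].clone());
                    i += 1;
                }
            }
            out.extend_from_slice(&a[i..]);
            out.extend_from_slice(&b[j..]);
        }
        v.clone_from_slice(&out);
    }
}

fn main() {
    let mut data = vec![9, 8, 7, 3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5, 0, 2, 8, 8, 4, 6, 2, 7];
    two_stage_sort(&mut data);
    assert!(data.windows(2).all(|w| w[0] <= w[1]));
    println!("{:?}", data);
}

The sketch keeps the key property the patch relies on: an already-sorted input
short-circuits after one scan, while random input degenerates to an ordinary
O(n log n) top-down merge sort. It omits the patch's handling of strictly descending
prefixes, which the real code detects and reverses in place before merging.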