Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a70f9d7

Browse files
committed Nov 29, 2013
Implement a lock-free work-stealing deque
This adds an implementation of the Chase-Lev work-stealing deque to libstd under std::rt::deque. I've been unable to break the implementation of the deque itself, and it's not super highly optimized just yet (everything uses a SeqCst memory ordering). The major snag in implementing the Chase-Lev deque is that the buffers used to store data internally cannot get deallocated back to the OS. In the meantime, a shared buffer pool (synchronized by a normal mutex) is used to allocate buffers from and deallocate them to. This is done in hope of not overcommitting too much memory. It is in theory possible to eventually free the buffers, but one must be very careful in doing so. I was unable to get any good numbers from src/test/bench tests (I don't think many of them are slamming the work queue that much), but I was able to get some good numbers from one of my own tests. In a recent rewrite of select::select(), I found that my implementation was incredibly slow due to contention on the shared work queue. Upon switching to the parallel deque, I saw the contention drop to 0 and the runtime go from 1.6s to 0.9s with the most amount of time spent in libuv awakening the schedulers (plus allocations). Closes #4877
1 parent 08f4d1f commit a70f9d7

File tree

7 files changed

+723
-141
lines changed

7 files changed

+723
-141
lines changed
 

‎src/librustuv/net.rs‎

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,21 +1076,22 @@ mod test {
10761076
use std::rt::task::Task;
10771077
use std::rt::task::UnwindResult;
10781078
use std::rt::thread::Thread;
1079-
use std::rt::work_queue::WorkQueue;
1079+
use std::rt::deque::BufferPool;
10801080
use std::unstable::run_in_bare_thread;
10811081
use uvio::UvEventLoop;
10821082
10831083
do run_in_bare_thread {
10841084
let sleepers = SleeperList::new();
1085-
let work_queue1 = WorkQueue::new();
1086-
let work_queue2 = WorkQueue::new();
1087-
let queues = ~[work_queue1.clone(), work_queue2.clone()];
1085+
let mut pool = BufferPool::init();
1086+
let (worker1, stealer1) = pool.deque();
1087+
let (worker2, stealer2) = pool.deque();
1088+
let queues = ~[stealer1, stealer2];
10881089
10891090
let loop1 = ~UvEventLoop::new() as ~EventLoop;
1090-
let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
1091+
let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
10911092
sleepers.clone());
10921093
let loop2 = ~UvEventLoop::new() as ~EventLoop;
1093-
let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
1094+
let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
10941095
sleepers.clone());
10951096
10961097
let handle1 = Cell::new(sched1.make_handle());

‎src/libstd/rt/deque.rs‎

Lines changed: 658 additions & 0 deletions
Large diffs are not rendered by default.

‎src/libstd/rt/mod.rs‎

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ use vec::{OwnedVector, MutableVector, ImmutableVector};
7575
use vec;
7676

7777
use self::thread::Thread;
78-
use self::work_queue::WorkQueue;
7978

8079
// the os module needs to reach into this helper, so allow general access
8180
// through this reexport.
@@ -130,9 +129,6 @@ pub mod rtio;
130129
/// or task-local storage.
131130
pub mod local;
132131

133-
/// A parallel work-stealing deque.
134-
pub mod work_queue;
135-
136132
/// A parallel queue.
137133
pub mod message_queue;
138134

@@ -142,6 +138,9 @@ mod mpsc_queue;
142138
/// A lock-free multi-producer, multi-consumer bounded queue.
143139
mod mpmc_bounded_queue;
144140

141+
/// A parallel work-stealing deque
142+
pub mod deque;
143+
145144
/// A parallel data structure for tracking sleeping schedulers.
146145
pub mod sleeper_list;
147146

@@ -287,22 +286,24 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
287286

288287
// Create a work queue for each scheduler, ntimes. Create an extra
289288
// for the main thread if that flag is set. We won't steal from it.
290-
let work_queues: ~[WorkQueue<~Task>] = vec::from_fn(nscheds, |_| WorkQueue::new());
289+
let mut pool = deque::BufferPool::init();
290+
let arr = vec::from_fn(nscheds, |_| pool.deque());
291+
let (workers, stealers) = vec::unzip(arr.move_iter());
291292

292293
// The schedulers.
293294
let mut scheds = ~[];
294295
// Handles to the schedulers. When the main task ends these will be
295296
// sent the Shutdown message to terminate the schedulers.
296297
let mut handles = ~[];
297298

298-
for work_queue in work_queues.iter() {
299+
for worker in workers.move_iter() {
299300
rtdebug!("inserting a regular scheduler");
300301

301302
// Every scheduler is driven by an I/O event loop.
302303
let loop_ = new_event_loop();
303304
let mut sched = ~Scheduler::new(loop_,
304-
work_queue.clone(),
305-
work_queues.clone(),
305+
worker,
306+
stealers.clone(),
306307
sleepers.clone());
307308
let handle = sched.make_handle();
308309

@@ -321,12 +322,12 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
321322

322323
// This scheduler needs a queue that isn't part of the stealee
323324
// set.
324-
let work_queue = WorkQueue::new();
325+
let (worker, _) = pool.deque();
325326

326327
let main_loop = new_event_loop();
327328
let mut main_sched = ~Scheduler::new_special(main_loop,
328-
work_queue,
329-
work_queues.clone(),
329+
worker,
330+
stealers.clone(),
330331
sleepers.clone(),
331332
false,
332333
Some(friend_handle));

‎src/libstd/rt/sched.rs‎

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
1313
use clone::Clone;
1414
use unstable::raw;
1515
use super::sleeper_list::SleeperList;
16-
use super::work_queue::WorkQueue;
1716
use super::stack::{StackPool};
1817
use super::rtio::EventLoop;
1918
use super::context::Context;
2019
use super::task::{Task, AnySched, Sched};
2120
use super::message_queue::MessageQueue;
2221
use rt::kill::BlockedTask;
22+
use rt::deque;
2323
use rt::local_ptr;
2424
use rt::local::Local;
2525
use rt::rtio::{RemoteCallback, PausibleIdleCallback, Callback};
@@ -39,14 +39,14 @@ use vec::{OwnedVector};
3939
/// in too much allocation and too many events.
4040
pub struct Scheduler {
4141
/// There are N work queues, one per scheduler.
42-
priv work_queue: WorkQueue<~Task>,
42+
work_queue: deque::Worker<~Task>,
4343
/// Work queues for the other schedulers. These are created by
4444
/// cloning the core work queues.
45-
work_queues: ~[WorkQueue<~Task>],
45+
work_queues: ~[deque::Stealer<~Task>],
4646
/// The queue of incoming messages from other schedulers.
4747
/// These are enqueued by SchedHandles after which a remote callback
4848
/// is triggered to handle the message.
49-
priv message_queue: MessageQueue<SchedMessage>,
49+
message_queue: MessageQueue<SchedMessage>,
5050
/// A shared list of sleeping schedulers. We'll use this to wake
5151
/// up schedulers when pushing work onto the work queue.
5252
sleeper_list: SleeperList,
@@ -56,33 +56,33 @@ pub struct Scheduler {
5656
/// not active since there are multiple event sources that may
5757
/// wake the scheduler. It just prevents the scheduler from pushing
5858
/// multiple handles onto the sleeper list.
59-
priv sleepy: bool,
59+
sleepy: bool,
6060
/// A flag to indicate we've received the shutdown message and should
6161
/// no longer try to go to sleep, but exit instead.
6262
no_sleep: bool,
6363
stack_pool: StackPool,
6464
/// The scheduler runs on a special task. When it is not running
6565
/// it is stored here instead of the work queue.
66-
priv sched_task: Option<~Task>,
66+
sched_task: Option<~Task>,
6767
/// An action performed after a context switch on behalf of the
6868
/// code running before the context switch
69-
priv cleanup_job: Option<CleanupJob>,
69+
cleanup_job: Option<CleanupJob>,
7070
/// Should this scheduler run any task, or only pinned tasks?
7171
run_anything: bool,
7272
/// If the scheduler shouldn't run some tasks, a friend to send
7373
/// them to.
74-
priv friend_handle: Option<SchedHandle>,
74+
friend_handle: Option<SchedHandle>,
7575
/// A fast XorShift rng for scheduler use
7676
rng: XorShiftRng,
7777
/// A toggleable idle callback
78-
priv idle_callback: Option<~PausibleIdleCallback>,
78+
idle_callback: Option<~PausibleIdleCallback>,
7979
/// A countdown that starts at a random value and is decremented
8080
/// every time a yield check is performed. When it hits 0 a task
8181
/// will yield.
82-
priv yield_check_count: uint,
82+
yield_check_count: uint,
8383
/// A flag to tell the scheduler loop it needs to do some stealing
8484
/// in order to introduce randomness as part of a yield
85-
priv steal_for_yield: bool,
85+
steal_for_yield: bool,
8686

8787
// n.b. currently destructors of an object are run in top-to-bottom in order
8888
// of field declaration. Due to its nature, the pausible idle callback
@@ -115,8 +115,8 @@ impl Scheduler {
115115
// * Initialization Functions
116116

117117
pub fn new(event_loop: ~EventLoop,
118-
work_queue: WorkQueue<~Task>,
119-
work_queues: ~[WorkQueue<~Task>],
118+
work_queue: deque::Worker<~Task>,
119+
work_queues: ~[deque::Stealer<~Task>],
120120
sleeper_list: SleeperList)
121121
-> Scheduler {
122122

@@ -127,8 +127,8 @@ impl Scheduler {
127127
}
128128

129129
pub fn new_special(event_loop: ~EventLoop,
130-
work_queue: WorkQueue<~Task>,
131-
work_queues: ~[WorkQueue<~Task>],
130+
work_queue: deque::Worker<~Task>,
131+
work_queues: ~[deque::Stealer<~Task>],
132132
sleeper_list: SleeperList,
133133
run_anything: bool,
134134
friend: Option<SchedHandle>)
@@ -440,11 +440,11 @@ impl Scheduler {
440440
let start_index = self.rng.gen_range(0, len);
441441
for index in range(0, len).map(|i| (i + start_index) % len) {
442442
match work_queues[index].steal() {
443-
Some(task) => {
443+
deque::Data(task) => {
444444
rtdebug!("found task by stealing");
445445
return Some(task)
446446
}
447-
None => ()
447+
_ => ()
448448
}
449449
};
450450
rtdebug!("giving up on stealing");
@@ -889,6 +889,7 @@ mod test {
889889
use borrow::to_uint;
890890
use rt::sched::{Scheduler};
891891
use cell::Cell;
892+
use rt::deque::BufferPool;
892893
use rt::thread::Thread;
893894
use rt::task::{Task, Sched};
894895
use rt::basic;
@@ -994,22 +995,22 @@ mod test {
994995
#[test]
995996
fn test_schedule_home_states() {
996997
use rt::sleeper_list::SleeperList;
997-
use rt::work_queue::WorkQueue;
998998
use rt::sched::Shutdown;
999999
use borrow;
10001000
use rt::comm::*;
10011001

10021002
do run_in_bare_thread {
10031003

10041004
let sleepers = SleeperList::new();
1005-
let normal_queue = WorkQueue::new();
1006-
let special_queue = WorkQueue::new();
1007-
let queues = ~[normal_queue.clone(), special_queue.clone()];
1005+
let mut pool = BufferPool::init();
1006+
let (normal_worker, normal_stealer) = pool.deque();
1007+
let (special_worker, special_stealer) = pool.deque();
1008+
let queues = ~[normal_stealer, special_stealer];
10081009

10091010
// Our normal scheduler
10101011
let mut normal_sched = ~Scheduler::new(
10111012
basic::event_loop(),
1012-
normal_queue,
1013+
normal_worker,
10131014
queues.clone(),
10141015
sleepers.clone());
10151016

@@ -1020,7 +1021,7 @@ mod test {
10201021
// Our special scheduler
10211022
let mut special_sched = ~Scheduler::new_special(
10221023
basic::event_loop(),
1023-
special_queue.clone(),
1024+
special_worker,
10241025
queues.clone(),
10251026
sleepers.clone(),
10261027
false,
@@ -1169,7 +1170,6 @@ mod test {
11691170
// Used to deadlock because Shutdown was never recvd.
11701171
#[test]
11711172
fn no_missed_messages() {
1172-
use rt::work_queue::WorkQueue;
11731173
use rt::sleeper_list::SleeperList;
11741174
use rt::stack::StackPool;
11751175
use rt::sched::{Shutdown, TaskFromFriend};
@@ -1178,13 +1178,13 @@ mod test {
11781178
do run_in_bare_thread {
11791179
stress_factor().times(|| {
11801180
let sleepers = SleeperList::new();
1181-
let queue = WorkQueue::new();
1182-
let queues = ~[queue.clone()];
1181+
let mut pool = BufferPool::init();
1182+
let (worker, stealer) = pool.deque();
11831183

11841184
let mut sched = ~Scheduler::new(
11851185
basic::event_loop(),
1186-
queue,
1187-
queues.clone(),
1186+
worker,
1187+
~[stealer],
11881188
sleepers.clone());
11891189

11901190
let mut handle = sched.make_handle();

‎src/libstd/rt/test.rs‎

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,25 @@ use rand;
2323
use result::{Result, Ok, Err};
2424
use rt::basic;
2525
use rt::comm::oneshot;
26+
use rt::deque::BufferPool;
2627
use rt::new_event_loop;
2728
use rt::sched::Scheduler;
2829
use rt::sleeper_list::SleeperList;
2930
use rt::task::Task;
3031
use rt::task::UnwindResult;
3132
use rt::thread::Thread;
32-
use rt::work_queue::WorkQueue;
3333
use unstable::{run_in_bare_thread};
34+
use vec;
3435
use vec::{OwnedVector, MutableVector, ImmutableVector};
3536

3637
pub fn new_test_uv_sched() -> Scheduler {
3738

38-
let queue = WorkQueue::new();
39-
let queues = ~[queue.clone()];
39+
let mut pool = BufferPool::init();
40+
let (worker, stealer) = pool.deque();
4041

4142
let mut sched = Scheduler::new(new_event_loop(),
42-
queue,
43-
queues,
43+
worker,
44+
~[stealer],
4445
SleeperList::new());
4546

4647
// Don't wait for the Shutdown message
@@ -50,13 +51,12 @@ pub fn new_test_uv_sched() -> Scheduler {
5051
}
5152

5253
pub fn new_test_sched() -> Scheduler {
53-
54-
let queue = WorkQueue::new();
55-
let queues = ~[queue.clone()];
54+
let mut pool = BufferPool::init();
55+
let (worker, stealer) = pool.deque();
5656

5757
let mut sched = Scheduler::new(basic::event_loop(),
58-
queue,
59-
queues,
58+
worker,
59+
~[stealer],
6060
SleeperList::new());
6161

6262
// Don't wait for the Shutdown message
@@ -227,18 +227,16 @@ pub fn run_in_mt_newsched_task(f: proc()) {
227227

228228
let mut handles = ~[];
229229
let mut scheds = ~[];
230-
let mut work_queues = ~[];
231230

232-
for _ in range(0u, nthreads) {
233-
let work_queue = WorkQueue::new();
234-
work_queues.push(work_queue);
235-
}
231+
let mut pool = BufferPool::<~Task>::init();
232+
let workers = range(0, nthreads).map(|_| pool.deque());
233+
let (workers, stealers) = vec::unzip(workers);
236234

237-
for i in range(0u, nthreads) {
235+
for worker in workers.move_iter() {
238236
let loop_ = new_event_loop();
239237
let mut sched = ~Scheduler::new(loop_,
240-
work_queues[i].clone(),
241-
work_queues.clone(),
238+
worker,
239+
stealers.clone(),
242240
sleepers.clone());
243241
let handle = sched.make_handle();
244242

‎src/libstd/rt/work_queue.rs‎

Lines changed: 0 additions & 75 deletions
This file was deleted.

‎src/libstd/task/spawn.rs‎

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ use rt::sched::{Scheduler, Shutdown, TaskFromFriend};
8484
use rt::task::{Task, Sched};
8585
use rt::task::UnwindResult;
8686
use rt::thread::Thread;
87-
use rt::work_queue::WorkQueue;
8887
use rt::{in_green_task_context, new_event_loop};
8988
use task::SingleThreaded;
9089
use task::TaskOpts;
@@ -111,11 +110,11 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
111110
// Since this is a 1:1 scheduler we create a queue not in
112111
// the stealee set. The run_anything flag is set false
113112
// which will disable stealing.
114-
let work_queue = WorkQueue::new();
113+
let (worker, _stealer) = (*sched).work_queue.pool().deque();
115114

116115
// Create a new scheduler to hold the new task
117116
let mut new_sched = ~Scheduler::new_special(new_event_loop(),
118-
work_queue,
117+
worker,
119118
(*sched).work_queues.clone(),
120119
(*sched).sleeper_list.clone(),
121120
false,

5 commit comments

Comments
 (5)

bors commented on Nov 29, 2013

@bors
Collaborator

saw approval from pcwalton
at alexcrichton@a70f9d7

bors commented on Nov 29, 2013

@bors
Collaborator

merging alexcrichton/rust/issue-4877 = a70f9d7 into auto

bors commented on Nov 29, 2013

@bors
Collaborator

alexcrichton/rust/issue-4877 = a70f9d7 merged ok, testing candidate = dd1184e

bors commented on Nov 29, 2013

@bors
Collaborator

fast-forwarding master to auto = dd1184e

Please sign in to comment.