Skip to content

Commit a849c47

Browse files
committed
Encapsulate the lock-free mpsc queue in the MessageQueue type
1 parent 1ce5081 commit a849c47

File tree

3 files changed

+15
-57
lines changed

3 files changed

+15
-57
lines changed

src/libstd/rt/message_queue.rs

+11-49
Original file line numberDiff line numberDiff line change
@@ -11,83 +11,45 @@
1111
//! A concurrent queue that supports multiple producers and a
1212
//! single consumer.
1313
14-
use container::Container;
1514
use kinds::Send;
1615
use vec::OwnedVector;
17-
use cell::Cell;
18-
use option::*;
19-
use unstable::sync::{UnsafeArc, LittleLock};
16+
use option::Option;
2017
use clone::Clone;
18+
use rt::mpsc_queue::Queue;
2119

2220
pub struct MessageQueue<T> {
23-
priv state: UnsafeArc<State<T>>
24-
}
25-
26-
struct State<T> {
27-
count: uint,
28-
queue: ~[T],
29-
lock: LittleLock
21+
priv queue: Queue<T>
3022
}
3123

3224
impl<T: Send> MessageQueue<T> {
3325
pub fn new() -> MessageQueue<T> {
3426
MessageQueue {
35-
state: UnsafeArc::new(State {
36-
count: 0,
37-
queue: ~[],
38-
lock: LittleLock::new()
39-
})
27+
queue: Queue::new()
4028
}
4129
}
4230

31+
#[inline]
4332
pub fn push(&mut self, value: T) {
44-
unsafe {
45-
let value = Cell::new(value);
46-
let state = self.state.get();
47-
do (*state).lock.lock {
48-
(*state).count += 1;
49-
(*state).queue.push(value.take());
50-
}
51-
}
33+
self.queue.push(value)
5234
}
5335

36+
#[inline]
5437
pub fn pop(&mut self) -> Option<T> {
55-
unsafe {
56-
let state = self.state.get();
57-
do (*state).lock.lock {
58-
if !(*state).queue.is_empty() {
59-
(*state).count += 1;
60-
Some((*state).queue.shift())
61-
} else {
62-
None
63-
}
64-
}
65-
}
38+
self.queue.pop()
6639
}
6740

6841
/// A pop that may sometimes miss enqueued elements, but is much faster
6942
/// to give up without doing any synchronization
43+
#[inline]
7044
pub fn casual_pop(&mut self) -> Option<T> {
71-
unsafe {
72-
let state = self.state.get();
73-
// NB: Unsynchronized check
74-
if (*state).count == 0 { return None; }
75-
do (*state).lock.lock {
76-
if !(*state).queue.is_empty() {
77-
(*state).count += 1;
78-
Some((*state).queue.shift())
79-
} else {
80-
None
81-
}
82-
}
83-
}
45+
self.queue.pop()
8446
}
8547
}
8648

8749
impl<T: Send> Clone for MessageQueue<T> {
8850
fn clone(&self) -> MessageQueue<T> {
8951
MessageQueue {
90-
state: self.state.clone()
52+
queue: self.queue.clone()
9153
}
9254
}
9355
}

src/libstd/rt/mpsc_queue.rs

-4
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,6 @@ impl<T: Send> Queue<T> {
159159
unsafe { (*self.state.get()).push(value) }
160160
}
161161

162-
pub fn casual_pop(&mut self) -> Option<T> {
163-
unsafe { (*self.state.get()).pop() }
164-
}
165-
166162
pub fn pop(&mut self) -> Option<T> {
167163
unsafe{ (*self.state.get()).pop() }
168164
}

src/libstd/rt/sched.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use super::stack::{StackPool};
1919
use super::rtio::EventLoop;
2020
use super::context::Context;
2121
use super::task::{Task, AnySched, Sched};
22-
use super::mpsc_queue::Queue;
22+
use super::message_queue::MessageQueue;
2323
use rt::kill::BlockedTask;
2424
use rt::local_ptr;
2525
use rt::local::Local;
@@ -47,7 +47,7 @@ pub struct Scheduler {
4747
/// The queue of incoming messages from other schedulers.
4848
/// These are enqueued by SchedHandles after which a remote callback
4949
/// is triggered to handle the message.
50-
priv message_queue: Queue<SchedMessage>,
50+
priv message_queue: MessageQueue<SchedMessage>,
5151
/// A shared list of sleeping schedulers. We'll use this to wake
5252
/// up schedulers when pushing work onto the work queue.
5353
sleeper_list: SleeperList,
@@ -137,7 +137,7 @@ impl Scheduler {
137137

138138
let mut sched = Scheduler {
139139
sleeper_list: sleeper_list,
140-
message_queue: Queue::new(),
140+
message_queue: MessageQueue::new(),
141141
sleepy: false,
142142
no_sleep: false,
143143
event_loop: event_loop,
@@ -802,7 +802,7 @@ pub enum SchedMessage {
802802

803803
pub struct SchedHandle {
804804
priv remote: ~RemoteCallback,
805-
priv queue: Queue<SchedMessage>,
805+
priv queue: MessageQueue<SchedMessage>,
806806
sched_id: uint
807807
}
808808

0 commit comments

Comments
 (0)