Skip to content

Commit 1e6e98c

Browse files
committedMar 25, 2014
auto merge of #12991 : alexcrichton/rust/sync-chan, r=brson
This commit contains an implementation of synchronous, bounded channels for Rust. This is an implementation of the proposal made last January [1]. These channels are built on mutexes, and currently focus on a working implementation rather than speed. Receivers for sync channels have select() implemented for them, but there is currently no implementation of select() for sync senders. Rust will continue to provide both synchronous and asynchronous channels as part of the standard distribution, there is no intent to remove asynchronous channels. This flavor of channels is meant to provide an alternative to asynchronous channels because like green tasks, asynchronous channels are not appropriate for all situations. [1] - https://mail.mozilla.org/pipermail/rust-dev/2014-January/007924.html
·
1.88.00.10
2 parents 6bf3fca + 56cae9b commit 1e6e98c

File tree

11 files changed

+1229
-115
lines changed

11 files changed

+1229
-115
lines changed
 

‎src/libstd/comm/mod.rs

Lines changed: 696 additions & 2 deletions
Large diffs are not rendered by default.

‎src/libstd/comm/select.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,4 +648,40 @@ mod test {
648648
tx1.send(());
649649
rx2.recv();
650650
})
651+
652+
test!(fn sync1() {
653+
let (tx, rx) = sync_channel(1);
654+
tx.send(1);
655+
select! {
656+
n = rx.recv() => { assert_eq!(n, 1); }
657+
}
658+
})
659+
660+
test!(fn sync2() {
661+
let (tx, rx) = sync_channel(0);
662+
spawn(proc() {
663+
for _ in range(0, 100) { task::deschedule() }
664+
tx.send(1);
665+
});
666+
select! {
667+
n = rx.recv() => { assert_eq!(n, 1); }
668+
}
669+
})
670+
671+
test!(fn sync3() {
672+
let (tx1, rx1) = sync_channel(0);
673+
let (tx2, rx2) = channel();
674+
spawn(proc() { tx1.send(1); });
675+
spawn(proc() { tx2.send(2); });
676+
select! {
677+
n = rx1.recv() => {
678+
assert_eq!(n, 1);
679+
assert_eq!(rx2.recv(), 2);
680+
},
681+
n = rx2.recv() => {
682+
assert_eq!(n, 2);
683+
assert_eq!(rx1.recv(), 1);
684+
}
685+
}
686+
})
651687
}

‎src/libstd/comm/sync.rs

Lines changed: 485 additions & 0 deletions
Large diffs are not rendered by default.

‎src/libstd/io/fs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,10 +1282,10 @@ mod test {
12821282
}
12831283

12841284
iotest!(fn binary_file() {
1285-
use rand::{Rng, task_rng};
1285+
use rand::{StdRng, Rng};
12861286

12871287
let mut bytes = [0, ..1024];
1288-
task_rng().fill_bytes(bytes);
1288+
StdRng::new().fill_bytes(bytes);
12891289

12901290
let tmpdir = tmpdir();
12911291

‎src/libstd/prelude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub use slice::{Vector, VectorVector, CloneableVector, ImmutableVector};
6262
pub use vec::Vec;
6363

6464
// Reexported runtime types
65-
pub use comm::{channel, Sender, Receiver};
65+
pub use comm::{sync_channel, channel, SyncSender, Sender, Receiver};
6666
pub use task::spawn;
6767

6868
// Reexported statics

‎src/libstd/rt/args.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ mod imp {
6969
use iter::Iterator;
7070
use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
7171
use mem;
72+
#[cfg(not(test))] use ptr::RawPtr;
7273

7374
static mut global_args_ptr: uint = 0;
7475
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;

‎src/libstd/rt/task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,8 @@ mod test {
433433
434434
#[test]
435435
fn rng() {
436-
use rand::{Rng, task_rng};
437-
let mut r = task_rng();
436+
use rand::{StdRng, Rng};
437+
let mut r = StdRng::new();
438438
let _ = r.next_u32();
439439
}
440440

‎src/libstd/unstable/mutex.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -580,9 +580,9 @@ mod test {
580580
fn smoke_cond() {
581581
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;
582582
unsafe {
583-
let mut guard = lock.lock();
583+
let guard = lock.lock();
584584
let t = Thread::start(proc() {
585-
let mut guard = lock.lock();
585+
let guard = lock.lock();
586586
guard.signal();
587587
});
588588
guard.wait();

‎src/libstd/vec.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,13 +1355,8 @@ impl<T> Drop for MoveItems<T> {
13551355

13561356
#[cfg(test)]
13571357
mod tests {
1358-
use super::Vec;
1359-
use iter::{Iterator, range, Extendable};
1360-
use mem::{drop, size_of};
1361-
use ops::Drop;
1362-
use option::{Some, None};
1363-
use container::Container;
1364-
use slice::{Vector, MutableVector, ImmutableVector};
1358+
use prelude::*;
1359+
use mem::size_of;
13651360

13661361
#[test]
13671362
fn test_small_vec_struct() {

‎src/libsync/comm.rs

Lines changed: 1 addition & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -51,54 +51,9 @@ impl<S:Send,R:Send> DuplexStream<S, R> {
5151
}
5252
}
5353

54-
/// An extension of `pipes::stream` that provides synchronous message sending.
55-
pub struct SyncSender<S> { priv duplex_stream: DuplexStream<S, ()> }
56-
/// An extension of `pipes::stream` that acknowledges each message received.
57-
pub struct SyncReceiver<R> { priv duplex_stream: DuplexStream<(), R> }
58-
59-
impl<S: Send> SyncSender<S> {
60-
pub fn send(&self, val: S) {
61-
assert!(self.try_send(val), "SyncSender.send: receiving port closed");
62-
}
63-
64-
/// Sends a message, or report if the receiver has closed the connection
65-
/// before receiving.
66-
pub fn try_send(&self, val: S) -> bool {
67-
self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
68-
}
69-
}
70-
71-
impl<R: Send> SyncReceiver<R> {
72-
pub fn recv(&self) -> R {
73-
self.recv_opt().expect("SyncReceiver.recv: sending channel closed")
74-
}
75-
76-
pub fn recv_opt(&self) -> Option<R> {
77-
self.duplex_stream.recv_opt().map(|val| {
78-
self.duplex_stream.try_send(());
79-
val
80-
})
81-
}
82-
83-
pub fn try_recv(&self) -> comm::TryRecvResult<R> {
84-
match self.duplex_stream.try_recv() {
85-
comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
86-
state => state,
87-
}
88-
}
89-
}
90-
91-
/// Creates a stream whose channel, upon sending a message, blocks until the
92-
/// message is received.
93-
pub fn rendezvous<T: Send>() -> (SyncReceiver<T>, SyncSender<T>) {
94-
let (chan_stream, port_stream) = duplex();
95-
(SyncReceiver { duplex_stream: port_stream },
96-
SyncSender { duplex_stream: chan_stream })
97-
}
98-
9954
#[cfg(test)]
10055
mod test {
101-
use comm::{duplex, rendezvous};
56+
use comm::{duplex};
10257

10358

10459
#[test]
@@ -111,56 +66,4 @@ mod test {
11166
assert!(left.recv() == 123);
11267
assert!(right.recv() == ~"abc");
11368
}
114-
115-
#[test]
116-
pub fn basic_rendezvous_test() {
117-
let (port, chan) = rendezvous();
118-
119-
spawn(proc() {
120-
chan.send("abc");
121-
});
122-
123-
assert!(port.recv() == "abc");
124-
}
125-
126-
#[test]
127-
fn recv_a_lot() {
128-
// Rendezvous streams should be able to handle any number of messages being sent
129-
let (port, chan) = rendezvous();
130-
spawn(proc() {
131-
for _ in range(0, 10000) { chan.send(()); }
132-
});
133-
for _ in range(0, 10000) { port.recv(); }
134-
}
135-
136-
#[test]
137-
fn send_and_fail_and_try_recv() {
138-
let (port, chan) = rendezvous();
139-
spawn(proc() {
140-
chan.duplex_stream.send(()); // Can't access this field outside this module
141-
fail!()
142-
});
143-
port.recv()
144-
}
145-
146-
#[test]
147-
fn try_send_and_recv_then_fail_before_ack() {
148-
let (port, chan) = rendezvous();
149-
spawn(proc() {
150-
port.duplex_stream.recv();
151-
fail!()
152-
});
153-
chan.try_send(());
154-
}
155-
156-
#[test]
157-
#[should_fail]
158-
fn send_and_recv_then_fail_before_ack() {
159-
let (port, chan) = rendezvous();
160-
spawn(proc() {
161-
port.duplex_stream.recv();
162-
fail!()
163-
});
164-
chan.send(());
165-
}
16669
}

‎src/libsync/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#[cfg(test)]
2626
#[phase(syntax, link)] extern crate log;
2727

28-
pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex};
28+
pub use comm::{DuplexStream, duplex};
2929
pub use task_pool::TaskPool;
3030
pub use future::Future;
3131
pub use arc::{Arc, Weak};

0 commit comments

Comments
 (0)
Please sign in to comment.