Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f02cc6b

Browse files
committedAug 13, 2013
auto merge of #8411 : bblum/rust/assorted-fixes, r=brson
Each commit is pretty much what it says on the tin. r anybody.
2 parents c99b2b9 + 5ac8c57 commit f02cc6b

File tree

11 files changed

+469
-479
lines changed

11 files changed

+469
-479
lines changed
 

‎src/libstd/cell.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#[missing_doc];
1414

1515
use cast::transmute_mut;
16+
use unstable::finally::Finally;
1617
use prelude::*;
1718

1819
/*
@@ -65,18 +66,17 @@ impl<T> Cell<T> {
6566

6667
/// Calls a closure with a reference to the value.
6768
pub fn with_ref<R>(&self, op: &fn(v: &T) -> R) -> R {
68-
let v = self.take();
69-
let r = op(&v);
70-
self.put_back(v);
71-
r
69+
do self.with_mut_ref |ptr| { op(ptr) }
7270
}
7371

7472
/// Calls a closure with a mutable reference to the value.
7573
pub fn with_mut_ref<R>(&self, op: &fn(v: &mut T) -> R) -> R {
76-
let mut v = self.take();
77-
let r = op(&mut v);
78-
self.put_back(v);
79-
r
74+
let mut v = Some(self.take());
75+
do (|| {
76+
op(v.get_mut_ref())
77+
}).finally {
78+
self.put_back(v.take_unwrap());
79+
}
8080
}
8181
}
8282

‎src/libstd/rt/comm.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use kinds::Send;
1818
use rt;
1919
use rt::sched::Scheduler;
2020
use rt::local::Local;
21-
use rt::select::{Select, SelectPort};
21+
use rt::select::{SelectInner, SelectPortInner};
22+
use select::{Select, SelectPort};
2223
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
2324
use unstable::sync::UnsafeAtomicRcBox;
2425
use util::Void;
@@ -113,7 +114,9 @@ impl<T> ChanOne<T> {
113114
// 'do_resched' configures whether the scheduler immediately switches to
114115
// the receiving task, or leaves the sending task still running.
115116
fn try_send_inner(self, val: T, do_resched: bool) -> bool {
116-
rtassert!(!rt::in_sched_context());
117+
if do_resched {
118+
rtassert!(!rt::in_sched_context());
119+
}
117120

118121
let mut this = self;
119122
let mut recvr_active = true;
@@ -215,7 +218,7 @@ impl<T> PortOne<T> {
215218
}
216219
}
217220

218-
impl<T> Select for PortOne<T> {
221+
impl<T> SelectInner for PortOne<T> {
219222
#[inline] #[cfg(not(test))]
220223
fn optimistic_check(&mut self) -> bool {
221224
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
@@ -318,7 +321,9 @@ impl<T> Select for PortOne<T> {
318321
}
319322
}
320323

321-
impl<T> SelectPort<T> for PortOne<T> {
324+
impl<T> Select for PortOne<T> { }
325+
326+
impl<T> SelectPortInner<T> for PortOne<T> {
322327
fn recv_ready(self) -> Option<T> {
323328
let mut this = self;
324329
let packet = this.packet();
@@ -349,6 +354,8 @@ impl<T> SelectPort<T> for PortOne<T> {
349354
}
350355
}
351356

357+
impl<T> SelectPort<T> for PortOne<T> { }
358+
352359
impl<T> Peekable<T> for PortOne<T> {
353360
fn peek(&self) -> bool {
354361
unsafe {
@@ -513,7 +520,7 @@ impl<T> Peekable<T> for Port<T> {
513520
// of them, but a &Port<T> should also be selectable so you can select2 on it
514521
// alongside a PortOne<U> without passing the port by value in recv_ready.
515522

516-
impl<'self, T> Select for &'self Port<T> {
523+
impl<'self, T> SelectInner for &'self Port<T> {
517524
#[inline]
518525
fn optimistic_check(&mut self) -> bool {
519526
do self.next.with_mut_ref |pone| { pone.optimistic_check() }
@@ -531,7 +538,9 @@ impl<'self, T> Select for &'self Port<T> {
531538
}
532539
}
533540

534-
impl<T> Select for Port<T> {
541+
impl<'self, T> Select for &'self Port<T> { }
542+
543+
impl<T> SelectInner for Port<T> {
535544
#[inline]
536545
fn optimistic_check(&mut self) -> bool {
537546
(&*self).optimistic_check()
@@ -548,7 +557,9 @@ impl<T> Select for Port<T> {
548557
}
549558
}
550559

551-
impl<'self, T> SelectPort<T> for &'self Port<T> {
560+
impl<T> Select for Port<T> { }
561+
562+
impl<'self, T> SelectPortInner<T> for &'self Port<T> {
552563
fn recv_ready(self) -> Option<T> {
553564
match self.next.take().recv_ready() {
554565
Some(StreamPayload { val, next }) => {
@@ -560,6 +571,8 @@ impl<'self, T> SelectPort<T> for &'self Port<T> {
560571
}
561572
}
562573

574+
impl<'self, T> SelectPort<T> for &'self Port<T> { }
575+
563576
pub struct SharedChan<T> {
564577
// Just like Chan, but a shared AtomicOption instead of Cell
565578
priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>

‎src/libstd/rt/kill.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,8 +488,8 @@ impl Death {
488488
rtassert!(self.unkillable == 0);
489489
self.unkillable = 1;
490490

491-
// FIXME(#7544): See corresponding fixme at the callsite in task.rs.
492-
// NB(#8192): Doesn't work with "let _ = ..."
491+
// NB. See corresponding comment at the callsite in task.rs.
492+
// FIXME(#8192): Doesn't work with "let _ = ..."
493493
{ use util; util::ignore(group); }
494494

495495
// Step 1. Decide if we need to collect child failures synchronously.

‎src/libstd/rt/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,7 @@ pub mod tube;
142142
/// Simple reimplementation of core::comm
143143
pub mod comm;
144144

145-
/// Routines for select()ing on pipes.
146-
pub mod select;
145+
mod select;
147146

148147
// FIXME #5248 shouldn't be pub
149148
/// The runtime needs to be able to put a pointer into thread-local storage.

‎src/libstd/rt/select.rs

Lines changed: 5 additions & 305 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11-
use option::*;
12-
// use either::{Either, Left, Right};
11+
//! Module for private, abstraction-leaking select traits. Wrapped in std::select.
12+
1313
use rt::kill::BlockedTask;
1414
use rt::sched::Scheduler;
15-
use rt::local::Local;
15+
use option::Option;
1616

17-
/// Trait for message-passing primitives that can be select()ed on.
18-
pub trait Select {
17+
pub trait SelectInner {
1918
// Returns true if data was available.
2019
fn optimistic_check(&mut self) -> bool;
2120
// Returns true if data was available. If so, shall also wake() the task.
@@ -24,305 +23,6 @@ pub trait Select {
2423
fn unblock_from(&mut self) -> bool;
2524
}
2625

27-
/// Trait for message-passing primitives that can use the select2() convenience wrapper.
28-
// (This is separate from the above trait to enable heterogeneous lists of ports
29-
// that implement Select on different types to use select().)
30-
pub trait SelectPort<T> : Select {
26+
pub trait SelectPortInner<T> {
3127
fn recv_ready(self) -> Option<T>;
3228
}
33-
34-
/// Receive a message from any one of many ports at once.
35-
pub fn select<A: Select>(ports: &mut [A]) -> uint {
36-
if ports.is_empty() {
37-
fail!("can't select on an empty list");
38-
}
39-
40-
for (index, port) in ports.mut_iter().enumerate() {
41-
if port.optimistic_check() {
42-
return index;
43-
}
44-
}
45-
46-
// If one of the ports already contains data when we go to block on it, we
47-
// don't bother enqueueing on the rest of them, so we shouldn't bother
48-
// unblocking from it either. This is just for efficiency, not correctness.
49-
// (If not, we need to unblock from all of them. Length is a placeholder.)
50-
let mut ready_index = ports.len();
51-
52-
let sched = Local::take::<Scheduler>();
53-
do sched.deschedule_running_task_and_then |sched, task| {
54-
let task_handles = task.make_selectable(ports.len());
55-
56-
for (index, (port, task_handle)) in
57-
ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
58-
// If one of the ports has data by now, it will wake the handle.
59-
if port.block_on(sched, task_handle) {
60-
ready_index = index;
61-
break;
62-
}
63-
}
64-
}
65-
66-
// Task resumes. Now unblock ourselves from all the ports we blocked on.
67-
// If the success index wasn't reset, 'take' will just take all of them.
68-
// Iterate in reverse so the 'earliest' index that's ready gets returned.
69-
for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() {
70-
if port.unblock_from() {
71-
ready_index = index;
72-
}
73-
}
74-
75-
assert!(ready_index < ports.len());
76-
return ready_index;
77-
}
78-
79-
/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
80-
81-
impl <'self> Select for &'self mut Select {
82-
fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
83-
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
84-
self.block_on(sched, task)
85-
}
86-
fn unblock_from(&mut self) -> bool { self.unblock_from() }
87-
}
88-
89-
pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
90-
-> Either<(Option<TA>, B), (A, Option<TB>)> {
91-
let result = {
92-
let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
93-
select(ports)
94-
};
95-
match result {
96-
0 => Left ((a.recv_ready(), b)),
97-
1 => Right((a, b.recv_ready())),
98-
x => fail!("impossible case in select2: %?", x)
99-
}
100-
}
101-
102-
*/
103-
104-
#[cfg(test)]
105-
mod test {
106-
use super::*;
107-
use option::*;
108-
use rt::comm::*;
109-
use rt::test::*;
110-
use vec::*;
111-
use comm::GenericChan;
112-
use task;
113-
use cell::Cell;
114-
use iterator::{Iterator, range};
115-
116-
#[test] #[ignore(cfg(windows))] #[should_fail]
117-
fn select_doesnt_get_trolled() {
118-
select::<PortOne<()>>([]);
119-
}
120-
121-
/* non-blocking select tests */
122-
123-
#[cfg(test)]
124-
fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
125-
// Unfortunately this does not actually test the block_on early-break
126-
// codepath in select -- racing between the sender and the receiver in
127-
// separate tasks is necessary to get around the optimistic check.
128-
let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>()));
129-
let mut dead_chans = ~[];
130-
let mut ports = ports;
131-
for (i, chan) in chans.move_iter().enumerate() {
132-
if send_on_chans.contains(&i) {
133-
chan.send(());
134-
} else {
135-
dead_chans.push(chan);
136-
}
137-
}
138-
let ready_index = select(ports);
139-
assert!(send_on_chans.contains(&ready_index));
140-
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
141-
let _ = dead_chans;
142-
143-
// Same thing with streams instead.
144-
// FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
145-
let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>()));
146-
let mut dead_chans = ~[];
147-
let mut ports = ports;
148-
for (i, chan) in chans.move_iter().enumerate() {
149-
if send_on_chans.contains(&i) {
150-
chan.send(());
151-
} else {
152-
dead_chans.push(chan);
153-
}
154-
}
155-
let ready_index = select(ports);
156-
assert!(send_on_chans.contains(&ready_index));
157-
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
158-
let _ = dead_chans;
159-
}
160-
161-
#[test]
162-
fn select_one() {
163-
do run_in_newsched_task { select_helper(1, [0]) }
164-
}
165-
166-
#[test]
167-
fn select_two() {
168-
// NB. I would like to have a test that tests the first one that is
169-
// ready is the one that's returned, but that can't be reliably tested
170-
// with the randomized behaviour of optimistic_check.
171-
do run_in_newsched_task { select_helper(2, [1]) }
172-
do run_in_newsched_task { select_helper(2, [0]) }
173-
do run_in_newsched_task { select_helper(2, [1,0]) }
174-
}
175-
176-
#[test]
177-
fn select_a_lot() {
178-
do run_in_newsched_task { select_helper(12, [7,8,9]) }
179-
}
180-
181-
#[test]
182-
fn select_stream() {
183-
use util;
184-
use comm::GenericChan;
185-
use iter::Times;
186-
187-
// Sends 10 buffered packets, and uses select to retrieve them all.
188-
// Puts the port in a different spot in the vector each time.
189-
do run_in_newsched_task {
190-
let (ports, _) = unzip(from_fn(10, |_| stream()));
191-
let (port, chan) = stream();
192-
do 10.times { chan.send(31337); }
193-
let mut ports = ports;
194-
let mut port = Some(port);
195-
let order = [5u,0,4,3,2,6,9,8,7,1];
196-
for &index in order.iter() {
197-
// put the port in the vector at any index
198-
util::swap(port.get_mut_ref(), &mut ports[index]);
199-
assert!(select(ports) == index);
200-
// get it back out
201-
util::swap(port.get_mut_ref(), &mut ports[index]);
202-
// NB. Not recv(), because optimistic_check randomly fails.
203-
assert!(port.get_ref().recv_ready().unwrap() == 31337);
204-
}
205-
}
206-
}
207-
208-
#[test]
209-
fn select_unkillable() {
210-
do run_in_newsched_task {
211-
do task::unkillable { select_helper(2, [1]) }
212-
}
213-
}
214-
215-
/* blocking select tests */
216-
217-
#[test]
218-
fn select_blocking() {
219-
select_blocking_helper(true);
220-
select_blocking_helper(false);
221-
222-
fn select_blocking_helper(killable: bool) {
223-
do run_in_newsched_task {
224-
let (p1,_c) = oneshot();
225-
let (p2,c2) = oneshot();
226-
let mut ports = [p1,p2];
227-
228-
let (p3,c3) = oneshot();
229-
let (p4,c4) = oneshot();
230-
231-
let x = Cell::new((c2, p3, c4));
232-
do task::spawn {
233-
let (c2, p3, c4) = x.take();
234-
p3.recv(); // handshake parent
235-
c4.send(()); // normal receive
236-
task::yield();
237-
c2.send(()); // select receive
238-
}
239-
240-
// Try to block before child sends on c2.
241-
c3.send(());
242-
p4.recv();
243-
if killable {
244-
assert!(select(ports) == 1);
245-
} else {
246-
do task::unkillable { assert!(select(ports) == 1); }
247-
}
248-
}
249-
}
250-
}
251-
252-
#[test]
253-
fn select_racing_senders() {
254-
static NUM_CHANS: uint = 10;
255-
256-
select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
257-
select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
258-
select_racing_senders_helper(true, ~[0,1,2]);
259-
select_racing_senders_helper(false, ~[0,1,2]);
260-
select_racing_senders_helper(true, ~[3,4,5,6]);
261-
select_racing_senders_helper(false, ~[3,4,5,6]);
262-
select_racing_senders_helper(true, ~[7,8,9]);
263-
select_racing_senders_helper(false, ~[7,8,9]);
264-
265-
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
266-
use rt::test::spawntask_random;
267-
use iter::Times;
268-
269-
do run_in_newsched_task {
270-
// A bit of stress, since ordinarily this is just smoke and mirrors.
271-
do 4.times {
272-
let send_on_chans = send_on_chans.clone();
273-
do task::spawn {
274-
let mut ports = ~[];
275-
for i in range(0u, NUM_CHANS) {
276-
let (p,c) = oneshot();
277-
ports.push(p);
278-
if send_on_chans.contains(&i) {
279-
let c = Cell::new(c);
280-
do spawntask_random {
281-
task::yield();
282-
c.take().send(());
283-
}
284-
}
285-
}
286-
// nondeterministic result, but should succeed
287-
if killable {
288-
select(ports);
289-
} else {
290-
do task::unkillable { select(ports); }
291-
}
292-
}
293-
}
294-
}
295-
}
296-
}
297-
298-
#[test] #[ignore(cfg(windows))]
299-
fn select_killed() {
300-
do run_in_newsched_task {
301-
let (success_p, success_c) = oneshot::<bool>();
302-
let success_c = Cell::new(success_c);
303-
do task::try {
304-
let success_c = Cell::new(success_c.take());
305-
do task::unkillable {
306-
let (p,c) = oneshot();
307-
let c = Cell::new(c);
308-
do task::spawn {
309-
let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>()));
310-
let mut ports = dead_ps;
311-
select(ports); // should get killed; nothing should leak
312-
c.take().send(()); // must not happen
313-
// Make sure dead_cs doesn't get closed until after select.
314-
let _ = dead_cs;
315-
}
316-
do task::spawn {
317-
fail!(); // should kill sibling awake
318-
}
319-
320-
// wait for killed selector to close (NOT send on) its c.
321-
// hope to send 'true'.
322-
success_c.take().send(p.try_recv().is_none());
323-
}
324-
};
325-
assert!(success_p.recv());
326-
}
327-
}
328-
}

‎src/libstd/rt/task.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,10 @@ impl Task {
253253
}
254254
}
255255

256-
// FIXME(#7544): We pass the taskgroup into death so that it can be
257-
// dropped while the unkillable counter is set. This should not be
258-
// necessary except for an extraneous clone() in task/spawn.rs that
259-
// causes a killhandle to get dropped, which mustn't receive a kill
260-
// signal since we're outside of the unwinder's try() scope.
261-
// { let _ = self.taskgroup.take(); }
256+
// NB. We pass the taskgroup into death so that it can be dropped while
257+
// the unkillable counter is set. This is necessary for when the
258+
// taskgroup destruction code drops references on KillHandles, which
259+
// might require using unkillable (to synchronize with an unwrapper).
262260
self.death.collect_failure(!self.unwinder.unwinding, self.taskgroup.take());
263261
self.destroyed = true;
264262
}

‎src/libstd/select.rs

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
use cell::Cell;
12+
use comm;
13+
use container::Container;
14+
use iterator::Iterator;
15+
use option::*;
16+
// use either::{Either, Left, Right};
17+
// use rt::kill::BlockedTask;
18+
use rt::sched::Scheduler;
19+
use rt::select::{SelectInner, SelectPortInner};
20+
use rt::local::Local;
21+
use rt::rtio::EventLoop;
22+
use task;
23+
use vec::{OwnedVector, MutableVector};
24+
25+
/// Trait for message-passing primitives that can be select()ed on.
26+
pub trait Select : SelectInner { }
27+
28+
/// Trait for message-passing primitives that can use the select2() convenience wrapper.
29+
// (This is separate from the above trait to enable heterogeneous lists of ports
30+
// that implement Select on different types to use select().)
31+
pub trait SelectPort<T> : SelectPortInner<T> { }
32+
33+
/// Receive a message from any one of many ports at once. Returns the index of the
34+
/// port whose data is ready. (If multiple are ready, returns the lowest index.)
35+
pub fn select<A: Select>(ports: &mut [A]) -> uint {
36+
if ports.is_empty() {
37+
fail!("can't select on an empty list");
38+
}
39+
40+
for (index, port) in ports.mut_iter().enumerate() {
41+
if port.optimistic_check() {
42+
return index;
43+
}
44+
}
45+
46+
// If one of the ports already contains data when we go to block on it, we
47+
// don't bother enqueueing on the rest of them, so we shouldn't bother
48+
// unblocking from it either. This is just for efficiency, not correctness.
49+
// (If not, we need to unblock from all of them. Length is a placeholder.)
50+
let mut ready_index = ports.len();
51+
52+
// XXX: We're using deschedule...and_then in an unsafe way here (see #8132),
53+
// in that we need to continue mutating the ready_index in the environment
54+
// after letting the task get woken up. The and_then closure needs to delay
55+
// the task from resuming until all ports have become blocked_on.
56+
let (p,c) = comm::oneshot();
57+
let p = Cell::new(p);
58+
let c = Cell::new(c);
59+
60+
let sched = Local::take::<Scheduler>();
61+
do sched.deschedule_running_task_and_then |sched, task| {
62+
let task_handles = task.make_selectable(ports.len());
63+
64+
for (index, (port, task_handle)) in
65+
ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
66+
// If one of the ports has data by now, it will wake the handle.
67+
if port.block_on(sched, task_handle) {
68+
ready_index = index;
69+
break;
70+
}
71+
}
72+
73+
let c = Cell::new(c.take());
74+
do sched.event_loop.callback { c.take().send_deferred(()) }
75+
}
76+
77+
// Unkillable is necessary not because getting killed is dangerous here,
78+
// but to force the recv not to use the same kill-flag that we used for
79+
// selecting. Otherwise a user-sender could spuriously wakeup us here.
80+
do task::unkillable { p.take().recv(); }
81+
82+
// Task resumes. Now unblock ourselves from all the ports we blocked on.
83+
// If the success index wasn't reset, 'take' will just take all of them.
84+
// Iterate in reverse so the 'earliest' index that's ready gets returned.
85+
for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() {
86+
if port.unblock_from() {
87+
ready_index = index;
88+
}
89+
}
90+
91+
assert!(ready_index < ports.len());
92+
return ready_index;
93+
}
94+
95+
/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
96+
97+
impl <'self> Select for &'self mut Select {
98+
fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
99+
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
100+
self.block_on(sched, task)
101+
}
102+
fn unblock_from(&mut self) -> bool { self.unblock_from() }
103+
}
104+
105+
pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
106+
-> Either<(Option<TA>, B), (A, Option<TB>)> {
107+
let result = {
108+
let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
109+
select(ports)
110+
};
111+
match result {
112+
0 => Left ((a.recv_ready(), b)),
113+
1 => Right((a, b.recv_ready())),
114+
x => fail!("impossible case in select2: %?", x)
115+
}
116+
}
117+
118+
*/
119+
120+
#[cfg(test)]
121+
mod test {
122+
use super::*;
123+
use clone::Clone;
124+
use iter::Times;
125+
use option::*;
126+
use rt::comm::*;
127+
use rt::test::*;
128+
use vec::*;
129+
use comm::GenericChan;
130+
use task;
131+
use cell::Cell;
132+
use iterator::{Iterator, range};
133+
134+
#[test] #[ignore(cfg(windows))] #[should_fail]
135+
fn select_doesnt_get_trolled() {
136+
select::<PortOne<()>>([]);
137+
}
138+
139+
/* non-blocking select tests */
140+
141+
#[cfg(test)]
142+
fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
143+
// Unfortunately this does not actually test the block_on early-break
144+
// codepath in select -- racing between the sender and the receiver in
145+
// separate tasks is necessary to get around the optimistic check.
146+
let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>()));
147+
let mut dead_chans = ~[];
148+
let mut ports = ports;
149+
for (i, chan) in chans.move_iter().enumerate() {
150+
if send_on_chans.contains(&i) {
151+
chan.send(());
152+
} else {
153+
dead_chans.push(chan);
154+
}
155+
}
156+
let ready_index = select(ports);
157+
assert!(send_on_chans.contains(&ready_index));
158+
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
159+
let _ = dead_chans;
160+
161+
// Same thing with streams instead.
162+
// FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
163+
let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>()));
164+
let mut dead_chans = ~[];
165+
let mut ports = ports;
166+
for (i, chan) in chans.move_iter().enumerate() {
167+
if send_on_chans.contains(&i) {
168+
chan.send(());
169+
} else {
170+
dead_chans.push(chan);
171+
}
172+
}
173+
let ready_index = select(ports);
174+
assert!(send_on_chans.contains(&ready_index));
175+
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
176+
let _ = dead_chans;
177+
}
178+
179+
#[test]
180+
fn select_one() {
181+
do run_in_newsched_task { select_helper(1, [0]) }
182+
}
183+
184+
#[test]
185+
fn select_two() {
186+
// NB. I would like to have a test that tests the first one that is
187+
// ready is the one that's returned, but that can't be reliably tested
188+
// with the randomized behaviour of optimistic_check.
189+
do run_in_newsched_task { select_helper(2, [1]) }
190+
do run_in_newsched_task { select_helper(2, [0]) }
191+
do run_in_newsched_task { select_helper(2, [1,0]) }
192+
}
193+
194+
#[test]
195+
fn select_a_lot() {
196+
do run_in_newsched_task { select_helper(12, [7,8,9]) }
197+
}
198+
199+
#[test]
200+
fn select_stream() {
201+
use util;
202+
use comm::GenericChan;
203+
204+
// Sends 10 buffered packets, and uses select to retrieve them all.
205+
// Puts the port in a different spot in the vector each time.
206+
do run_in_newsched_task {
207+
let (ports, _) = unzip(from_fn(10, |_| stream()));
208+
let (port, chan) = stream();
209+
do 10.times { chan.send(31337); }
210+
let mut ports = ports;
211+
let mut port = Some(port);
212+
let order = [5u,0,4,3,2,6,9,8,7,1];
213+
for &index in order.iter() {
214+
// put the port in the vector at any index
215+
util::swap(port.get_mut_ref(), &mut ports[index]);
216+
assert!(select(ports) == index);
217+
// get it back out
218+
util::swap(port.get_mut_ref(), &mut ports[index]);
219+
// NB. Not recv(), because optimistic_check randomly fails.
220+
assert!(port.get_ref().recv_ready().unwrap() == 31337);
221+
}
222+
}
223+
}
224+
225+
#[test]
226+
fn select_unkillable() {
227+
do run_in_newsched_task {
228+
do task::unkillable { select_helper(2, [1]) }
229+
}
230+
}
231+
232+
/* blocking select tests */
233+
234+
#[test]
235+
fn select_blocking() {
236+
select_blocking_helper(true);
237+
select_blocking_helper(false);
238+
239+
fn select_blocking_helper(killable: bool) {
240+
do run_in_newsched_task {
241+
let (p1,_c) = oneshot();
242+
let (p2,c2) = oneshot();
243+
let mut ports = [p1,p2];
244+
245+
let (p3,c3) = oneshot();
246+
let (p4,c4) = oneshot();
247+
248+
let x = Cell::new((c2, p3, c4));
249+
do task::spawn {
250+
let (c2, p3, c4) = x.take();
251+
p3.recv(); // handshake parent
252+
c4.send(()); // normal receive
253+
task::yield();
254+
c2.send(()); // select receive
255+
}
256+
257+
// Try to block before child sends on c2.
258+
c3.send(());
259+
p4.recv();
260+
if killable {
261+
assert!(select(ports) == 1);
262+
} else {
263+
do task::unkillable { assert!(select(ports) == 1); }
264+
}
265+
}
266+
}
267+
}
268+
269+
#[test]
270+
fn select_racing_senders() {
271+
static NUM_CHANS: uint = 10;
272+
273+
select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
274+
select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
275+
select_racing_senders_helper(true, ~[0,1,2]);
276+
select_racing_senders_helper(false, ~[0,1,2]);
277+
select_racing_senders_helper(true, ~[3,4,5,6]);
278+
select_racing_senders_helper(false, ~[3,4,5,6]);
279+
select_racing_senders_helper(true, ~[7,8,9]);
280+
select_racing_senders_helper(false, ~[7,8,9]);
281+
282+
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
283+
use rt::test::spawntask_random;
284+
285+
do run_in_newsched_task {
286+
// A bit of stress, since ordinarily this is just smoke and mirrors.
287+
do 4.times {
288+
let send_on_chans = send_on_chans.clone();
289+
do task::spawn {
290+
let mut ports = ~[];
291+
for i in range(0u, NUM_CHANS) {
292+
let (p,c) = oneshot();
293+
ports.push(p);
294+
if send_on_chans.contains(&i) {
295+
let c = Cell::new(c);
296+
do spawntask_random {
297+
task::yield();
298+
c.take().send(());
299+
}
300+
}
301+
}
302+
// nondeterministic result, but should succeed
303+
if killable {
304+
select(ports);
305+
} else {
306+
do task::unkillable { select(ports); }
307+
}
308+
}
309+
}
310+
}
311+
}
312+
}
313+
314+
#[test] #[ignore(cfg(windows))]
315+
fn select_killed() {
316+
do run_in_newsched_task {
317+
let (success_p, success_c) = oneshot::<bool>();
318+
let success_c = Cell::new(success_c);
319+
do task::try {
320+
let success_c = Cell::new(success_c.take());
321+
do task::unkillable {
322+
let (p,c) = oneshot();
323+
let c = Cell::new(c);
324+
do task::spawn {
325+
let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>()));
326+
let mut ports = dead_ps;
327+
select(ports); // should get killed; nothing should leak
328+
c.take().send(()); // must not happen
329+
// Make sure dead_cs doesn't get closed until after select.
330+
let _ = dead_cs;
331+
}
332+
do task::spawn {
333+
fail!(); // should kill sibling awake
334+
}
335+
336+
// wait for killed selector to close (NOT send on) its c.
337+
// hope to send 'true'.
338+
success_c.take().send(p.try_recv().is_none());
339+
}
340+
};
341+
assert!(success_p.recv());
342+
}
343+
}
344+
}

‎src/libstd/std.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ pub mod trie;
164164

165165
pub mod task;
166166
pub mod comm;
167+
pub mod select;
167168
pub mod local_data;
168169

169170

‎src/libstd/task/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
use prelude::*;
3939

4040
use cell::Cell;
41-
use cmp::Eq;
4241
use comm::{stream, Chan, GenericChan, GenericPort, Port};
4342
use result::Result;
4443
use result;

‎src/libstd/task/spawn.rs

Lines changed: 63 additions & 129 deletions
Large diffs are not rendered by default.

‎src/libstd/unstable/sync.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -229,20 +229,22 @@ impl<T> Drop for UnsafeAtomicRcBox<T>{
229229
if self.data.is_null() {
230230
return; // Happens when destructing an unwrapper's handle.
231231
}
232-
do task::unkillable {
233-
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
234-
// Must be acquire+release, not just release, to make sure this
235-
// doesn't get reordered to after the unwrapper pointer load.
236-
let old_count = data.count.fetch_sub(1, SeqCst);
237-
assert!(old_count >= 1);
238-
if old_count == 1 {
239-
// Were we really last, or should we hand off to an
240-
// unwrapper? It's safe to not xchg because the unwrapper
241-
// will set the unwrap lock *before* dropping his/her
242-
// reference. In effect, being here means we're the only
243-
// *awake* task with the data.
244-
match data.unwrapper.take(Acquire) {
245-
Some(~(message,response)) => {
232+
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
233+
// Must be acquire+release, not just release, to make sure this
234+
// doesn't get reordered to after the unwrapper pointer load.
235+
let old_count = data.count.fetch_sub(1, SeqCst);
236+
assert!(old_count >= 1);
237+
if old_count == 1 {
238+
// Were we really last, or should we hand off to an
239+
// unwrapper? It's safe to not xchg because the unwrapper
240+
// will set the unwrap lock *before* dropping his/her
241+
// reference. In effect, being here means we're the only
242+
// *awake* task with the data.
243+
match data.unwrapper.take(Acquire) {
244+
Some(~(message,response)) => {
245+
let cell = Cell::new((message, response, data));
246+
do task::unkillable {
247+
let (message, response, data) = cell.take();
246248
// Send 'ready' and wait for a response.
247249
message.send(());
248250
// Unkillable wait. Message guaranteed to come.
@@ -253,13 +255,13 @@ impl<T> Drop for UnsafeAtomicRcBox<T>{
253255
// Other task was killed. drop glue takes over.
254256
}
255257
}
256-
None => {
257-
// drop glue takes over.
258-
}
259258
}
260-
} else {
261-
cast::forget(data);
259+
None => {
260+
// drop glue takes over.
261+
}
262262
}
263+
} else {
264+
cast::forget(data);
263265
}
264266
}
265267
}

0 commit comments

Comments
 (0)
Please sign in to comment.