Skip to content

Commit fec8059

Browse files
committed
Move port_set and shared_chan into core.
1 parent ba10819 commit fec8059

File tree

3 files changed

+81
-210
lines changed

3 files changed

+81
-210
lines changed

src/libcore/pipes.rs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,19 @@
33
import unsafe::{forget, reinterpret_cast, transmute};
44
import either::{either, left, right};
55
import option::unwrap;
6+
import arc::methods;
7+
8+
/* Use this after the snapshot
9+
macro_rules! move {
10+
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
11+
}
12+
*/
13+
14+
fn macros() {
15+
#macro[
16+
[#move(x), { unsafe { let y <- *ptr::addr_of(x); y } }]
17+
];
18+
}
619

720
enum state {
821
empty,
@@ -455,11 +468,6 @@ enum port<T:send> {
455468
fn stream<T:send>() -> (chan<T>, port<T>) {
456469
let (c, s) = streamp::init();
457470

458-
#macro[
459-
[#move[x],
460-
unsafe { let y <- *ptr::addr_of(x); y }]
461-
];
462-
463471
(chan_({ mut endp: some(c) }), port_({ mut endp: some(s) }))
464472
}
465473

@@ -506,3 +514,69 @@ impl port<T: send> for port<T> {
506514
peek
507515
}
508516
}
517+
518+
// Treat a whole bunch of ports as one.
519+
class port_set<T: send> {
520+
let mut ports: ~[pipes::port<T>];
521+
522+
new() { self.ports = ~[]; }
523+
524+
fn add(+port: pipes::port<T>) {
525+
vec::push(self.ports, port)
526+
}
527+
528+
fn try_recv() -> option<T> {
529+
let mut result = none;
530+
while result == none && self.ports.len() > 0 {
531+
let i = pipes::wait_many(self.ports.map(|p| p.header()));
532+
// dereferencing an unsafe pointer nonsense to appease the
533+
// borrowchecker.
534+
alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} {
535+
some(m) {
536+
result = some(#move(m));
537+
}
538+
none {
539+
// Remove this port.
540+
let mut ports = ~[];
541+
self.ports <-> ports;
542+
vec::consume(ports,
543+
|j, x| if i != j { vec::push(self.ports, x) });
544+
}
545+
}
546+
}
547+
result
548+
}
549+
550+
fn recv() -> T {
551+
option::unwrap(self.try_recv())
552+
}
553+
}
554+
555+
impl private_methods/&<T: send> for pipes::port<T> {
556+
pure fn header() -> *pipes::packet_header unchecked {
557+
alt self.endp {
558+
some(endp) {
559+
endp.header()
560+
}
561+
none { fail "peeking empty stream" }
562+
}
563+
}
564+
}
565+
566+
567+
type shared_chan<T: send> = arc::exclusive<pipes::chan<T>>;
568+
569+
impl chan<T: send> for shared_chan<T> {
570+
fn send(+x: T) {
571+
let mut xx = some(x);
572+
do self.with |_c, chan| {
573+
let mut x = none;
574+
x <-> xx;
575+
chan.send(option::unwrap(x))
576+
}
577+
}
578+
}
579+
580+
fn shared_chan<T:send>(+c: pipes::chan<T>) -> shared_chan<T> {
581+
arc::exclusive(c)
582+
}

src/test/bench/msgsend-pipes-shared.rs

Lines changed: 1 addition & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io::writer;
1313
import io::writer_util;
1414

1515
import arc::methods;
16-
import pipes::{port, chan};
16+
import pipes::{port, chan, shared_chan};
1717

1818
macro_rules! move {
1919
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
@@ -96,113 +96,3 @@ fn main(args: ~[str]) {
9696
#debug("%?", args);
9797
run(args);
9898
}
99-
100-
// Treat a whole bunch of ports as one.
101-
class box<T> {
102-
let mut contents: option<T>;
103-
new(+x: T) { self.contents = some(x); }
104-
105-
fn swap(f: fn(+T) -> T) {
106-
let mut tmp = none;
107-
self.contents <-> tmp;
108-
self.contents = some(f(option::unwrap(tmp)));
109-
}
110-
111-
fn unwrap() -> T {
112-
let mut tmp = none;
113-
self.contents <-> tmp;
114-
option::unwrap(tmp)
115-
}
116-
}
117-
118-
class port_set<T: send> {
119-
let mut ports: ~[pipes::port<T>];
120-
121-
new() { self.ports = ~[]; }
122-
123-
fn add(+port: pipes::port<T>) {
124-
vec::push(self.ports, port)
125-
}
126-
127-
fn try_recv() -> option<T> {
128-
let mut result = none;
129-
while result == none && self.ports.len() > 0 {
130-
let i = pipes::wait_many(self.ports.map(|p| p.header()));
131-
// dereferencing an unsafe pointer nonsense to appease the
132-
// borrowchecker.
133-
alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} {
134-
some(m) {
135-
result = some(move!{m});
136-
}
137-
none {
138-
// Remove this port.
139-
let mut ports = ~[];
140-
self.ports <-> ports;
141-
vec::consume(ports,
142-
|j, x| if i != j { vec::push(self.ports, x) });
143-
}
144-
}
145-
}
146-
/*
147-
while !done {
148-
do self.ports.swap |ports| {
149-
if ports.len() > 0 {
150-
let old_len = ports.len();
151-
let (_, m, ports) = pipes::select(ports);
152-
alt m {
153-
some(pipes::streamp::data(x, next)) {
154-
result = some(move!{x});
155-
done = true;
156-
assert ports.len() == old_len - 1;
157-
vec::append_one(ports, move!{next})
158-
}
159-
none {
160-
//#error("pipe closed");
161-
assert ports.len() == old_len - 1;
162-
ports
163-
}
164-
}
165-
}
166-
else {
167-
//#error("no more pipes");
168-
done = true;
169-
~[]
170-
}
171-
}
172-
}
173-
*/
174-
result
175-
}
176-
177-
fn recv() -> T {
178-
option::unwrap(self.try_recv())
179-
}
180-
}
181-
182-
impl private_methods/&<T: send> for pipes::port<T> {
183-
pure fn header() -> *pipes::packet_header unchecked {
184-
alt self.endp {
185-
some(endp) {
186-
endp.header()
187-
}
188-
none { fail "peeking empty stream" }
189-
}
190-
}
191-
}
192-
193-
type shared_chan<T: send> = arc::exclusive<pipes::chan<T>>;
194-
195-
impl chan<T: send> for shared_chan<T> {
196-
fn send(+x: T) {
197-
let mut xx = some(x);
198-
do self.with |_c, chan| {
199-
let mut x = none;
200-
x <-> xx;
201-
chan.send(option::unwrap(x))
202-
}
203-
}
204-
}
205-
206-
fn shared_chan<T:send>(+c: pipes::chan<T>) -> shared_chan<T> {
207-
arc::exclusive(c)
208-
}

src/test/bench/msgsend-pipes.rs

Lines changed: 1 addition & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std;
88
import io::writer;
99
import io::writer_util;
1010

11-
import pipes::{port, chan};
11+
import pipes::{port, port_set, chan};
1212

1313
macro_rules! move {
1414
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
@@ -92,96 +92,3 @@ fn main(args: ~[str]) {
9292
#debug("%?", args);
9393
run(args);
9494
}
95-
96-
// Treat a whole bunch of ports as one.
97-
class box<T> {
98-
let mut contents: option<T>;
99-
new(+x: T) { self.contents = some(x); }
100-
101-
fn swap(f: fn(+T) -> T) {
102-
let mut tmp = none;
103-
self.contents <-> tmp;
104-
self.contents = some(f(option::unwrap(tmp)));
105-
}
106-
107-
fn unwrap() -> T {
108-
let mut tmp = none;
109-
self.contents <-> tmp;
110-
option::unwrap(tmp)
111-
}
112-
}
113-
114-
class port_set<T: send> {
115-
let mut ports: ~[pipes::port<T>];
116-
117-
new() { self.ports = ~[]; }
118-
119-
fn add(+port: pipes::port<T>) {
120-
vec::push(self.ports, port)
121-
}
122-
123-
fn try_recv() -> option<T> {
124-
let mut result = none;
125-
while result == none && self.ports.len() > 0 {
126-
let i = pipes::wait_many(self.ports.map(|p| p.header()));
127-
// dereferencing an unsafe pointer nonsense to appease the
128-
// borrowchecker.
129-
alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} {
130-
some(m) {
131-
result = some(move!{m});
132-
}
133-
none {
134-
// Remove this port.
135-
let mut ports = ~[];
136-
self.ports <-> ports;
137-
vec::consume(ports,
138-
|j, x| if i != j { vec::push(self.ports, x) });
139-
}
140-
}
141-
}
142-
/*
143-
while !done {
144-
do self.ports.swap |ports| {
145-
if ports.len() > 0 {
146-
let old_len = ports.len();
147-
let (_, m, ports) = pipes::select(ports);
148-
alt m {
149-
some(pipes::streamp::data(x, next)) {
150-
result = some(move!{x});
151-
done = true;
152-
assert ports.len() == old_len - 1;
153-
vec::append_one(ports, move!{next})
154-
}
155-
none {
156-
//#error("pipe closed");
157-
assert ports.len() == old_len - 1;
158-
ports
159-
}
160-
}
161-
}
162-
else {
163-
//#error("no more pipes");
164-
done = true;
165-
~[]
166-
}
167-
}
168-
}
169-
*/
170-
result
171-
}
172-
173-
fn recv() -> T {
174-
option::unwrap(self.try_recv())
175-
}
176-
}
177-
178-
impl private_methods/&<T: send> for pipes::port<T> {
179-
pure fn header() -> *pipes::packet_header unchecked {
180-
alt self.endp {
181-
some(endp) {
182-
endp.header()
183-
}
184-
none { fail "peeking empty stream" }
185-
}
186-
}
187-
}

0 commit comments

Comments
 (0)