Skip to content

std: Rename Chan/Port types and constructor #12815

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 13, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 64 additions & 67 deletions src/doc/guide-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ concurrency at this writing:
* [`std::task`] - All code relating to tasks and task scheduling,
* [`std::comm`] - The message passing interface,
* [`sync::DuplexStream`] - An extension of `pipes::stream` that allows both sending and receiving,
* [`sync::SyncChan`] - An extension of `pipes::stream` that provides synchronous message sending,
* [`sync::SyncPort`] - An extension of `pipes::stream` that acknowledges each message received,
* [`sync::SyncSender`] - An extension of `pipes::stream` that provides synchronous message sending,
* [`sync::SyncReceiver`] - An extension of `pipes::stream` that acknowledges each message received,
* [`sync::rendezvous`] - Creates a stream whose channel, upon sending a message, blocks until the
message is received.
* [`sync::Arc`] - The Arc (atomically reference counted) type, for safely sharing immutable data,
Expand All @@ -70,8 +70,8 @@ concurrency at this writing:
[`std::task`]: std/task/index.html
[`std::comm`]: std/comm/index.html
[`sync::DuplexStream`]: sync/struct.DuplexStream.html
[`sync::SyncChan`]: sync/struct.SyncChan.html
[`sync::SyncPort`]: sync/struct.SyncPort.html
[`sync::SyncSender`]: sync/struct.SyncSender.html
[`sync::SyncReceiver`]: sync/struct.SyncReceiver.html
[`sync::rendezvous`]: sync/fn.rendezvous.html
[`sync::Arc`]: sync/struct.Arc.html
[`sync::RWArc`]: sync/struct.RWArc.html
Expand Down Expand Up @@ -141,118 +141,115 @@ receiving messages. Pipes are low-level communication building-blocks and so
come in a variety of forms, each one appropriate for a different use case. In
what follows, we cover the most commonly used varieties.

The simplest way to create a pipe is to use `Chan::new`
function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
is a sending endpoint of a pipe, and a *port* is the receiving
The simplest way to create a pipe is to use the `channel`
function to create a `(Sender, Receiver)` pair. In Rust parlance, a *sender*
is a sending endpoint of a pipe, and a *receiver* is the receiving
endpoint. Consider the following example of calculating two results
concurrently:

~~~~
# use std::task::spawn;

let (port, chan): (Port<int>, Chan<int>) = Chan::new();
let (tx, rx): (Sender<int>, Receiver<int>) = channel();

spawn(proc() {
let result = some_expensive_computation();
chan.send(result);
tx.send(result);
});

some_other_expensive_computation();
let result = port.recv();
let result = rx.recv();
# fn some_expensive_computation() -> int { 42 }
# fn some_other_expensive_computation() {}
~~~~

Let's examine this example in detail. First, the `let` statement creates a
stream for sending and receiving integers (the left-hand side of the `let`,
`(chan, port)`, is an example of a *destructuring let*: the pattern separates
`(tx, rx)`, is an example of a *destructuring let*: the pattern separates
a tuple into its component parts).

~~~~
let (port, chan): (Port<int>, Chan<int>) = Chan::new();
let (tx, rx): (Sender<int>, Receiver<int>) = channel();
~~~~

The child task will use the channel to send data to the parent task,
which will wait to receive the data on the port. The next statement
The child task will use the sender to send data to the parent task,
which will wait to receive the data on the receiver. The next statement
spawns the child task.

~~~~
# use std::task::spawn;
# fn some_expensive_computation() -> int { 42 }
# let (port, chan) = Chan::new();
# let (tx, rx) = channel();
spawn(proc() {
let result = some_expensive_computation();
chan.send(result);
tx.send(result);
});
~~~~

Notice that the creation of the task closure transfers `chan` to the child
task implicitly: the closure captures `chan` in its environment. Both `Chan`
and `Port` are sendable types and may be captured into tasks or otherwise
Notice that the creation of the task closure transfers `tx` to the child
task implicitly: the closure captures `tx` in its environment. Both `Sender`
and `Receiver` are sendable types and may be captured into tasks or otherwise
transferred between them. In the example, the child task runs an expensive
computation, then sends the result over the captured channel.

Finally, the parent continues with some other expensive
computation, then waits for the child's result to arrive on the
port:
receiver:

~~~~
# fn some_other_expensive_computation() {}
# let (port, chan) = Chan::<int>::new();
# chan.send(0);
# let (tx, rx) = channel::<int>();
# tx.send(0);
some_other_expensive_computation();
let result = port.recv();
let result = rx.recv();
~~~~

The `Port` and `Chan` pair created by `Chan::new` enables efficient
The `Sender` and `Receiver` pair created by `channel` enables efficient
communication between a single sender and a single receiver, but multiple
senders cannot use a single `Chan`, and multiple receivers cannot use a single
`Port`. What if our example needed to compute multiple results across a number
of tasks? The following program is ill-typed:
senders cannot use a single `Sender` value, and multiple receivers cannot use a
single `Receiver` value. What if our example needed to compute multiple
results across a number of tasks? The following program is ill-typed:

~~~ {.ignore}
# use std::task::{spawn};
# fn some_expensive_computation() -> int { 42 }
let (port, chan) = Chan::new();
let (tx, rx) = channel();

spawn(proc() {
chan.send(some_expensive_computation());
tx.send(some_expensive_computation());
});

// ERROR! The previous spawn statement already owns the channel,
// ERROR! The previous spawn statement already owns the sender,
// so the compiler will not allow it to be captured again
spawn(proc() {
chan.send(some_expensive_computation());
tx.send(some_expensive_computation());
});
~~~

Instead we can clone the `chan`, which allows for multiple senders.
Instead we can clone the `tx`, which allows for multiple senders.

~~~
# use std::task::spawn;

let (port, chan) = Chan::new();
let (tx, rx) = channel();

for init_val in range(0u, 3) {
// Create a new channel handle to distribute to the child task
let child_chan = chan.clone();
let child_tx = tx.clone();
spawn(proc() {
child_chan.send(some_expensive_computation(init_val));
child_tx.send(some_expensive_computation(init_val));
});
}

let result = port.recv() + port.recv() + port.recv();
let result = rx.recv() + rx.recv() + rx.recv();
# fn some_expensive_computation(_i: uint) -> int { 42 }
~~~

Cloning a `Chan` produces a new handle to the same channel, allowing multiple
tasks to send data to a single port. It also upgrades the channel internally in
Cloning a `Sender` produces a new handle to the same channel, allowing multiple
tasks to send data to a single receiver. It upgrades the channel internally in
order to allow this functionality, which means that channels that are not
cloned can avoid the overhead required to handle multiple senders. But this
fact has no bearing on the channel's usage: the upgrade is transparent.

Note that the above cloning example is somewhat contrived since
you could also simply use three `Chan` pairs, but it serves to
you could also simply use three `Sender` pairs, but it serves to
illustrate the point. For reference, written with multiple streams, it
might look like the example below.

Expand All @@ -261,16 +258,16 @@ might look like the example below.
# use std::vec;

// Create a vector of ports, one for each child task
let ports = vec::from_fn(3, |init_val| {
let (port, chan) = Chan::new();
let rxs = vec::from_fn(3, |init_val| {
let (tx, rx) = channel();
spawn(proc() {
chan.send(some_expensive_computation(init_val));
tx.send(some_expensive_computation(init_val));
});
port
rx
});

// Wait on each port, accumulating the results
let result = ports.iter().fold(0, |accum, port| accum + port.recv() );
let result = rxs.iter().fold(0, |accum, rx| accum + rx.recv() );
# fn some_expensive_computation(_i: uint) -> int { 42 }
~~~

Expand All @@ -281,7 +278,7 @@ later.
The basic example below illustrates this.

~~~
# extern crate sync;
extern crate sync;

# fn main() {
# fn make_a_sandwich() {};
Expand Down Expand Up @@ -342,9 +339,10 @@ Here is a small example showing how to use Arcs. We wish to run concurrently sev
a single large vector of floats. Each task needs the full vector to perform its duty.

~~~
# extern crate sync;
extern crate rand;
# use std::vec;
extern crate rand;
extern crate sync;

use std::vec;
use sync::Arc;

fn pnorm(nums: &~[f64], p: uint) -> f64 {
Expand All @@ -358,11 +356,11 @@ fn main() {
let numbers_arc = Arc::new(numbers);

for num in range(1u, 10) {
let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());
let (tx, rx) = channel();
tx.send(numbers_arc.clone());

spawn(proc() {
let local_arc : Arc<~[f64]> = port.recv();
let local_arc : Arc<~[f64]> = rx.recv();
let task_numbers = local_arc.get();
println!("{}-norm = {}", num, pnorm(task_numbers, num));
});
Expand Down Expand Up @@ -395,8 +393,8 @@ and a clone of it is sent to each task
# fn main() {
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc = Arc::new(numbers);
# let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());
# let (tx, rx) = channel();
tx.send(numbers_arc.clone());
# }
~~~

Expand All @@ -412,9 +410,9 @@ Each task recovers the underlying data by
# fn main() {
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc=Arc::new(numbers);
# let (port, chan) = Chan::new();
# chan.send(numbers_arc.clone());
# let local_arc : Arc<~[f64]> = port.recv();
# let (tx, rx) = channel();
# tx.send(numbers_arc.clone());
# let local_arc : Arc<~[f64]> = rx.recv();
let task_numbers = local_arc.get();
# }
~~~
Expand Down Expand Up @@ -486,19 +484,18 @@ proceed).

A very common thing to do is to spawn a child task where the parent
and child both need to exchange messages with each other. The
function `sync::comm::DuplexStream()` supports this pattern. We'll
function `sync::comm::duplex` supports this pattern. We'll
look briefly at how to use it.

To see how `DuplexStream()` works, we will create a child task
To see how `duplex` works, we will create a child task
that repeatedly receives a `uint` message, converts it to a string, and sends
the string in response. The child terminates when it receives `0`.
Here is the function that implements the child task:

~~~
# extern crate sync;
extern crate sync;
# fn main() {
# use sync::DuplexStream;
fn stringifier(channel: &DuplexStream<~str, uint>) {
fn stringifier(channel: &sync::DuplexStream<~str, uint>) {
let mut value: uint;
loop {
value = channel.recv();
Expand All @@ -520,10 +517,10 @@ response itself is simply the stringified version of the received value,
Here is the code for the parent task:

~~~
# extern crate sync;
extern crate sync;
# use std::task::spawn;
# use sync::DuplexStream;
# fn stringifier(channel: &DuplexStream<~str, uint>) {
# fn stringifier(channel: &sync::DuplexStream<~str, uint>) {
# let mut value: uint;
# loop {
# value = channel.recv();
Expand All @@ -533,7 +530,7 @@ Here is the code for the parent task:
# }
# fn main() {

let (from_child, to_child) = DuplexStream::new();
let (from_child, to_child) = sync::duplex();

spawn(proc() {
stringifier(&to_child);
Expand Down
2 changes: 1 addition & 1 deletion src/etc/vim/syntax/rust.vim
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ syn keyword rustTrait MutableVector MutableTotalOrdVector
syn keyword rustTrait Vector VectorVector CloneableVector ImmutableVector

"syn keyword rustFunction stream
syn keyword rustTrait Port Chan
syn keyword rustTrait Sender Receiver
"syn keyword rustFunction spawn

syn keyword rustSelf self
Expand Down
10 changes: 5 additions & 5 deletions src/libextra/workcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ pub struct Exec {

enum Work<'a, T> {
WorkValue(T),
WorkFromTask(&'a Prep<'a>, Port<(Exec, T)>),
WorkFromTask(&'a Prep<'a>, Receiver<(Exec, T)>),
}

fn json_encode<'a, T:Encodable<json::Encoder<'a>>>(t: &T) -> ~str {
Expand Down Expand Up @@ -411,7 +411,7 @@ impl<'a> Prep<'a> {

_ => {
debug!("Cache miss!");
let (port, chan) = Chan::new();
let (tx, rx) = channel();
let blk = bo.take_unwrap();

// FIXME: What happens if the task fails?
Expand All @@ -421,9 +421,9 @@ impl<'a> Prep<'a> {
discovered_outputs: WorkMap::new(),
};
let v = blk(&mut exe);
chan.send((exe, v));
tx.send((exe, v));
});
Work::from_task(self, port)
Work::from_task(self, rx)
}
}
}
Expand All @@ -437,7 +437,7 @@ impl<'a, T:Send +
pub fn from_value(elt: T) -> Work<'a, T> {
WorkValue(elt)
}
pub fn from_task(prep: &'a Prep<'a>, port: Port<(Exec, T)>)
pub fn from_task(prep: &'a Prep<'a>, port: Receiver<(Exec, T)>)
-> Work<'a, T> {
WorkFromTask(prep, port)
}
Expand Down
12 changes: 6 additions & 6 deletions src/libgreen/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,11 @@ mod test {
#[test]
fn some_channels() {
run(proc() {
let (p, c) = Chan::new();
let (tx, rx) = channel();
spawn(proc() {
c.send(());
tx.send(());
});
p.recv();
rx.recv();
});
}

Expand All @@ -272,11 +272,11 @@ mod test {

for _ in range(0, 20) {
pool.spawn(TaskOpts::new(), proc() {
let (p, c) = Chan::new();
let (tx, rx) = channel();
spawn(proc() {
c.send(());
tx.send(());
});
p.recv();
rx.recv();
});
}

Expand Down
Loading