Skip to content

Implement PortReader and ChanWriter #10823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 10, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 172 additions & 11 deletions src/libstd/io/comm_adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,107 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use option::Option;
use comm::{GenericPort, GenericChan};
use prelude::*;

use comm::{GenericPort, GenericChan, GenericSmartChan};
use cmp;
use io;
use option::{None, Option, Some};
use super::{Reader, Writer};
use vec::{bytes, CopyableVector, MutableVector, ImmutableVector};

pub struct PortReader<P>;
/// Allows reading from a port.
///
/// # Example
///
/// ```
/// let reader = PortReader::new(port);
///
/// let mut buf = ~[0u8, ..100];
/// match reader.read(buf) {
/// Some(nread) => println!("Read {} bytes", nread),
/// None => println!("At the end of the stream!")
/// }
/// ```
pub struct PortReader<P> {
priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed.
priv pos: uint, // How many of the buffered bytes have already be consumed.
priv port: P, // The port to pull data from.
priv closed: bool, // Whether the pipe this port connects to has been closed.
}

impl<P: GenericPort<~[u8]>> PortReader<P> {
pub fn new(_port: P) -> PortReader<P> { fail!() }
pub fn new(port: P) -> PortReader<P> {
PortReader {
buf: None,
pos: 0,
port: port,
closed: false,
}
}
}

impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it's doing a little more work than necessary, but it's a little tough to follow. Awhile back I actually kinda implemented this (but not as well as you did with conditions and whatnot), and that implementation may help out reducing the amount of shifting/checking/conditionals:

https://github.com/alexcrichton/rust/blob/5af700c677034499827de9682097ae375fe69cb7/src/libstd/io/comm.rs#L65

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that my version naturally handles the channel being closed without having to store that information in the reader itself, along with maintaining only one positional variable (not both len and pos).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, it now looks a more lot like what you wrote. I ended up needing self.closed to implement eof().

let mut num_read = 0;
loop {
match self.buf {
Some(ref prev) => {
let dst = buf.mut_slice_from(num_read);
let src = prev.slice_from(self.pos);
let count = cmp::min(dst.len(), src.len());
bytes::copy_memory(dst, src, count);
num_read += count;
self.pos += count;
},
None => (),
};
if num_read == buf.len() || self.closed {
break;
}
self.pos = 0;
self.buf = self.port.try_recv();
self.closed = self.buf.is_none();
}
if self.closed && num_read == 0 {
io::io_error::cond.raise(io::standard_error(io::EndOfFile));
None
} else {
Some(num_read)
}
}

fn eof(&mut self) -> bool { fail!() }
fn eof(&mut self) -> bool { self.closed }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should also factor in whether self.buf is None vs Some and where pos is relative to the array inside Some. Could you add a test for this as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.closed can only be true if self.buf is None. That is currently tested in test_port_reader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all that's left is a squash of commits, then r+ from me.

}

pub struct ChanWriter<C>;
/// Allows writing to a chan.
///
/// # Example
///
/// ```
/// let writer = ChanWriter::new(chan);
/// writer.write("hello, world".as_bytes());
/// ```
pub struct ChanWriter<C> {
chan: C,
}

impl<C: GenericChan<~[u8]>> ChanWriter<C> {
pub fn new(_chan: C) -> ChanWriter<C> { fail!() }
impl<C: GenericSmartChan<~[u8]>> ChanWriter<C> {
pub fn new(chan: C) -> ChanWriter<C> {
ChanWriter { chan: chan }
}
}

impl<C: GenericChan<~[u8]>> Writer for ChanWriter<C> {
fn write(&mut self, _buf: &[u8]) { fail!() }
impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> {
fn write(&mut self, buf: &[u8]) {
if !self.chan.try_send(buf.to_owned()) {
io::io_error::cond.raise(io::IoError {
kind: io::BrokenPipe,
desc: "Pipe closed",
detail: None
});
}
}
}

pub struct ReaderPort<R>;
Expand All @@ -55,3 +132,87 @@ impl<W: Writer> WriterChan<W> {
impl<W: Writer> GenericChan<~[u8]> for WriterChan<W> {
fn send(&self, _x: ~[u8]) { fail!() }
}


#[cfg(test)]
mod test {
use prelude::*;
use super::*;
use io;
use comm;
use task;

#[test]
fn test_port_reader() {
let (port, chan) = comm::stream();
do task::spawn {
chan.send(~[1u8, 2u8]);
chan.send(~[]);
chan.send(~[3u8, 4u8]);
chan.send(~[5u8, 6u8]);
chan.send(~[7u8, 8u8]);
}

let mut reader = PortReader::new(port);
let mut buf = ~[0u8, ..3];

assert_eq!(false, reader.eof());

assert_eq!(Some(0), reader.read(~[]));
assert_eq!(false, reader.eof());

assert_eq!(Some(3), reader.read(buf));
assert_eq!(false, reader.eof());
assert_eq!(~[1,2,3], buf);

assert_eq!(Some(3), reader.read(buf));
assert_eq!(false, reader.eof());
assert_eq!(~[4,5,6], buf);

assert_eq!(Some(2), reader.read(buf));
assert_eq!(~[7,8,6], buf);
assert_eq!(true, reader.eof());

let mut err = None;
let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
err = Some(k)
}).inside(|| {
reader.read(buf)
});
assert_eq!(Some(io::EndOfFile), err);
assert_eq!(None, result);
assert_eq!(true, reader.eof());
assert_eq!(~[7,8,6], buf);

// Ensure it continues to fail in the same way.
err = None;
let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
err = Some(k)
}).inside(|| {
reader.read(buf)
});
assert_eq!(Some(io::EndOfFile), err);
assert_eq!(None, result);
assert_eq!(true, reader.eof());
assert_eq!(~[7,8,6], buf);
}

#[test]
fn test_chan_writer() {
let (port, chan) = comm::stream();
let mut writer = ChanWriter::new(chan);
writer.write_be_u32(42);

let wanted = ~[0u8, 0u8, 0u8, 42u8];
let got = do task::try { port.recv() }.unwrap();
assert_eq!(wanted, got);

let mut err = None;
io::io_error::cond.trap(|io::IoError { kind, .. } | {
err = Some(kind)
}).inside(|| {
writer.write_u8(1)
});
assert_eq!(Some(io::BrokenPipe), err);
}
}