-
Notifications
You must be signed in to change notification settings - Fork 655
Add AsyncBufReadExt::read_line #1556
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
use futures_core::future::Future; | ||
use futures_core::task::{Context, Poll}; | ||
use futures_io::AsyncBufRead; | ||
use std::io; | ||
use std::mem; | ||
use std::pin::Pin; | ||
use std::str; | ||
use super::read_until::read_until_internal; | ||
|
||
/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method. | ||
#[derive(Debug)] | ||
pub struct ReadLine<'a, R: ?Sized + Unpin> { | ||
reader: &'a mut R, | ||
buf: &'a mut String, | ||
bytes: Vec<u8>, | ||
read: usize, | ||
} | ||
|
||
impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {} | ||
|
||
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { | ||
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { | ||
Self { | ||
reader, | ||
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, | ||
buf, | ||
read: 0, | ||
} | ||
} | ||
} | ||
|
||
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> { | ||
type Output = io::Result<usize>; | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let Self { reader, buf, bytes, read } = &mut *self; | ||
let ret = ready!(read_until_internal(Pin::new(reader), b'\n', bytes, read, cx)); | ||
if str::from_utf8(&bytes).is_err() { | ||
Poll::Ready(ret.and_then(|_| { | ||
Err(io::Error::new(io::ErrorKind::InvalidData, | ||
"stream did not contain valid UTF-8")) | ||
})) | ||
} else { | ||
unsafe { mem::swap(buf.as_mut_vec(), bytes); } | ||
Poll::Ready(ret) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
use futures::executor::block_on; | ||
use futures::future::Future; | ||
use futures::io::AsyncBufReadExt; | ||
use futures::task::Poll; | ||
use futures_test::io::AsyncReadTestExt; | ||
use futures_test::task::noop_context; | ||
use std::io::Cursor; | ||
use std::pin::Pin; | ||
|
||
#[test] | ||
fn read_line() { | ||
let mut buf = Cursor::new(b"12"); | ||
let mut v = String::new(); | ||
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); | ||
assert_eq!(v, "12"); | ||
|
||
let mut buf = Cursor::new(b"12\n\n"); | ||
let mut v = String::new(); | ||
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3); | ||
assert_eq!(v, "12\n"); | ||
v.clear(); | ||
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1); | ||
assert_eq!(v, "\n"); | ||
v.clear(); | ||
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0); | ||
assert_eq!(v, ""); | ||
} | ||
|
||
fn run<F: Future + Unpin>(mut f: F) -> F::Output { | ||
let mut cx = noop_context(); | ||
loop { | ||
if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) { | ||
return x; | ||
} | ||
} | ||
} | ||
|
||
#[test] | ||
fn maybe_pending() { | ||
let mut buf = b"12".interleave_pending(); | ||
let mut v = String::new(); | ||
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); | ||
assert_eq!(v, "12"); | ||
|
||
let mut buf = b"12\n\n".interleave_pending(); | ||
let mut v = String::new(); | ||
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3); | ||
assert_eq!(v, "12\n"); | ||
v.clear(); | ||
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1); | ||
assert_eq!(v, "\n"); | ||
v.clear(); | ||
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); | ||
assert_eq!(v, ""); | ||
v.clear(); | ||
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); | ||
assert_eq!(v, ""); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,20 @@ | ||
use futures::executor::block_on; | ||
use futures::future::Future; | ||
use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt}; | ||
use futures::task::{Context, Poll}; | ||
use futures::io::AsyncBufReadExt; | ||
use futures::task::Poll; | ||
use futures_test::io::AsyncReadTestExt; | ||
use futures_test::task::noop_context; | ||
use std::cmp; | ||
use std::io::{self, Cursor}; | ||
use std::io::Cursor; | ||
use std::pin::Pin; | ||
|
||
#[test] | ||
fn read_until() { | ||
let mut buf = Cursor::new(&b"12"[..]); | ||
let mut buf = Cursor::new(b"12"); | ||
let mut v = Vec::new(); | ||
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); | ||
assert_eq!(v, b"12"); | ||
|
||
let mut buf = Cursor::new(&b"1233"[..]); | ||
let mut buf = Cursor::new(b"1233"); | ||
let mut v = Vec::new(); | ||
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3); | ||
assert_eq!(v, b"123"); | ||
|
@@ -35,53 +35,14 @@ fn run<F: Future + Unpin>(mut f: F) -> F::Output { | |
} | ||
} | ||
|
||
struct MaybePending<'a> { | ||
inner: &'a [u8], | ||
ready: bool, | ||
} | ||
|
||
impl<'a> MaybePending<'a> { | ||
fn new(inner: &'a [u8]) -> Self { | ||
Self { inner, ready: false } | ||
} | ||
} | ||
|
||
impl AsyncRead for MaybePending<'_> { | ||
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) | ||
-> Poll<io::Result<usize>> | ||
{ | ||
unimplemented!() | ||
} | ||
} | ||
|
||
impl AsyncBufRead for MaybePending<'_> { | ||
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>) | ||
-> Poll<io::Result<&'a [u8]>> | ||
{ | ||
if self.ready { | ||
self.ready = false; | ||
if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } | ||
let len = cmp::min(2, self.inner.len()); | ||
Poll::Ready(Ok(&self.inner[0..len])) | ||
} else { | ||
self.ready = true; | ||
Poll::Pending | ||
} | ||
} | ||
|
||
fn consume(mut self: Pin<&mut Self>, amt: usize) { | ||
self.inner = &self.inner[amt..]; | ||
} | ||
} | ||
|
||
#[test] | ||
fn maybe_pending() { | ||
let mut buf = MaybePending::new(b"12"); | ||
let mut buf = b"12".interleave_pending(); | ||
let mut v = Vec::new(); | ||
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); | ||
assert_eq!(v, b"12"); | ||
|
||
let mut buf = MaybePending::new(b"12333"); | ||
let mut buf = b"12333".interleave_pending(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this won't add a pending in between reads that actually return data like the old implementation, it will be like If you want to test reading data multiple times my plan elsewhere was to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing that out. I updated this PR after checking locally that the new test can detect problems like #1566, but the code you suggested is even better. I will open a PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... This is because This can be solved by consuming it only when However, the current implementation is the same behavior as std. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. Opened #1584. |
||
let mut v = Vec::new(); | ||
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3); | ||
assert_eq!(v, b"123"); | ||
|
Uh oh!
There was an error while loading. Please reload this page.