Skip to content

Commit 52fd329

Browse files
committed
Add AsyncBufRead trait
1 parent bbbfbb7 commit 52fd329

File tree

8 files changed

+267
-84
lines changed

8 files changed

+267
-84
lines changed

futures-io/src/lib.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,63 @@ mod if_std {
241241
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
242242
}
243243

244+
/// Read bytes asynchronously.
245+
///
246+
/// This trait is analogous to the `std::io::BufRead` trait, but integrates
247+
/// with the asynchronous task system. In particular, the `poll_fill_buf`
248+
/// method, unlike `BufRead::fill_buf`, will automatically queue the current task
249+
/// for wakeup and return if data is not yet available, rather than blocking
250+
/// the calling thread.
251+
pub trait AsyncBufRead: AsyncRead {
252+
/// Attempt to return the contents of the internal buffer, filling it with more data
253+
/// from the inner reader if it is empty.
254+
///
255+
/// On success, returns `Poll::Ready(Ok(buf))`.
256+
///
257+
/// If no data is available for reading, the method returns
258+
/// `Poll::Pending` and arranges for the current task (via
259+
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
260+
/// readable or is closed.
261+
///
262+
/// This function is a lower-level call. It needs to be paired with the
263+
/// [`consume`] method to function properly. When calling this
264+
/// method, none of the contents will be "read" in the sense that later
265+
/// calling [`poll_read`] may return the same contents. As such, [`consume`] must
266+
/// be called with the number of bytes that are consumed from this buffer to
267+
/// ensure that the bytes are never returned twice.
268+
///
269+
/// [`poll_read`]: AsyncRead::poll_read
270+
/// [`consume`]: AsyncBufRead::consume
271+
///
272+
/// An empty buffer returned indicates that the stream has reached EOF.
273+
///
274+
/// # Implementation
275+
///
276+
/// This function may not return errors of kind `WouldBlock` or
277+
/// `Interrupted`. Implementations must convert `WouldBlock` into
278+
/// `Poll::Pending` and either internally retry or convert
279+
/// `Interrupted` into another error kind.
280+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
281+
-> Poll<Result<&'a [u8]>>;
282+
283+
/// Tells this buffer that `amt` bytes have been consumed from the buffer,
284+
/// so they should no longer be returned in calls to [`poll_read`].
285+
///
286+
/// This function is a lower-level call. It needs to be paired with the
287+
/// [`poll_fill_buf`] method to function properly. This function does
288+
/// not perform any I/O, it simply informs this object that some amount of
289+
/// its buffer, returned from [`poll_fill_buf`], has been consumed and should
290+
/// no longer be returned. As such, this function may do odd things if
291+
/// [`poll_fill_buf`] isn't called before calling it.
292+
///
293+
/// The `amt` must be `<=` the number of bytes in the buffer returned by
294+
/// [`poll_fill_buf`].
295+
///
296+
/// [`poll_read`]: AsyncRead::poll_read
297+
/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
298+
fn consume(&mut self, amt: usize);
299+
}
300+
244301
macro_rules! deref_async_read {
245302
() => {
246303
unsafe fn initializer(&self) -> Initializer {
@@ -315,6 +372,10 @@ mod if_std {
315372
unsafe_delegate_async_read_to_stdio!();
316373
}
317374

375+
impl AsyncRead for StdIo::Empty {
376+
unsafe_delegate_async_read_to_stdio!();
377+
}
378+
318379
impl<T: AsRef<[u8]> + Unpin> AsyncRead for StdIo::Cursor<T> {
319380
unsafe_delegate_async_read_to_stdio!();
320381
}
@@ -429,6 +490,70 @@ mod if_std {
429490
impl AsyncWrite for StdIo::Sink {
430491
delegate_async_write_to_stdio!();
431492
}
493+
494+
macro_rules! deref_async_buf_read {
495+
() => {
496+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
497+
-> Poll<Result<&'a [u8]>>
498+
{
499+
Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
500+
}
501+
502+
fn consume(&mut self, amt: usize) {
503+
T::consume(&mut **self, amt)
504+
}
505+
}
506+
}
507+
508+
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
509+
deref_async_buf_read!();
510+
}
511+
512+
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
513+
deref_async_buf_read!();
514+
}
515+
516+
impl<P> AsyncBufRead for Pin<P>
517+
where
518+
P: DerefMut + Unpin,
519+
P::Target: AsyncBufRead + Unpin,
520+
{
521+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
522+
-> Poll<Result<&'a [u8]>>
523+
{
524+
self.get_mut().as_mut().poll_fill_buf(cx)
525+
}
526+
527+
fn consume(&mut self, amt: usize) {
528+
<P::Target as AsyncBufRead>::consume(self.as_mut().get_mut(), amt)
529+
}
530+
}
531+
532+
macro_rules! delegate_async_buf_read_to_stdio {
533+
() => {
534+
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>)
535+
-> Poll<Result<&'a [u8]>>
536+
{
537+
Poll::Ready(StdIo::BufRead::fill_buf(self.get_mut()))
538+
}
539+
540+
fn consume(&mut self, amt: usize) {
541+
StdIo::BufRead::consume(self, amt)
542+
}
543+
}
544+
}
545+
546+
impl AsyncBufRead for &[u8] {
547+
delegate_async_buf_read_to_stdio!();
548+
}
549+
550+
impl AsyncBufRead for StdIo::Empty {
551+
delegate_async_buf_read_to_stdio!();
552+
}
553+
554+
impl<T: AsRef<[u8]> + Unpin> AsyncBufRead for StdIo::Cursor<T> {
555+
delegate_async_buf_read_to_stdio!();
556+
}
432557
}
433558

434559
#[cfg(feature = "std")]

futures-util/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Common utilities and extension traits for the futures-rs library.
1515
name = "futures_util"
1616

1717
[features]
18-
std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "rand", "rand_core", "slab"]
18+
std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "rand", "rand_core", "slab", "memchr"]
1919
default = ["std"]
2020
compat = ["std", "futures_01"]
2121
io-compat = ["compat", "tokio-io"]
@@ -36,6 +36,7 @@ proc-macro-nested = "0.1.2"
3636
rand = { version = "0.6.4", optional = true }
3737
rand_core = { version = ">=0.2.2, <0.4", optional = true } # See https://github.com/rust-random/rand/issues/645
3838
slab = { version = "0.4", optional = true }
39+
memchr = { version = "2.2", optional = true }
3940
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
4041
tokio-io = { version = "0.1.9", optional = true }
4142
pin-utils = "0.1.0-alpha.4"

futures-util/src/io/disabled/read_until.rs

Lines changed: 0 additions & 70 deletions
This file was deleted.

futures-util/src/io/mod.rs

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,10 @@
55
//! `AsyncReadExt` and `AsyncWriteExt` traits which add methods
66
//! to the `AsyncRead` and `AsyncWrite` types.
77
8-
use std::vec::Vec;
9-
10-
pub use futures_io::{AsyncRead, AsyncWrite, IoVec};
8+
pub use futures_io::{AsyncRead, AsyncWrite, AsyncBufRead, IoVec};
119

1210
#[cfg(feature = "io-compat")] use crate::compat::Compat;
1311

14-
// Temporarily removed until AsyncBufRead is implemented
15-
// pub use io::lines::{lines, Lines};
16-
// pub use io::read_until::{read_until, ReadUntil};
17-
// mod lines;
18-
// mod read_until;
19-
2012
mod allow_std;
2113
pub use self::allow_std::AllowStdIo;
2214

@@ -26,6 +18,9 @@ pub use self::copy_into::CopyInto;
2618
mod flush;
2719
pub use self::flush::Flush;
2820

21+
// mod lines;
22+
// pub use self::lines::Lines;
23+
2924
mod read;
3025
pub use self::read::Read;
3126

@@ -35,6 +30,9 @@ pub use self::read_exact::ReadExact;
3530
mod read_to_end;
3631
pub use self::read_to_end::ReadToEnd;
3732

33+
mod read_until;
34+
pub use self::read_until::ReadUntil;
35+
3836
mod close;
3937
pub use self::close::Close;
4038

@@ -324,3 +322,61 @@ pub trait AsyncWriteExt: AsyncWrite {
324322
}
325323

326324
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
325+
326+
/// An extension trait which adds utility methods to `AsyncBufRead` types.
327+
pub trait AsyncBufReadExt: AsyncBufRead {
328+
/// Creates a future which will read all the bytes associated with this I/O
329+
/// object into `buf` until the delimiter `byte` or EOF is reached.
330+
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
331+
///
332+
/// This function will read bytes from the underlying stream until the
333+
/// delimiter or EOF is found. Once found, all bytes up to, and including,
334+
/// the delimiter (if found) will be appended to `buf`.
335+
///
336+
/// The returned future will resolve to the number of bytes read once the read
337+
/// operation is completed.
338+
///
339+
/// In the case of an error the buffer and the object will be discarded, with
340+
/// the error yielded.
341+
///
342+
/// # Examples
343+
///
344+
/// ```
345+
/// #![feature(async_await, await_macro)]
346+
/// # futures::executor::block_on(async {
347+
/// use futures::io::AsyncBufReadExt;
348+
/// use std::io::Cursor;
349+
///
350+
/// let mut cursor = Cursor::new(b"lorem-ipsum");
351+
/// let mut buf = vec![];
352+
///
353+
/// // cursor is at 'l'
354+
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
355+
/// assert_eq!(num_bytes, 6);
356+
/// assert_eq!(buf, b"lorem-");
357+
/// buf.clear();
358+
///
359+
/// // cursor is at 'i'
360+
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
361+
/// assert_eq!(num_bytes, 5);
362+
/// assert_eq!(buf, b"ipsum");
363+
/// buf.clear();
364+
///
365+
/// // cursor is at EOF
366+
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
367+
/// assert_eq!(num_bytes, 0);
368+
/// assert_eq!(buf, b"");
369+
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
370+
/// ```
371+
fn read_until<'a>(
372+
&'a mut self,
373+
byte: u8,
374+
buf: &'a mut Vec<u8>,
375+
) -> ReadUntil<'a, Self>
376+
where Self: Unpin,
377+
{
378+
ReadUntil::new(self, byte, buf)
379+
}
380+
}
381+
382+
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

futures-util/src/io/read_until.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use futures_core::future::Future;
2+
use futures_core::task::{Context, Poll};
3+
use futures_io::AsyncBufRead;
4+
use std::io;
5+
use std::pin::Pin;
6+
7+
/// Future for the [`read_until`](super::AsyncBufReadExt::read_until) method.
8+
#[derive(Debug)]
9+
pub struct ReadUntil<'a, R: ?Sized + Unpin> {
10+
reader: &'a mut R,
11+
byte: u8,
12+
buf: &'a mut Vec<u8>,
13+
}
14+
15+
impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {}
16+
17+
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
18+
pub(super) fn new(reader: &'a mut R, byte: u8, buf: &'a mut Vec<u8>) -> Self {
19+
Self { reader, byte, buf }
20+
}
21+
}
22+
23+
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
24+
type Output = io::Result<usize>;
25+
26+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27+
let this = &mut *self;
28+
let mut read = 0;
29+
loop {
30+
let (done, used) = {
31+
let available = try_ready!(Pin::new(&mut this.reader).poll_fill_buf(cx));
32+
if let Some(i) = memchr::memchr(this.byte, available) {
33+
this.buf.extend_from_slice(&available[..=i]);
34+
(true, i + 1)
35+
} else {
36+
this.buf.extend_from_slice(available);
37+
(false, available.len())
38+
}
39+
};
40+
this.reader.consume(used);
41+
read += used;
42+
if done || used == 0 {
43+
return Poll::Ready(Ok(read));
44+
}
45+
}
46+
}
47+
}

futures-util/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub mod compat;
9797
#[cfg(feature = "std")]
9898
pub mod io;
9999
#[cfg(feature = "std")]
100-
#[doc(hidden)] pub use crate::io::{AsyncReadExt, AsyncWriteExt};
100+
#[doc(hidden)] pub use crate::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt};
101101

102102
cfg_target_has_atomic! {
103103
#[cfg(feature = "alloc")]

0 commit comments

Comments
 (0)