Skip to content

Convert basic futures combinators to futures-core 0.3 #980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ members = [
# "futures-macro-await",
# "futures-sink",
# "futures-stable",
# "futures-util",
"futures-util",
]
22 changes: 12 additions & 10 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-util"
version = "0.2.0"
version = "0.3.0-alpha"
authors = ["Alex Crichton <[email protected]>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/rust-lang-nursery/futures-rs"
Expand All @@ -11,18 +11,20 @@ Common utilities and extension traits for the futures-rs library.
"""

[features]
std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"]
default = ["std", "futures-core/either", "futures-sink/either"]
# std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"]
std = ["futures-core/std", "either/use_std"]
# default = ["std", "futures-core/either", "futures-sink/either"]
default = ["std", "futures-core/either"]
bench = []

[dependencies]
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.3.0-alpha", default-features = false }
# futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
# futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
either = { version = "1.4", default-features = false }

[dev-dependencies]
futures = { path = "../futures", version = "0.2.0" }
futures-executor = { path = "../futures-executor", version = "0.2.0" }
futures-channel = { path = "../futures-channel", version = "0.2.0" }
# futures = { path = "../futures", version = "0.2.0" }
# futures-executor = { path = "../futures-executor", version = "0.2.0" }
# futures-channel = { path = "../futures-channel", version = "0.2.0" }
40 changes: 0 additions & 40 deletions futures-util/src/future/and_then.rs

This file was deleted.

29 changes: 10 additions & 19 deletions futures-util/src/future/catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::mem::Pin;
use std::prelude::v1::*;
use std::any::Any;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};

use futures_core::{Future, Poll, Async};
use futures_core::{Future, Poll};
use futures_core::task;

/// Future for the `catch_unwind` combinator.
Expand All @@ -11,35 +12,25 @@ use futures_core::task;
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct CatchUnwind<F> where F: Future {
future: Option<F>,
future: F,
}

pub fn new<F>(future: F) -> CatchUnwind<F>
where F: Future + UnwindSafe,
{
CatchUnwind {
future: Some(future),
}
CatchUnwind { future }
}

impl<F> Future for CatchUnwind<F>
where F: Future + UnwindSafe,
{
type Item = Result<F::Item, F::Error>;
type Error = Box<Any + Send>;
type Output = Result<F::Output, Box<Any + Send>>;

fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
let mut future = self.future.take().expect("cannot poll twice");
let (res, future) = catch_unwind(AssertUnwindSafe(|| {
(future.poll(cx), future)
}))?;
match res {
Ok(Async::Pending) => {
self.future = Some(future);
Ok(Async::Pending)
}
Ok(Async::Ready(t)) => Ok(Async::Ready(Ok(t))),
Err(e) => Ok(Async::Ready(Err(e))),
fn poll(mut self: Pin<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
let fut = unsafe { pinned_field!(self, future) };
match catch_unwind(AssertUnwindSafe(|| fut.poll(cx))) {
Ok(res) => res.map(Ok),
Err(e) => Poll::Ready(Err(e))
}
}
}
72 changes: 38 additions & 34 deletions futures-util/src/future/chain.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,53 @@
use core::mem;
use core::mem::Pin;

use futures_core::{Future, Poll, Async};
use futures_core::{Future, Poll};
use futures_core::task;

#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub enum Chain<A, B, C> where A: Future {
First(A, C),
Second(B),
Done,
pub enum Chain<Fut1, Fut2, Data> {
First(Fut1, Option<Data>),
Second(Fut2),
}

impl<A, B, C> Chain<A, B, C>
where A: Future,
B: Future,
impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data>
where Fut1: Future,
Fut2: Future,
{
pub fn new(a: A, c: C) -> Chain<A, B, C> {
Chain::First(a, c)
pub fn new(fut1: Fut1, data: Data) -> Chain<Fut1, Fut2, Data> {
Chain::First(fut1, Some(data))
}

pub fn poll<F>(&mut self, cx: &mut task::Context, f: F) -> Poll<B::Item, B::Error>
where F: FnOnce(Result<A::Item, A::Error>, C)
-> Result<Result<B::Item, B>, B::Error>,
pub fn poll<F>(mut self: Pin<Self>, cx: &mut task::Context, f: F) -> Poll<Fut2::Output>
where F: FnOnce(Fut1::Output, Data) -> Fut2,
{
let a_result = match *self {
Chain::First(ref mut a, _) => {
match a.poll(cx) {
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::Ready(t)) => Ok(t),
Err(e) => Err(e),
let mut f = Some(f);

loop {
// safe to `get_mut` here because we don't move out
let fut2 = match *unsafe { Pin::get_mut(&mut self) } {
Chain::First(ref mut fut1, ref mut data) => {
// safe to create a new `Pin` because `fut1` will never move
// before it's dropped.
match unsafe { Pin::new_unchecked(fut1) }.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(t) => {
(f.take().unwrap())(t, data.take().unwrap())
}
}
}
}
Chain::Second(ref mut b) => return b.poll(cx),
Chain::Done => panic!("cannot poll a chained future twice"),
};
let data = match mem::replace(self, Chain::Done) {
Chain::First(_, c) => c,
_ => panic!(),
};
match f(a_result, data)? {
Ok(e) => Ok(Async::Ready(e)),
Err(mut b) => {
let ret = b.poll(cx);
*self = Chain::Second(b);
ret
Chain::Second(ref mut fut2) => {
// safe to create a new `Pin` because `fut2` will never move
// before it's dropped; once we're in `Chain::Second` we stay
// there forever.
return unsafe { Pin::new_unchecked(fut2) }.poll(cx)
}
};

// safe because we're using the `&mut` to do an assignment, not for moving out
unsafe {
// note: it's safe to move the `fut2` here because we haven't yet polled it
*Pin::get_mut(&mut self) = Chain::Second(fut2);
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions futures-util/src/future/empty.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
//! Definition of the Empty combinator, a future that's never ready.

use core::mem::Pin;
use core::marker;

use futures_core::{Future, Poll, Async};
use futures_core::{Future, Poll};
use futures_core::task;

/// A future which is never resolved.
///
/// This future can be created with the `empty` function.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Empty<T, E> {
_data: marker::PhantomData<(T, E)>,
pub struct Empty<T> {
_data: marker::PhantomData<T>,
}

/// Creates a future which never resolves, representing a computation that never
/// finishes.
///
/// The returned future will forever return `Async::Pending`.
pub fn empty<T, E>() -> Empty<T, E> {
pub fn empty<T>() -> Empty<T> {
Empty { _data: marker::PhantomData }
}

impl<T, E> Future for Empty<T, E> {
type Item = T;
type Error = E;
impl<T> Future for Empty<T> {
type Output = T;

fn poll(&mut self, _: &mut task::Context) -> Poll<T, E> {
Ok(Async::Pending)
fn poll(self: Pin<Self>, _: &mut task::Context) -> Poll<T> {
Poll::Pending
}
}
38 changes: 0 additions & 38 deletions futures-util/src/future/err_into.rs

This file was deleted.

25 changes: 10 additions & 15 deletions futures-util/src/future/flatten.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::fmt;
use core::mem::Pin;

use futures_core::{Future, IntoFuture, Poll};
use futures_core::{Future, Poll};
use futures_core::task;

use super::chain::Chain;
Expand All @@ -11,14 +12,13 @@ use super::chain::Chain;
///
/// This is created by the `Future::flatten` method.
#[must_use = "futures do nothing unless polled"]
pub struct Flatten<A> where A: Future, A::Item: IntoFuture {
state: Chain<A, <A::Item as IntoFuture>::Future, ()>,
pub struct Flatten<A> where A: Future, A::Output: Future {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also have Result-y versions of this and FlattenStream? Or maybe we just make impl<T> Future for Result<T, E> where T: Future (and the same for Stream)?

state: Chain<A, A::Output, ()>,
}

impl<A> fmt::Debug for Flatten<A>
where A: Future + fmt::Debug,
A::Item: IntoFuture,
<<A as Future>::Item as IntoFuture>::Future: fmt::Debug,
A::Output: Future + fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Flatten")
Expand All @@ -29,7 +29,7 @@ impl<A> fmt::Debug for Flatten<A>

pub fn new<A>(future: A) -> Flatten<A>
where A: Future,
A::Item: IntoFuture,
A::Output: Future,
{
Flatten {
state: Chain::new(future, ()),
Expand All @@ -38,16 +38,11 @@ pub fn new<A>(future: A) -> Flatten<A>

impl<A> Future for Flatten<A>
where A: Future,
A::Item: IntoFuture,
<<A as Future>::Item as IntoFuture>::Error: From<<A as Future>::Error>
A::Output: Future,
{
type Item = <<A as Future>::Item as IntoFuture>::Item;
type Error = <<A as Future>::Item as IntoFuture>::Error;
type Output = <A::Output as Future>::Output;

fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.state.poll(cx, |a, ()| {
let future = a?.into_future();
Ok(Err(future))
})
fn poll(mut self: Pin<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
unsafe { pinned_field!(self, state) }.poll(cx, |a, ()| a)
}
}
Loading