Skip to content

[WIP] Follow latest pinning API #1266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -9,3 +9,6 @@ members = [
"futures-util",
"futures-test",
]

[patch.crates-io]
pin-utils = { git = "https://github.com/Kroisse/pin-utils.git", branch = "pin" }
16 changes: 9 additions & 7 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#![feature(test, futures_api, pin, arbitrary_self_types)]

extern crate test;

use futures::ready;
use futures::channel::mpsc::{self, Sender, UnboundedSender};
use futures::executor::LocalPool;
use futures::stream::{Stream, StreamExt};
use futures::sink::Sink;
use futures::task::{self, Poll, Wake, LocalWaker};
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::Arc;
use test::Bencher;
use self::test::Bencher;

fn notify_noop() -> LocalWaker {
struct Noop;
@@ -100,16 +102,16 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;

fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context)
-> Poll<Option<Self::Item>>
{
let this = &mut *self;
let mut tx = PinMut::new(&mut this.tx);
let mut tx = Pin::new(&mut this.tx);

ready!(tx.reborrow().poll_ready(cx)).unwrap();
tx.reborrow().start_send(this.last + 1).unwrap();
ready!(tx.as_mut().poll_ready(cx)).unwrap();
tx.as_mut().start_send(this.last + 1).unwrap();
this.last += 1;
assert_eq!(Poll::Ready(Ok(())), tx.reborrow().poll_flush(cx));
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(cx));
Poll::Ready(Some(this.last))
}
}
14 changes: 7 additions & 7 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ use std::any::Any;
use std::error::Error;
use std::fmt;
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
@@ -113,7 +113,7 @@ pub struct Sender<T> {
maybe_parked: bool,
}

// We never project PinMut<Sender> to `PinMut<T>`
// We never project Pin<&mut Sender> to `Pin<&mut T>`
impl<T> Unpin for Sender<T> {}

/// The transmission end of an unbounded mpsc channel.
@@ -139,7 +139,7 @@ pub struct Receiver<T> {
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Receiver<T>);

// `PinMut<UnboundedReceiver<T>>` is never projected to `PinMut<T>`
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
impl<T> Unpin for UnboundedReceiver<T> {}

/// The error type for [`Sender`s](Sender) used as `Sink`s.
@@ -953,14 +953,14 @@ impl<T> Receiver<T> {
}
}

// The receiver does not ever take a PinMut to the inner T
// The receiver does not ever take a Pin to the inner T
impl<T> Unpin for Receiver<T> {}

impl<T> Stream for Receiver<T> {
type Item = T;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
loop {
@@ -1030,10 +1030,10 @@ impl<T> Stream for UnboundedReceiver<T> {
type Item = T;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
PinMut::new(&mut self.0).poll_next(cx)
Pin::new(&mut self.0).poll_next(cx)
}
}

6 changes: 3 additions & 3 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
use futures_core::future::Future;
use futures_core::task::{self, Poll, Waker};
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
@@ -29,7 +29,7 @@ pub struct Sender<T> {
inner: Arc<Inner<T>>,
}

// The channels do not ever project PinMut to the inner T
// The channels do not ever project Pin to the inner T
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}

@@ -419,7 +419,7 @@ impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;

fn poll(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Result<T, Canceled>> {
self.inner.recv(cx)
26 changes: 13 additions & 13 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -32,30 +32,30 @@ fn send_recv_no_buffer() {
let (tx, rx) = mpsc::channel::<i32>(0);
pin_mut!(tx, rx);

assert!(tx.reborrow().poll_flush(cx).is_ready());
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert!(tx.as_mut().poll_flush(cx).is_ready());
assert!(tx.as_mut().poll_ready(cx).is_ready());

// Send first message
assert!(tx.reborrow().start_send(1).is_ok());
assert!(tx.reborrow().poll_ready(cx).is_pending());
assert!(tx.as_mut().start_send(1).is_ok());
assert!(tx.as_mut().poll_ready(cx).is_pending());

// poll_ready said Pending, so no room in buffer, therefore new sends
// should get rejected with is_full.
assert!(tx.reborrow().start_send(0).unwrap_err().is_full());
assert!(tx.reborrow().poll_ready(cx).is_pending());
assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
assert!(tx.as_mut().poll_ready(cx).is_pending());

// Take the value
assert_eq!(rx.reborrow().poll_next(cx), Poll::Ready(Some(1)));
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
assert!(tx.as_mut().poll_ready(cx).is_ready());

// Send second message
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert!(tx.reborrow().start_send(2).is_ok());
assert!(tx.reborrow().poll_ready(cx).is_pending());
assert!(tx.as_mut().poll_ready(cx).is_ready());
assert!(tx.as_mut().start_send(2).is_ok());
assert!(tx.as_mut().poll_ready(cx).is_pending());

// Take the value
assert_eq!(rx.reborrow().poll_next(cx), Poll::Ready(Some(2)));
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
assert!(tx.as_mut().poll_ready(cx).is_ready());

Poll::Ready(())
}));
4 changes: 2 additions & 2 deletions futures-channel/tests/oneshot.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{Future, FutureExt, poll_fn};
use futures::task::{self, Poll};
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::mpsc;
use std::thread;

@@ -42,7 +42,7 @@ struct WaitForCancel {
impl Future for WaitForCancel {
type Output = ();

fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
self.tx.poll_cancel(cx)
}
}
6 changes: 3 additions & 3 deletions futures-core/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Futures.

use crate::task::{self, Poll};
use core::pin::PinMut;
use core::pin::Pin;

pub use core::future::{Future, FutureObj, LocalFutureObj, UnsafeFutureObj};

@@ -20,7 +20,7 @@ pub trait TryFuture {
/// directly inheriting from the `Future` trait; in the future it won't be
/// needed.
fn try_poll(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Result<Self::Ok, Self::Error>>;
}
@@ -32,7 +32,7 @@ impl<F, T, E> TryFuture for F
type Error = E;

#[inline]
fn try_poll(self: PinMut<Self>, cx: &mut task::Context) -> Poll<F::Output> {
fn try_poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<F::Output> {
self.poll(cx)
}
}
43 changes: 21 additions & 22 deletions futures-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@

use crate::task::{self, Poll};
use core::marker::Unpin;
use core::pin::PinMut;
use core::pin::Pin;

#[cfg(feature = "either")]
use either::Either;
@@ -52,7 +52,7 @@ pub trait Stream {
/// to ensure that `poll_next` always returns `Ready(None)` in subsequent
/// calls.
fn poll_next(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>>;
}
@@ -61,21 +61,21 @@ impl<'a, S: ?Sized + Stream + Unpin> Stream for &'a mut S {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
S::poll_next(PinMut::new(&mut **self), cx)
S::poll_next(Pin::new(&mut **self), cx)
}
}

impl<'a, S: ?Sized + Stream> Stream for PinMut<'a, S> {
impl<'a, S: ?Sized + Stream> Stream for Pin<&'a mut S> {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
S::poll_next((*self).reborrow(), cx)
S::poll_next((*self).as_mut(), cx)
}
}

@@ -86,11 +86,11 @@ impl<A, B> Stream for Either<A, B>
{
type Item = A::Item;

fn poll_next(self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<A::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<A::Item>> {
unsafe {
match PinMut::get_mut_unchecked(self) {
Either::Left(a) => PinMut::new_unchecked(a).poll_next(cx),
Either::Right(b) => PinMut::new_unchecked(b).poll_next(cx),
match Pin::get_mut_unchecked(self) {
Either::Left(a) => Pin::new_unchecked(a).poll_next(cx),
Either::Right(b) => Pin::new_unchecked(b).poll_next(cx),
}
}
}
@@ -110,7 +110,7 @@ pub trait TryStream {
/// This method is a stopgap for a compiler limitation that prevents us from
/// directly inheriting from the `Stream` trait; in the future it won't be
/// needed.
fn try_poll_next(self: PinMut<Self>, cx: &mut task::Context)
fn try_poll_next(self: Pin<&mut Self>, cx: &mut task::Context)
-> Poll<Option<Result<Self::Ok, Self::Error>>>;
}

@@ -120,7 +120,7 @@ impl<S, T, E> TryStream for S
type Ok = T;
type Error = E;

fn try_poll_next(self: PinMut<Self>, cx: &mut task::Context)
fn try_poll_next(self: Pin<&mut Self>, cx: &mut task::Context)
-> Poll<Option<Result<Self::Ok, Self::Error>>>
{
self.poll_next(cx)
@@ -129,46 +129,45 @@ impl<S, T, E> TryStream for S

if_std! {
use std::boxed::Box;
use std::pin::PinBox;

impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
PinMut::new(&mut **self).poll_next(cx)
Pin::new(&mut **self).poll_next(cx)
}
}

impl<S: ?Sized + Stream> Stream for PinBox<S> {
impl<S: ?Sized + Stream> Stream for Pin<Box<S>> {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
self.as_pin_mut().poll_next(cx)
S::poll_next((*self).as_mut(), cx)
}
}

impl<S: Stream> Stream for ::std::panic::AssertUnwindSafe<S> {
type Item = S::Item;

fn poll_next(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<S::Item>> {
unsafe { PinMut::map_unchecked(self, |x| &mut x.0) }.poll_next(cx)
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) }.poll_next(cx)
}
}

impl<T: Unpin> Stream for ::std::collections::VecDeque<T> {
type Item = T;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
_cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.pop_front())
Loading