Skip to content

Commit d7dc71b

Browse files
committed
Address review comments
1 parent 3df23c8 commit d7dc71b

File tree

8 files changed

+84
-29
lines changed

8 files changed

+84
-29
lines changed

futures-util/src/future/future/map.rs

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,52 @@
11
use core::pin::Pin;
2+
use core::ptr;
23
use futures_core::future::{FusedFuture, Future};
34
use futures_core::task::{Context, Poll};
45
use pin_project::{pin_project, project};
56

67
use crate::fns::FnOnce1;
78

8-
/// Future for the [`map`](super::FutureExt::map) method.
9+
/// Internal Map future
910
#[pin_project]
1011
#[derive(Debug)]
1112
#[must_use = "futures do nothing unless you `.await` or poll them"]
12-
pub struct Map<Fut, F> {
13-
#[pin]
14-
future: Fut,
15-
f: Option<F>,
13+
pub enum Map<Fut, F> {
14+
Incomplete {
15+
#[pin]
16+
future: Fut,
17+
f: F,
18+
},
19+
Complete,
20+
}
21+
22+
// Helper type to mark a `Map` as complete without running its destructor.
23+
struct UnsafeMarkAsComplete<Fut, F>(*mut Map<Fut, F>);
24+
25+
impl<Fut, F> Drop for UnsafeMarkAsComplete<Fut, F> {
26+
fn drop(&mut self) {
27+
unsafe {
28+
ptr::write(self.0, Map::Complete);
29+
}
30+
}
1631
}
1732

1833
impl<Fut, F> Map<Fut, F> {
1934
/// Creates a new Map.
2035
pub(crate) fn new(future: Fut, f: F) -> Map<Fut, F> {
21-
Map { future, f: Some(f) }
36+
Map::Incomplete { future, f }
2237
}
2338
}
2439

2540
impl<Fut, F, T> FusedFuture for Map<Fut, F>
2641
where Fut: Future,
2742
F: FnOnce1<Fut::Output, Output=T>,
2843
{
29-
fn is_terminated(&self) -> bool { self.f.is_none() }
44+
fn is_terminated(&self) -> bool {
45+
match self {
46+
Map::Incomplete { .. } => false,
47+
Map::Complete => true,
48+
}
49+
}
3050
}
3151

3252
impl<Fut, F, T> Future for Map<Fut, F>
@@ -36,13 +56,32 @@ impl<Fut, F, T> Future for Map<Fut, F>
3656
type Output = T;
3757

3858
#[project]
39-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
40-
#[project]
41-
let Map { future, f } = self.project();
42-
let output = ready!(future.poll(cx));
43-
let f = f.take()
44-
.expect("Map must not be polled after it returned `Poll::Ready`");
45-
46-
Poll::Ready(f.call_once(output))
59+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
60+
unsafe {
61+
// Store this pointer for later...
62+
let self_ptr: *mut Self = self.as_mut().get_unchecked_mut();
63+
64+
#[project]
65+
match self.project() {
66+
Map::Incomplete { mut future, f } => {
67+
let output = ready!(future.as_mut().poll(cx));
68+
69+
// Here be dragons
70+
let f = {
71+
// The ordering here is important, the call to `drop_in_place` must be
72+
// last as it may panic. Other lines must not panic.
73+
let _cleanup = UnsafeMarkAsComplete(self_ptr);
74+
let f = ptr::read(f);
75+
ptr::drop_in_place(future.get_unchecked_mut());
76+
f
77+
};
78+
79+
// Phew, everything is back to normal, and we should be in the
80+
// `Complete` state!
81+
Poll::Ready(f.call_once(output))
82+
},
83+
Map::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"),
84+
}
85+
}
4786
}
4887
}

futures-util/src/future/future/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,13 @@ delegate_all!(
4242

4343
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
4444
pub use fuse::Fuse;
45-
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
46-
pub use map::Map;
45+
46+
delegate_all!(
47+
/// Future for the [`flatten`](super::FutureExt::flatten) method.
48+
Map<Fut, F>(
49+
map::Map<Fut, F>
50+
): Debug + Future + FusedFuture + Sink + Stream + FusedStream + New[|x: Fut, f: F| map::Map::new(x, f)]
51+
);
4752

4853
delegate_all!(
4954
/// Stream for the [`into_stream`](FutureExt::into_stream) method.

futures-util/src/future/try_future/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ delegate_all!(
4343

4444
delegate_all!(
4545
/// Future for the [`try_flatten_stream`](TryFutureExt::try_flatten_stream) method.
46-
TryFlattenStream<Fut, St>(
47-
try_flatten::TryFlatten<Fut, St>
46+
TryFlattenStream<Fut>(
47+
try_flatten::TryFlatten<Fut, Fut::Ok>
4848
): Debug + Future + FusedFuture + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)]
49+
where Fut: TryFuture
4950
);
5051

5152
#[cfg(feature = "sink")]
@@ -526,7 +527,7 @@ pub trait TryFutureExt: TryFuture {
526527
/// assert_eq!(list, Ok(vec![17, 18, 19]));
527528
/// # });
528529
/// ```
529-
fn try_flatten_stream(self) -> TryFlattenStream<Self, Self::Ok>
530+
fn try_flatten_stream(self) -> TryFlattenStream<Self>
530531
where
531532
Self::Ok: TryStream<Error = Self::Error>,
532533
Self: Sized,

futures-util/src/future/try_future/try_flatten.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
5757
self.set(TryFlatten::Empty);
5858
break output;
5959
},
60-
TryFlatten::Empty => return Poll::Pending,
60+
TryFlatten::Empty => panic!("TryFlatten polled after completion"),
6161
}
6262
})
6363
}

futures-util/src/future/try_future/try_flatten_err.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl<Fut> Future for TryFlattenErr<Fut, Fut::Error>
5454
self.set(TryFlattenErr::Empty);
5555
break output;
5656
},
57-
TryFlattenErr::Empty => return Poll::Pending,
57+
TryFlattenErr::Empty => panic!("TryFlattenErr polled after completion"),
5858
}
5959
})
6060
}

futures-util/src/stream/stream/filter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F>
8080
let Filter { mut stream, f, mut pending_fut, pending_item } = self.project();
8181
Poll::Ready(loop {
8282
if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
83-
if ready!(fut.poll(cx)) {
83+
let res = ready!(fut.poll(cx));
84+
pending_fut.set(None);
85+
if res {
8486
break pending_item.take();
8587
}
8688
*pending_item = None;

futures-util/src/stream/stream/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,14 @@ mod filter_map;
4646
pub use self::filter_map::FilterMap;
4747

4848
mod flatten;
49-
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
50-
pub use self::flatten::Flatten;
49+
50+
delegate_all!(
51+
/// Stream for the [`inspect`](StreamExt::inspect) method.
52+
Flatten<St>(
53+
flatten::Flatten<St, St::Item>
54+
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
55+
where St: Stream
56+
);
5157

5258
mod fold;
5359
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -91,8 +97,8 @@ pub use self::map::Map;
9197
delegate_all!(
9298
/// Stream for the [`flat_map`](StreamExt::flat_map) method.
9399
FlatMap<St, U, F>(
94-
Flatten<Map<St, F>, U>
95-
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Flatten::new(Map::new(x, f))]
100+
flatten::Flatten<Map<St, F>, U>
101+
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
96102
);
97103

98104
mod next;
@@ -563,7 +569,7 @@ pub trait StreamExt: Stream {
563569
/// assert_eq!(output, vec![1, 2, 3, 4]);
564570
/// # });
565571
/// ```
566-
fn flatten(self) -> Flatten<Self, Self::Item>
572+
fn flatten(self) -> Flatten<Self>
567573
where
568574
Self::Item: Stream,
569575
Self: Sized,

futures-util/src/stream/try_stream/try_filter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
7878
let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project();
7979
Poll::Ready(loop {
8080
if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
81-
if ready!(fut.poll(cx)) {
81+
let res = ready!(fut.poll(cx));
82+
pending_fut.set(None);
83+
if res {
8284
break pending_item.take().map(Ok);
8385
}
8486
*pending_item = None;

0 commit comments

Comments
 (0)