From 3c3ab18bc38b8c71182a18b352c9109b227ea9a4 Mon Sep 17 00:00:00 2001
From: Aleksey Kladov <aleksey.kladov@gmail.com>
Date: Fri, 1 Nov 2019 21:24:16 +0300
Subject: [PATCH 1/3] Stream::merge does not end prematurely if one stream is
 delayed

---
 src/stream/stream/merge.rs | 15 ++++++------
 tests/stream.rs            | 47 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 7 deletions(-)
 create mode 100644 tests/stream.rs

diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs
index d926ec4fe..c387f303c 100644
--- a/src/stream/stream/merge.rs
+++ b/src/stream/stream/merge.rs
@@ -38,13 +38,14 @@ where
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.project();
-        if let Poll::Ready(Some(item)) = this.left.poll_next(cx) {
-            // The first stream made progress. The Merge needs to be polled
-            // again to check the progress of the second stream.
-            cx.waker().wake_by_ref();
-            Poll::Ready(Some(item))
-        } else {
-            this.right.poll_next(cx)
+        match this.left.poll_next(cx) {
+            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
+            Poll::Ready(None) => this.right.poll_next(cx),
+            Poll::Pending => match this.right.poll_next(cx) {
+                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
+                Poll::Ready(None) => Poll::Pending,
+                Poll::Pending => Poll::Pending,
+            }
         }
     }
 }
diff --git a/tests/stream.rs b/tests/stream.rs
new file mode 100644
index 000000000..fd6febada
--- /dev/null
+++ b/tests/stream.rs
@@ -0,0 +1,47 @@
+use async_std::prelude::*;
+use async_std::stream;
+use async_std::sync::channel;
+use async_std::task;
+
+#[test]
+/// Checks that streams are merged fully even if one of the components
+/// experiences delay.
+fn merging_delayed_streams_work() {
+    let (sender, receiver) = channel::<i32>(10);
+
+    let mut s = receiver.merge(stream::empty());
+    let t = task::spawn(async move {
+        let mut xs = Vec::new();
+        while let Some(x) = s.next().await {
+            xs.push(x);
+        }
+        xs
+    });
+
+    task::block_on(async move {
+        task::sleep(std::time::Duration::from_millis(500)).await;
+        sender.send(92).await;
+        drop(sender);
+        let xs = t.await;
+        assert_eq!(xs, vec![92])
+    });
+
+    let (sender, receiver) = channel::<i32>(10);
+
+    let mut s = stream::empty().merge(receiver);
+    let t = task::spawn(async move {
+        let mut xs = Vec::new();
+        while let Some(x) = s.next().await {
+            xs.push(x);
+        }
+        xs
+    });
+
+    task::block_on(async move {
+        task::sleep(std::time::Duration::from_millis(500)).await;
+        sender.send(92).await;
+        drop(sender);
+        let xs = t.await;
+        assert_eq!(xs, vec![92])
+    });
+}

From 1174c10abd05eb26261c7c389f05cb0a0c9409c9 Mon Sep 17 00:00:00 2001
From: Aleksey Kladov <aleksey.kladov@gmail.com>
Date: Fri, 1 Nov 2019 21:43:05 +0300
Subject: [PATCH 2/3] `cargo test` without features works

---
 Cargo.toml | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/Cargo.toml b/Cargo.toml
index 63897053b..5809dad93 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,3 +55,11 @@ futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
 # These are used by the book for examples
 futures-channel-preview = "=0.3.0-alpha.19"
 futures-util-preview = "=0.3.0-alpha.19"
+
+[[test]]
+name = "stream"
+required-features = ["unstable"]
+
+[[example]]
+name = "tcp-ipv4-and-6-echo"
+required-features = ["unstable"]

From 4891f55d9692bdb0c26e7da7ad40191b7a3204ba Mon Sep 17 00:00:00 2001
From: Aleksey Kladov <aleksey.kladov@gmail.com>
Date: Sat, 2 Nov 2019 21:47:02 +0300
Subject: [PATCH 3/3] Stream::merge works correctly for unfused streams

---
 src/stream/stream/merge.rs | 11 +++++---
 src/stream/stream/mod.rs   |  2 +-
 tests/stream.rs            | 53 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs
index c387f303c..f3505acae 100644
--- a/src/stream/stream/merge.rs
+++ b/src/stream/stream/merge.rs
@@ -4,6 +4,9 @@ use std::task::{Context, Poll};
 use futures_core::Stream;
 use pin_project_lite::pin_project;
 
+use crate::prelude::*;
+use crate::stream::Fuse;
+
 pin_project! {
     /// A stream that merges two other streams into a single stream.
     ///
@@ -17,15 +20,15 @@ pin_project! {
     #[derive(Debug)]
     pub struct Merge<L, R> {
         #[pin]
-        left: L,
+        left: Fuse<L>,
         #[pin]
-        right: R,
+        right: Fuse<R>,
     }
 }
 
-impl<L, R> Merge<L, R> {
+impl<L: Stream, R: Stream> Merge<L, R> {
     pub(crate) fn new(left: L, right: R) -> Self {
-        Self { left, right }
+        Self { left: left.fuse(), right: right.fuse() }
     }
 }
 
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index f7905ed21..f677b9dbc 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -1590,7 +1590,7 @@ extension_trait! {
         }
 
         #[doc = r#"
-            Searches for an element in a Stream that satisfies a predicate, returning 
+            Searches for an element in a Stream that satisfies a predicate, returning
             its index.
 
             # Examples
diff --git a/tests/stream.rs b/tests/stream.rs
index fd6febada..42a6191fd 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -1,3 +1,8 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use pin_project_lite::pin_project;
+
 use async_std::prelude::*;
 use async_std::stream;
 use async_std::sync::channel;
@@ -45,3 +50,51 @@ fn merging_delayed_streams_work() {
         assert_eq!(xs, vec![92])
     });
 }
+
+pin_project! {
+    /// The opposite of `Fuse`: makes the stream panic if polled after termination.
+    struct Explode<S> {
+        #[pin]
+        done: bool,
+        #[pin]
+        inner: S,
+    }
+}
+
+impl<S: Stream> Stream for Explode<S> {
+    type Item = S::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        if *this.done {
+            panic!("KABOOM!")
+        }
+        let res = this.inner.poll_next(cx);
+        if let Poll::Ready(None) = &res {
+            *this.done = true;
+        }
+        res
+    }
+}
+
+fn explode<S: Stream>(s: S) -> Explode<S> {
+    Explode {
+        done: false,
+        inner: s,
+    }
+}
+
+#[test]
+fn merge_works_with_unfused_streams() {
+    let s1 = explode(stream::once(92));
+    let s2 = explode(stream::once(92));
+    let mut s = s1.merge(s2);
+    let xs = task::block_on(async move {
+        let mut xs = Vec::new();
+        while let Some(x) = s.next().await {
+            xs.push(x)
+        }
+        xs
+    });
+    assert_eq!(xs, vec![92, 92]);
+}