diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index fe3579e9d..d9d2b0a08 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -5,6 +5,7 @@ use pin_project_lite::pin_project; use crate::prelude::*; use crate::stream::Fuse; +use crate::utils; pin_project! { /// A stream that merges two other streams into a single stream. @@ -27,7 +28,10 @@ pin_project! { impl Merge { pub(crate) fn new(left: L, right: R) -> Self { - Self { left: left.fuse(), right: right.fuse() } + Self { + left: left.fuse(), + right: right.fuse(), + } } } @@ -40,14 +44,29 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - match this.left.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => this.right.poll_next(cx), - Poll::Pending => match this.right.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Pending, - Poll::Pending => Poll::Pending, - } + if utils::random(1) == 1 { + poll_next_in_order(this.left, this.right, cx) + } else { + poll_next_in_order(this.right, this.left, cx) } } } + +fn poll_next_in_order( + first: Pin<&mut F>, + second: Pin<&mut S>, + cx: &mut Context<'_>, +) -> Poll> +where + F: Stream, + S: Stream, +{ + match first.poll_next(cx) { + Poll::Ready(None) => second.poll_next(cx), + Poll::Ready(item) => Poll::Ready(item), + Poll::Pending => match second.poll_next(cx) { + Poll::Ready(None) | Poll::Pending => Poll::Pending, + Poll::Ready(item) => Poll::Ready(item), + }, + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ea1459f2..223aea5a0 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1663,18 +1663,17 @@ extension_trait! { ``` # async_std::task::block_on(async { use async_std::prelude::*; - use async_std::stream; + use async_std::stream::{self, FromStream}; let a = stream::once(1u8); let b = stream::once(2u8); let c = stream::once(3u8); - let mut s = a.merge(b).merge(c); + let s = a.merge(b).merge(c); + let mut lst = Vec::from_stream(s).await; - assert_eq!(s.next().await, Some(1u8)); - assert_eq!(s.next().await, Some(2u8)); - assert_eq!(s.next().await, Some(3u8)); - assert_eq!(s.next().await, None); + lst.sort_unstable(); + assert_eq!(&lst, &[1u8, 2u8, 3u8]); # }); ``` "#]