Skip to content

Commit 3ff9e98

Browse files
authored
Merge pull request #585 from async-rs/try_channels
expose try_recv and try_send on channels
2 parents 49dd02b + b7c7efc commit 3ff9e98

File tree

3 files changed

+152
-49
lines changed

3 files changed

+152
-49
lines changed

src/sync/channel.rs

+131-28
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::cell::UnsafeCell;
2-
use std::fmt;
2+
use std::error::Error;
3+
use std::fmt::{self, Debug, Display};
34
use std::future::Future;
45
use std::isize;
56
use std::marker::PhantomData;
@@ -31,6 +32,7 @@ use crate::sync::WakerSet;
3132
/// # Examples
3233
///
3334
/// ```
35+
/// # fn main() -> Result<(), async_std::sync::RecvError> {
3436
/// # async_std::task::block_on(async {
3537
/// #
3638
/// use std::time::Duration;
@@ -50,10 +52,11 @@ use crate::sync::WakerSet;
5052
/// });
5153
///
5254
/// task::sleep(Duration::from_secs(1)).await;
53-
/// assert_eq!(r.recv().await, Some(1));
54-
/// assert_eq!(r.recv().await, Some(2));
55+
/// assert_eq!(r.recv().await?, 1);
56+
/// assert_eq!(r.recv().await?, 2);
57+
/// # Ok(())
5558
/// #
56-
/// # })
59+
/// # }) }
5760
/// ```
5861
#[cfg(feature = "unstable")]
5962
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
@@ -112,6 +115,7 @@ impl<T> Sender<T> {
112115
/// # Examples
113116
///
114117
/// ```
118+
/// # fn main() -> Result<(), async_std::sync::RecvError> {
115119
/// # async_std::task::block_on(async {
116120
/// #
117121
/// use async_std::sync::channel;
@@ -124,11 +128,12 @@ impl<T> Sender<T> {
124128
/// s.send(2).await;
125129
/// });
126130
///
127-
/// assert_eq!(r.recv().await, Some(1));
128-
/// assert_eq!(r.recv().await, Some(2));
129-
/// assert_eq!(r.recv().await, None);
131+
/// assert_eq!(r.recv().await?, 1);
132+
/// assert_eq!(r.recv().await?, 2);
133+
/// assert!(r.recv().await.is_err());
130134
/// #
131-
/// # })
135+
/// # Ok(())
136+
/// # }) }
132137
/// ```
133138
pub async fn send(&self, msg: T) {
134139
struct SendFuture<'a, T> {
@@ -192,6 +197,27 @@ impl<T> Sender<T> {
192197
.await
193198
}
194199

200+
/// Attempts to send a message into the channel.
201+
///
202+
/// If the channel is full, this method will return an error.
203+
///
204+
/// # Examples
205+
///
206+
/// ```
207+
/// # async_std::task::block_on(async {
208+
/// #
209+
/// use async_std::sync::channel;
210+
///
211+
/// let (s, r) = channel(1);
212+
/// assert!(s.try_send(1).is_ok());
213+
/// assert!(s.try_send(2).is_err());
214+
/// #
215+
/// # })
216+
/// ```
217+
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
218+
self.channel.try_send(msg)
219+
}
220+
195221
/// Returns the channel capacity.
196222
///
197223
/// # Examples
@@ -313,6 +339,7 @@ impl<T> fmt::Debug for Sender<T> {
313339
/// # Examples
314340
///
315341
/// ```
342+
/// # fn main() -> Result<(), async_std::sync::RecvError> {
316343
/// # async_std::task::block_on(async {
317344
/// #
318345
/// use std::time::Duration;
@@ -328,10 +355,11 @@ impl<T> fmt::Debug for Sender<T> {
328355
/// s.send(2).await;
329356
/// });
330357
///
331-
/// assert_eq!(r.recv().await, Some(1)); // Received immediately.
332-
/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second.
358+
/// assert_eq!(r.recv().await?, 1); // Received immediately.
359+
/// assert_eq!(r.recv().await?, 2); // Received after 1 second.
333360
/// #
334-
/// # })
361+
/// # Ok(())
362+
/// # }) }
335363
/// ```
336364
#[cfg(feature = "unstable")]
337365
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
@@ -353,6 +381,7 @@ impl<T> Receiver<T> {
353381
/// # Examples
354382
///
355383
/// ```
384+
/// # fn main() -> Result<(), async_std::sync::RecvError> {
356385
/// # async_std::task::block_on(async {
357386
/// #
358387
/// use async_std::sync::channel;
@@ -366,22 +395,21 @@ impl<T> Receiver<T> {
366395
/// // Then we drop the sender
367396
/// });
368397
///
369-
/// assert_eq!(r.recv().await, Some(1));
370-
/// assert_eq!(r.recv().await, Some(2));
371-
///
372-
/// // recv() returns `None`
373-
/// assert_eq!(r.recv().await, None);
398+
/// assert_eq!(r.recv().await?, 1);
399+
/// assert_eq!(r.recv().await?, 2);
400+
/// assert!(r.recv().await.is_err());
374401
/// #
375-
/// # })
402+
/// # Ok(())
403+
/// # }) }
376404
/// ```
377-
pub async fn recv(&self) -> Option<T> {
405+
pub async fn recv(&self) -> Result<T, RecvError> {
378406
struct RecvFuture<'a, T> {
379407
channel: &'a Channel<T>,
380408
opt_key: Option<usize>,
381409
}
382410

383411
impl<T> Future for RecvFuture<'_, T> {
384-
type Output = Option<T>;
412+
type Output = Result<T, RecvError>;
385413

386414
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
387415
poll_recv(
@@ -409,6 +437,30 @@ impl<T> Receiver<T> {
409437
.await
410438
}
411439

440+
/// Attempts to receive a message from the channel.
441+
///
442+
/// If the channel is empty, this method will return an error.
443+
///
444+
/// # Examples
445+
///
446+
/// ```
447+
/// # async_std::task::block_on(async {
448+
/// #
449+
/// use async_std::sync::channel;
450+
///
451+
/// let (s, r) = channel(1);
452+
///
453+
/// s.send(1u8).await;
454+
///
455+
/// assert!(r.try_recv().is_ok());
456+
/// assert!(r.try_recv().is_err());
457+
/// #
458+
/// # })
459+
/// ```
460+
pub fn try_recv(&self) -> Result<T, TryRecvError> {
461+
self.channel.try_recv()
462+
}
463+
412464
/// Returns the channel capacity.
413465
///
414466
/// # Examples
@@ -523,12 +575,13 @@ impl<T> Stream for Receiver<T> {
523575

524576
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
525577
let this = &mut *self;
526-
poll_recv(
578+
let res = futures_core::ready!(poll_recv(
527579
&this.channel,
528580
&this.channel.stream_wakers,
529581
&mut this.opt_key,
530582
cx,
531-
)
583+
));
584+
Poll::Ready(res.ok())
532585
}
533586
}
534587

@@ -547,7 +600,7 @@ fn poll_recv<T>(
547600
wakers: &WakerSet,
548601
opt_key: &mut Option<usize>,
549602
cx: &mut Context<'_>,
550-
) -> Poll<Option<T>> {
603+
) -> Poll<Result<T, RecvError>> {
551604
loop {
552605
// If the current task is in the set, remove it.
553606
if let Some(key) = opt_key.take() {
@@ -556,8 +609,8 @@ fn poll_recv<T>(
556609

557610
// Try receiving a message.
558611
match channel.try_recv() {
559-
Ok(msg) => return Poll::Ready(Some(msg)),
560-
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
612+
Ok(msg) => return Poll::Ready(Ok(msg)),
613+
Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})),
561614
Err(TryRecvError::Empty) => {
562615
// Insert this receive operation.
563616
*opt_key = Some(wakers.insert(cx));
@@ -936,20 +989,70 @@ impl<T> Drop for Channel<T> {
936989
}
937990
}
938991

939-
/// An error returned from the `try_send()` method.
940-
enum TrySendError<T> {
992+
/// An error returned from the `try_send` method.
993+
#[cfg(feature = "unstable")]
994+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
995+
pub enum TrySendError<T> {
941996
/// The channel is full but not disconnected.
942997
Full(T),
943998

944999
/// The channel is full and disconnected.
9451000
Disconnected(T),
9461001
}
9471002

948-
/// An error returned from the `try_recv()` method.
949-
enum TryRecvError {
1003+
impl<T> Error for TrySendError<T> {}
1004+
1005+
impl<T> Debug for TrySendError<T> {
1006+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1007+
match self {
1008+
Self::Full(_) => Debug::fmt("Full<T>", f),
1009+
Self::Disconnected(_) => Debug::fmt("Disconnected<T>", f),
1010+
}
1011+
}
1012+
}
1013+
1014+
impl<T> Display for TrySendError<T> {
1015+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1016+
match self {
1017+
Self::Full(_) => Display::fmt("The channel is full.", f),
1018+
Self::Disconnected(_) => Display::fmt("The channel is full and disconnected.", f),
1019+
}
1020+
}
1021+
}
1022+
1023+
/// An error returned from the `try_recv` method.
1024+
#[cfg(feature = "unstable")]
1025+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1026+
#[derive(Debug)]
1027+
pub enum TryRecvError {
9501028
/// The channel is empty but not disconnected.
9511029
Empty,
9521030

9531031
/// The channel is empty and disconnected.
9541032
Disconnected,
9551033
}
1034+
1035+
impl Error for TryRecvError {}
1036+
1037+
impl Display for TryRecvError {
1038+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1039+
match self {
1040+
Self::Empty => Display::fmt("The channel is empty.", f),
1041+
Self::Disconnected => Display::fmt("The channel is empty and disconnected.", f),
1042+
}
1043+
}
1044+
}
1045+
1046+
/// An error returned from the `recv` method.
1047+
#[cfg(feature = "unstable")]
1048+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1049+
#[derive(Debug)]
1050+
pub struct RecvError;
1051+
1052+
impl Error for RecvError {}
1053+
1054+
impl Display for RecvError {
1055+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1056+
Display::fmt("The channel is empty.", f)
1057+
}
1058+
}

src/sync/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ mod rwlock;
184184

185185
cfg_unstable! {
186186
pub use barrier::{Barrier, BarrierWaitResult};
187-
pub use channel::{channel, Sender, Receiver};
187+
pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError};
188188

189189
mod barrier;
190190
mod channel;

0 commit comments

Comments
 (0)