Skip to content

Commit 508c72b

Browse files
authoredJan 18, 2025
Rollup merge of #135583 - NobodyXu:move-pipe-to-io, r=joshtriplett
Move `std::pipe::*` into `std::io` Resolve concern from final comment period #127154 (comment)
·
1.89.01.86.0
2 parents 86947bb + 2af4197 commit 508c72b

File tree

11 files changed

+274
-291
lines changed

11 files changed

+274
-291
lines changed
 

‎library/std/src/io/mod.rs

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ pub use self::{
330330
};
331331
use crate::mem::take;
332332
use crate::ops::{Deref, DerefMut};
333+
use crate::sys::anonymous_pipe::{AnonPipe, pipe as pipe_inner};
333334
use crate::{cmp, fmt, slice, str, sys};
334335

335336
mod buffered;
@@ -3250,3 +3251,251 @@ impl<B: BufRead> Iterator for Lines<B> {
32503251
}
32513252
}
32523253
}
3254+
3255+
/// Create anonymous pipe that is close-on-exec and blocking.
3256+
///
3257+
/// # Behavior
3258+
///
3259+
/// A pipe is a synchronous, unidirectional data channel between two or more processes, like an
3260+
/// interprocess [`mpsc`](crate::sync::mpsc) provided by the OS. In particular:
3261+
///
3262+
/// * A read on a [`PipeReader`] blocks until the pipe is non-empty.
3263+
/// * A write on a [`PipeWriter`] blocks when the pipe is full.
3264+
/// * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`]
3265+
/// returns EOF.
3266+
/// * [`PipeReader`] can be shared, but only one process will consume the data in the pipe.
3267+
///
3268+
/// # Capacity
3269+
///
3270+
/// Pipe capacity is platform dependent. To quote the Linux [man page]:
3271+
///
3272+
/// > Different implementations have different limits for the pipe capacity. Applications should
3273+
/// > not rely on a particular capacity: an application should be designed so that a reading process
3274+
/// > consumes data as soon as it is available, so that a writing process does not remain blocked.
3275+
///
3276+
/// # Examples
3277+
///
3278+
/// ```no_run
3279+
/// #![feature(anonymous_pipe)]
3280+
/// # #[cfg(miri)] fn main() {}
3281+
/// # #[cfg(not(miri))]
3282+
/// # fn main() -> std::io::Result<()> {
3283+
/// # use std::process::Command;
3284+
/// # use std::io::{Read, Write};
3285+
/// let (ping_rx, mut ping_tx) = std::io::pipe()?;
3286+
/// let (mut pong_rx, pong_tx) = std::io::pipe()?;
3287+
///
3288+
/// // Spawn a process that echoes its input.
3289+
/// let mut echo_server = Command::new("cat").stdin(ping_rx).stdout(pong_tx).spawn()?;
3290+
///
3291+
/// ping_tx.write_all(b"hello")?;
3292+
/// // Close to unblock echo_server's reader.
3293+
/// drop(ping_tx);
3294+
///
3295+
/// let mut buf = String::new();
3296+
/// // Block until echo_server's writer is closed.
3297+
/// pong_rx.read_to_string(&mut buf)?;
3298+
/// assert_eq!(&buf, "hello");
3299+
///
3300+
/// echo_server.wait()?;
3301+
/// # Ok(())
3302+
/// # }
3303+
/// ```
3304+
/// [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html
3305+
/// [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe
3306+
/// [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html
3307+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3308+
#[inline]
3309+
pub fn pipe() -> Result<(PipeReader, PipeWriter)> {
3310+
pipe_inner().map(|(reader, writer)| (PipeReader(reader), PipeWriter(writer)))
3311+
}
3312+
3313+
/// Read end of the anonymous pipe.
3314+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3315+
#[derive(Debug)]
3316+
pub struct PipeReader(pub(crate) AnonPipe);
3317+
3318+
/// Write end of the anonymous pipe.
3319+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3320+
#[derive(Debug)]
3321+
pub struct PipeWriter(pub(crate) AnonPipe);
3322+
3323+
impl PipeReader {
3324+
/// Create a new [`PipeReader`] instance that shares the same underlying file description.
3325+
///
3326+
/// # Examples
3327+
///
3328+
/// ```no_run
3329+
/// #![feature(anonymous_pipe)]
3330+
/// # #[cfg(miri)] fn main() {}
3331+
/// # #[cfg(not(miri))]
3332+
/// # fn main() -> std::io::Result<()> {
3333+
/// # use std::fs;
3334+
/// # use std::io::Write;
3335+
/// # use std::process::Command;
3336+
/// const NUM_SLOT: u8 = 2;
3337+
/// const NUM_PROC: u8 = 5;
3338+
/// const OUTPUT: &str = "work.txt";
3339+
///
3340+
/// let mut jobs = vec![];
3341+
/// let (reader, mut writer) = std::io::pipe()?;
3342+
///
3343+
/// // Write NUM_SLOT characters the pipe.
3344+
/// writer.write_all(&[b'|'; NUM_SLOT as usize])?;
3345+
///
3346+
/// // Spawn several processes that read a character from the pipe, do some work, then
3347+
/// // write back to the pipe. When the pipe is empty, the processes block, so only
3348+
/// // NUM_SLOT processes can be working at any given time.
3349+
/// for _ in 0..NUM_PROC {
3350+
/// jobs.push(
3351+
/// Command::new("bash")
3352+
/// .args(["-c",
3353+
/// &format!(
3354+
/// "read -n 1\n\
3355+
/// echo -n 'x' >> '{OUTPUT}'\n\
3356+
/// echo -n '|'",
3357+
/// ),
3358+
/// ])
3359+
/// .stdin(reader.try_clone()?)
3360+
/// .stdout(writer.try_clone()?)
3361+
/// .spawn()?,
3362+
/// );
3363+
/// }
3364+
///
3365+
/// // Wait for all jobs to finish.
3366+
/// for mut job in jobs {
3367+
/// job.wait()?;
3368+
/// }
3369+
///
3370+
/// // Check our work and clean up.
3371+
/// let xs = fs::read_to_string(OUTPUT)?;
3372+
/// fs::remove_file(OUTPUT)?;
3373+
/// assert_eq!(xs, "x".repeat(NUM_PROC.into()));
3374+
/// # Ok(())
3375+
/// # }
3376+
/// ```
3377+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3378+
pub fn try_clone(&self) -> Result<Self> {
3379+
self.0.try_clone().map(Self)
3380+
}
3381+
}
3382+
3383+
impl PipeWriter {
3384+
/// Create a new [`PipeWriter`] instance that shares the same underlying file description.
3385+
///
3386+
/// # Examples
3387+
///
3388+
/// ```no_run
3389+
/// #![feature(anonymous_pipe)]
3390+
/// # #[cfg(miri)] fn main() {}
3391+
/// # #[cfg(not(miri))]
3392+
/// # fn main() -> std::io::Result<()> {
3393+
/// # use std::process::Command;
3394+
/// # use std::io::Read;
3395+
/// let (mut reader, writer) = std::io::pipe()?;
3396+
///
3397+
/// // Spawn a process that writes to stdout and stderr.
3398+
/// let mut peer = Command::new("bash")
3399+
/// .args([
3400+
/// "-c",
3401+
/// "echo -n foo\n\
3402+
/// echo -n bar >&2"
3403+
/// ])
3404+
/// .stdout(writer.try_clone()?)
3405+
/// .stderr(writer)
3406+
/// .spawn()?;
3407+
///
3408+
/// // Read and check the result.
3409+
/// let mut msg = String::new();
3410+
/// reader.read_to_string(&mut msg)?;
3411+
/// assert_eq!(&msg, "foobar");
3412+
///
3413+
/// peer.wait()?;
3414+
/// # Ok(())
3415+
/// # }
3416+
/// ```
3417+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3418+
pub fn try_clone(&self) -> Result<Self> {
3419+
self.0.try_clone().map(Self)
3420+
}
3421+
}
3422+
3423+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3424+
impl Read for &PipeReader {
3425+
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
3426+
self.0.read(buf)
3427+
}
3428+
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
3429+
self.0.read_vectored(bufs)
3430+
}
3431+
#[inline]
3432+
fn is_read_vectored(&self) -> bool {
3433+
self.0.is_read_vectored()
3434+
}
3435+
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
3436+
self.0.read_to_end(buf)
3437+
}
3438+
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> {
3439+
self.0.read_buf(buf)
3440+
}
3441+
}
3442+
3443+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3444+
impl Read for PipeReader {
3445+
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
3446+
self.0.read(buf)
3447+
}
3448+
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
3449+
self.0.read_vectored(bufs)
3450+
}
3451+
#[inline]
3452+
fn is_read_vectored(&self) -> bool {
3453+
self.0.is_read_vectored()
3454+
}
3455+
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
3456+
self.0.read_to_end(buf)
3457+
}
3458+
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> {
3459+
self.0.read_buf(buf)
3460+
}
3461+
}
3462+
3463+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3464+
impl Write for &PipeWriter {
3465+
fn write(&mut self, buf: &[u8]) -> Result<usize> {
3466+
self.0.write(buf)
3467+
}
3468+
#[inline]
3469+
fn flush(&mut self) -> Result<()> {
3470+
Ok(())
3471+
}
3472+
3473+
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
3474+
self.0.write_vectored(bufs)
3475+
}
3476+
3477+
#[inline]
3478+
fn is_write_vectored(&self) -> bool {
3479+
self.0.is_write_vectored()
3480+
}
3481+
}
3482+
3483+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3484+
impl Write for PipeWriter {
3485+
fn write(&mut self, buf: &[u8]) -> Result<usize> {
3486+
self.0.write(buf)
3487+
}
3488+
#[inline]
3489+
fn flush(&mut self) -> Result<()> {
3490+
Ok(())
3491+
}
3492+
3493+
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
3494+
self.0.write_vectored(bufs)
3495+
}
3496+
3497+
#[inline]
3498+
fn is_write_vectored(&self) -> bool {
3499+
self.0.is_write_vectored()
3500+
}
3501+
}

‎library/std/src/io/tests.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,3 +823,20 @@ fn try_oom_error() {
823823
let io_err = io::Error::from(reserve_err);
824824
assert_eq!(io::ErrorKind::OutOfMemory, io_err.kind());
825825
}
826+
827+
#[test]
828+
#[cfg(all(windows, unix, not(miri)))]
829+
fn pipe_creation_clone_and_rw() {
830+
let (rx, tx) = std::io::pipe().unwrap();
831+
832+
tx.try_clone().unwrap().write_all(b"12345").unwrap();
833+
drop(tx);
834+
835+
let mut rx2 = rx.try_clone().unwrap();
836+
drop(rx);
837+
838+
let mut s = String::new();
839+
rx2.read_to_string(&mut s).unwrap();
840+
drop(rx2);
841+
assert_eq!(s, "12345");
842+
}

‎library/std/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -596,8 +596,6 @@ pub mod panic;
596596
#[unstable(feature = "pattern_type_macro", issue = "123646")]
597597
pub mod pat;
598598
pub mod path;
599-
#[unstable(feature = "anonymous_pipe", issue = "127154")]
600-
pub mod pipe;
601599
pub mod process;
602600
#[unstable(feature = "random", issue = "130703")]
603601
pub mod random;

‎library/std/src/pipe.rs

Lines changed: 0 additions & 258 deletions
This file was deleted.

‎library/std/src/pipe/tests.rs

Lines changed: 0 additions & 19 deletions
This file was deleted.

‎library/std/src/sys/anonymous_pipe/unix.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use crate::io;
1+
use crate::io::{self, PipeReader, PipeWriter};
22
use crate::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
3-
use crate::pipe::{PipeReader, PipeWriter};
43
use crate::process::Stdio;
54
use crate::sys::fd::FileDesc;
65
use crate::sys::pipe::anon_pipe;

‎library/std/src/sys/anonymous_pipe/unsupported.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use crate::io;
2-
use crate::pipe::{PipeReader, PipeWriter};
1+
use crate::io::{self, PipeReader, PipeWriter};
32
use crate::process::Stdio;
43
pub use crate::sys::pipe::AnonPipe;
54

‎library/std/src/sys/anonymous_pipe/windows.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1+
use crate::io::{self, PipeReader, PipeWriter};
12
use crate::os::windows::io::{
23
AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle,
34
};
4-
use crate::pipe::{PipeReader, PipeWriter};
55
use crate::process::Stdio;
6+
use crate::ptr;
67
use crate::sys::c;
78
use crate::sys::handle::Handle;
89
use crate::sys_common::{FromInner, IntoInner};
9-
use crate::{io, ptr};
1010

1111
pub type AnonPipe = Handle;
1212

‎library/std/src/sys/pal/unix/kernel_copy.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,14 @@ use crate::cmp::min;
5252
use crate::fs::{File, Metadata};
5353
use crate::io::copy::generic_copy;
5454
use crate::io::{
55-
BufRead, BufReader, BufWriter, Error, Read, Result, StderrLock, StdinLock, StdoutLock, Take,
56-
Write,
55+
BufRead, BufReader, BufWriter, Error, PipeReader, PipeWriter, Read, Result, StderrLock,
56+
StdinLock, StdoutLock, Take, Write,
5757
};
5858
use crate::mem::ManuallyDrop;
5959
use crate::net::TcpStream;
6060
use crate::os::unix::fs::FileTypeExt;
6161
use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd};
6262
use crate::os::unix::net::UnixStream;
63-
use crate::pipe::{PipeReader, PipeWriter};
6463
use crate::process::{ChildStderr, ChildStdin, ChildStdout};
6564
use crate::ptr;
6665
use crate::sync::atomic::{AtomicBool, AtomicU8, Ordering};

‎library/std/tests/pipe_subprocess.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
fn main() {
44
#[cfg(all(not(miri), any(unix, windows)))]
55
{
6-
use std::io::Read;
7-
use std::pipe::pipe;
6+
use std::io::{Read, pipe};
87
use std::{env, process};
98

109
if env::var("I_AM_THE_CHILD").is_ok() {

‎tests/run-make/broken-pipe-no-ice/rmake.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ enum Binary {
2525
}
2626

2727
fn check_broken_pipe_handled_gracefully(bin: Binary, mut cmd: Command) {
28-
let (reader, writer) = std::pipe::pipe().unwrap();
28+
let (reader, writer) = std::io::pipe().unwrap();
2929
drop(reader); // close read-end
3030
cmd.stdout(writer).stderr(Stdio::piped());
3131

0 commit comments

Comments
 (0)
Please sign in to comment.