Skip to content

Commit 62d7d00

Browse files
committed
auto merge of #12103 : alexcrichton/rust/unix, r=brson
There's a few parts to this PR * Implement unix pipes in libnative for unix platforms (thanks @Geal!) * Implement named pipes in libnative for windows (terrible, terrible code) * Remove `#[cfg(unix)]` from `mod unix` in `std::io::net`. This is a terrible name for what it is, but that's the topic of #12093. The windows implementation was significantly more complicated than I thought it would be, but it seems to be passing all the tests. now. Closes #11201
2 parents 03c5342 + a526aa1 commit 62d7d00

File tree

6 files changed

+928
-46
lines changed

6 files changed

+928
-46
lines changed

src/libnative/io/mod.rs

+17-4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ pub mod timer;
6060
#[path = "timer_win32.rs"]
6161
pub mod timer;
6262

63+
#[cfg(unix)]
64+
#[path = "pipe_unix.rs"]
65+
pub mod pipe;
66+
67+
#[cfg(windows)]
68+
#[path = "pipe_win32.rs"]
69+
pub mod pipe;
70+
6371
mod timer_helper;
6472

6573
pub type IoResult<T> = Result<T, IoError>;
@@ -77,6 +85,9 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
7785
fn get_err(errno: i32) -> (io::IoErrorKind, &'static str) {
7886
match errno {
7987
libc::EOF => (io::EndOfFile, "end of file"),
88+
libc::ERROR_NO_DATA => (io::BrokenPipe, "the pipe is being closed"),
89+
libc::ERROR_FILE_NOT_FOUND => (io::FileNotFound, "file not found"),
90+
libc::ERROR_INVALID_NAME => (io::InvalidInput, "invalid file name"),
8091
libc::WSAECONNREFUSED => (io::ConnectionRefused, "connection refused"),
8192
libc::WSAECONNRESET => (io::ConnectionReset, "connection reset"),
8293
libc::WSAEACCES => (io::PermissionDenied, "permission denied"),
@@ -86,6 +97,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
8697
libc::WSAECONNABORTED => (io::ConnectionAborted, "connection aborted"),
8798
libc::WSAEADDRNOTAVAIL => (io::ConnectionRefused, "address not available"),
8899
libc::WSAEADDRINUSE => (io::ConnectionRefused, "address in use"),
100+
libc::ERROR_BROKEN_PIPE => (io::BrokenPipe, "the pipe has ended"),
89101

90102
x => {
91103
debug!("ignoring {}: {}", x, os::last_os_error());
@@ -108,6 +120,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
108120
libc::ECONNABORTED => (io::ConnectionAborted, "connection aborted"),
109121
libc::EADDRNOTAVAIL => (io::ConnectionRefused, "address not available"),
110122
libc::EADDRINUSE => (io::ConnectionRefused, "address in use"),
123+
libc::ENOENT => (io::FileNotFound, "no such file or directory"),
111124

112125
// These two constants can have the same value on some systems, but
113126
// different values on others, so we can't use a match clause
@@ -196,11 +209,11 @@ impl rtio::IoFactory for IoFactory {
196209
fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket> {
197210
net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket)
198211
}
199-
fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> {
200-
Err(unimpl())
212+
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener> {
213+
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener)
201214
}
202-
fn unix_connect(&mut self, _path: &CString) -> IoResult<~RtioPipe> {
203-
Err(unimpl())
215+
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe> {
216+
pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe)
204217
}
205218
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
206219
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {

src/libnative/io/pipe_unix.rs

+285
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
use std::c_str::CString;
12+
use std::cast;
13+
use std::io;
14+
use std::libc;
15+
use std::mem;
16+
use std::rt::rtio;
17+
use std::sync::arc::UnsafeArc;
18+
use std::unstable::intrinsics;
19+
20+
use super::{IoResult, retry};
21+
use super::file::{keep_going, fd_t};
22+
23+
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
24+
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
25+
-1 => Err(super::last_error()),
26+
fd => Ok(fd)
27+
}
28+
}
29+
30+
fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
31+
// the sun_path length is limited to SUN_LEN (with null)
32+
assert!(mem::size_of::<libc::sockaddr_storage>() >=
33+
mem::size_of::<libc::sockaddr_un>());
34+
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
35+
let s: &mut libc::sockaddr_un = unsafe { cast::transmute(&mut storage) };
36+
37+
let len = addr.len();
38+
if len > s.sun_path.len() - 1 {
39+
return Err(io::IoError {
40+
kind: io::InvalidInput,
41+
desc: "path must be smaller than SUN_LEN",
42+
detail: None,
43+
})
44+
}
45+
s.sun_family = libc::AF_UNIX as libc::sa_family_t;
46+
for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
47+
*slot = value;
48+
}
49+
50+
// count the null terminator
51+
let len = mem::size_of::<libc::sa_family_t>() + len + 1;
52+
return Ok((storage, len));
53+
}
54+
55+
fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
56+
len: uint) -> IoResult<CString> {
57+
match storage.ss_family as libc::c_int {
58+
libc::AF_UNIX => {
59+
assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
60+
let storage: &libc::sockaddr_un = unsafe {
61+
cast::transmute(storage)
62+
};
63+
unsafe {
64+
Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
65+
}
66+
}
67+
_ => Err(io::standard_error(io::InvalidInput))
68+
}
69+
}
70+
71+
struct Inner {
72+
fd: fd_t,
73+
}
74+
75+
impl Drop for Inner {
76+
fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
77+
}
78+
79+
fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
80+
let (addr, len) = if_ok!(addr_to_sockaddr_un(addr));
81+
let inner = Inner { fd: if_ok!(unix_socket(ty)) };
82+
let addrp = &addr as *libc::sockaddr_storage;
83+
match retry(|| unsafe {
84+
libc::connect(inner.fd, addrp as *libc::sockaddr,
85+
len as libc::socklen_t)
86+
}) {
87+
-1 => Err(super::last_error()),
88+
_ => Ok(inner)
89+
}
90+
}
91+
92+
fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
93+
let (addr, len) = if_ok!(addr_to_sockaddr_un(addr));
94+
let inner = Inner { fd: if_ok!(unix_socket(ty)) };
95+
let addrp = &addr as *libc::sockaddr_storage;
96+
match unsafe {
97+
libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t)
98+
} {
99+
-1 => Err(super::last_error()),
100+
_ => Ok(inner)
101+
}
102+
}
103+
104+
////////////////////////////////////////////////////////////////////////////////
105+
// Unix Streams
106+
////////////////////////////////////////////////////////////////////////////////
107+
108+
pub struct UnixStream {
109+
priv inner: UnsafeArc<Inner>,
110+
}
111+
112+
impl UnixStream {
113+
pub fn connect(addr: &CString) -> IoResult<UnixStream> {
114+
connect(addr, libc::SOCK_STREAM).map(|inner| {
115+
UnixStream { inner: UnsafeArc::new(inner) }
116+
})
117+
}
118+
119+
fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
120+
}
121+
122+
impl rtio::RtioPipe for UnixStream {
123+
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
124+
let ret = retry(|| unsafe {
125+
libc::recv(self.fd(),
126+
buf.as_ptr() as *mut libc::c_void,
127+
buf.len() as libc::size_t,
128+
0) as libc::c_int
129+
});
130+
if ret == 0 {
131+
Err(io::standard_error(io::EndOfFile))
132+
} else if ret < 0 {
133+
Err(super::last_error())
134+
} else {
135+
Ok(ret as uint)
136+
}
137+
}
138+
139+
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
140+
let ret = keep_going(buf, |buf, len| unsafe {
141+
libc::send(self.fd(),
142+
buf as *mut libc::c_void,
143+
len as libc::size_t,
144+
0) as i64
145+
});
146+
if ret < 0 {
147+
Err(super::last_error())
148+
} else {
149+
Ok(())
150+
}
151+
}
152+
153+
fn clone(&self) -> ~rtio::RtioPipe {
154+
~UnixStream { inner: self.inner.clone() } as ~rtio::RtioPipe
155+
}
156+
}
157+
158+
////////////////////////////////////////////////////////////////////////////////
159+
// Unix Datagram
160+
////////////////////////////////////////////////////////////////////////////////
161+
162+
pub struct UnixDatagram {
163+
priv inner: UnsafeArc<Inner>,
164+
}
165+
166+
impl UnixDatagram {
167+
pub fn connect(addr: &CString) -> IoResult<UnixDatagram> {
168+
connect(addr, libc::SOCK_DGRAM).map(|inner| {
169+
UnixDatagram { inner: UnsafeArc::new(inner) }
170+
})
171+
}
172+
173+
pub fn bind(addr: &CString) -> IoResult<UnixDatagram> {
174+
bind(addr, libc::SOCK_DGRAM).map(|inner| {
175+
UnixDatagram { inner: UnsafeArc::new(inner) }
176+
})
177+
}
178+
179+
fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
180+
181+
pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> {
182+
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
183+
let storagep = &mut storage as *mut libc::sockaddr_storage;
184+
let mut addrlen: libc::socklen_t =
185+
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
186+
let ret = retry(|| unsafe {
187+
libc::recvfrom(self.fd(),
188+
buf.as_ptr() as *mut libc::c_void,
189+
buf.len() as libc::size_t,
190+
0,
191+
storagep as *mut libc::sockaddr,
192+
&mut addrlen) as libc::c_int
193+
});
194+
if ret < 0 { return Err(super::last_error()) }
195+
sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| {
196+
Ok((ret as uint, addr))
197+
})
198+
}
199+
200+
pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> {
201+
let (dst, len) = if_ok!(addr_to_sockaddr_un(dst));
202+
let dstp = &dst as *libc::sockaddr_storage;
203+
let ret = retry(|| unsafe {
204+
libc::sendto(self.fd(),
205+
buf.as_ptr() as *libc::c_void,
206+
buf.len() as libc::size_t,
207+
0,
208+
dstp as *libc::sockaddr,
209+
len as libc::socklen_t) as libc::c_int
210+
});
211+
match ret {
212+
-1 => Err(super::last_error()),
213+
n if n as uint != buf.len() => {
214+
Err(io::IoError {
215+
kind: io::OtherIoError,
216+
desc: "couldn't send entire packet at once",
217+
detail: None,
218+
})
219+
}
220+
_ => Ok(())
221+
}
222+
}
223+
224+
pub fn clone(&mut self) -> UnixDatagram {
225+
UnixDatagram { inner: self.inner.clone() }
226+
}
227+
}
228+
229+
////////////////////////////////////////////////////////////////////////////////
230+
// Unix Listener
231+
////////////////////////////////////////////////////////////////////////////////
232+
233+
pub struct UnixListener {
234+
priv inner: Inner,
235+
}
236+
237+
impl UnixListener {
238+
pub fn bind(addr: &CString) -> IoResult<UnixListener> {
239+
bind(addr, libc::SOCK_STREAM).map(|fd| UnixListener { inner: fd })
240+
}
241+
242+
fn fd(&self) -> fd_t { self.inner.fd }
243+
244+
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
245+
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
246+
-1 => Err(super::last_error()),
247+
_ => Ok(UnixAcceptor { listener: self })
248+
}
249+
}
250+
}
251+
252+
impl rtio::RtioUnixListener for UnixListener {
253+
fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> {
254+
self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor)
255+
}
256+
}
257+
258+
pub struct UnixAcceptor {
259+
priv listener: UnixListener,
260+
}
261+
262+
impl UnixAcceptor {
263+
fn fd(&self) -> fd_t { self.listener.fd() }
264+
265+
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
266+
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
267+
let storagep = &mut storage as *mut libc::sockaddr_storage;
268+
let size = mem::size_of::<libc::sockaddr_storage>();
269+
let mut size = size as libc::socklen_t;
270+
match retry(|| unsafe {
271+
libc::accept(self.fd(),
272+
storagep as *mut libc::sockaddr,
273+
&mut size as *mut libc::socklen_t) as libc::c_int
274+
}) {
275+
-1 => Err(super::last_error()),
276+
fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) })
277+
}
278+
}
279+
}
280+
281+
impl rtio::RtioUnixAcceptor for UnixAcceptor {
282+
fn accept(&mut self) -> IoResult<~rtio::RtioPipe> {
283+
self.native_accept().map(|s| ~s as ~rtio::RtioPipe)
284+
}
285+
}

0 commit comments

Comments
 (0)