diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 69ef10ac11bea..0f9439b3eb5b8 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -60,6 +60,14 @@ pub mod timer; #[path = "timer_win32.rs"] pub mod timer; +#[cfg(unix)] +#[path = "pipe_unix.rs"] +pub mod pipe; + +#[cfg(windows)] +#[path = "pipe_win32.rs"] +pub mod pipe; + mod timer_helper; pub type IoResult = Result; @@ -77,6 +85,9 @@ fn translate_error(errno: i32, detail: bool) -> IoError { fn get_err(errno: i32) -> (io::IoErrorKind, &'static str) { match errno { libc::EOF => (io::EndOfFile, "end of file"), + libc::ERROR_NO_DATA => (io::BrokenPipe, "the pipe is being closed"), + libc::ERROR_FILE_NOT_FOUND => (io::FileNotFound, "file not found"), + libc::ERROR_INVALID_NAME => (io::InvalidInput, "invalid file name"), libc::WSAECONNREFUSED => (io::ConnectionRefused, "connection refused"), libc::WSAECONNRESET => (io::ConnectionReset, "connection reset"), libc::WSAEACCES => (io::PermissionDenied, "permission denied"), @@ -86,6 +97,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError { libc::WSAECONNABORTED => (io::ConnectionAborted, "connection aborted"), libc::WSAEADDRNOTAVAIL => (io::ConnectionRefused, "address not available"), libc::WSAEADDRINUSE => (io::ConnectionRefused, "address in use"), + libc::ERROR_BROKEN_PIPE => (io::BrokenPipe, "the pipe has ended"), x => { debug!("ignoring {}: {}", x, os::last_os_error()); @@ -108,6 +120,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError { libc::ECONNABORTED => (io::ConnectionAborted, "connection aborted"), libc::EADDRNOTAVAIL => (io::ConnectionRefused, "address not available"), libc::EADDRINUSE => (io::ConnectionRefused, "address in use"), + libc::ENOENT => (io::FileNotFound, "no such file or directory"), // These two constants can have the same value on some systems, but // different values on others, so we can't use a match clause @@ -196,11 +209,11 @@ impl rtio::IoFactory for IoFactory { fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket> { net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket) } - fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> { - Err(unimpl()) + fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener> { + pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener) } - fn unix_connect(&mut self, _path: &CString) -> IoResult<~RtioPipe> { - Err(unimpl()) + fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe> { + pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs new file mode 100644 index 0000000000000..a6d75d93d6761 --- /dev/null +++ b/src/libnative/io/pipe_unix.rs @@ -0,0 +1,285 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::c_str::CString; +use std::cast; +use std::io; +use std::libc; +use std::mem; +use std::rt::rtio; +use std::sync::arc::UnsafeArc; +use std::unstable::intrinsics; + +use super::{IoResult, retry}; +use super::file::{keep_going, fd_t}; + +fn unix_socket(ty: libc::c_int) -> IoResult { + match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { + -1 => Err(super::last_error()), + fd => Ok(fd) + } +} + +fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> { + // the sun_path length is limited to SUN_LEN (with null) + assert!(mem::size_of::() >= + mem::size_of::()); + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let s: &mut libc::sockaddr_un = unsafe { cast::transmute(&mut storage) }; + + let len = addr.len(); + if len > s.sun_path.len() - 1 { + return Err(io::IoError { + kind: io::InvalidInput, + desc: "path must be smaller than SUN_LEN", + detail: None, + }) + } + s.sun_family = libc::AF_UNIX as libc::sa_family_t; + for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) { + *slot = value; + } + + // count the null terminator + let len = mem::size_of::() + len + 1; + return Ok((storage, len)); +} + +fn sockaddr_to_unix(storage: &libc::sockaddr_storage, + len: uint) -> IoResult { + match storage.ss_family as libc::c_int { + libc::AF_UNIX => { + assert!(len as uint <= mem::size_of::()); + let storage: &libc::sockaddr_un = unsafe { + cast::transmute(storage) + }; + unsafe { + Ok(CString::new(storage.sun_path.as_ptr(), false).clone()) + } + } + _ => Err(io::standard_error(io::InvalidInput)) + } +} + +struct Inner { + fd: fd_t, +} + +impl Drop for Inner { + fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } +} + +fn connect(addr: &CString, ty: libc::c_int) -> IoResult { + let (addr, len) = if_ok!(addr_to_sockaddr_un(addr)); + let inner = Inner { fd: if_ok!(unix_socket(ty)) }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| unsafe { + libc::connect(inner.fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } +} + +fn bind(addr: &CString, ty: libc::c_int) -> IoResult { + let (addr, len) = if_ok!(addr_to_sockaddr_un(addr)); + let inner = Inner { fd: if_ok!(unix_socket(ty)) }; + let addrp = &addr as *libc::sockaddr_storage; + match unsafe { + libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t) + } { + -1 => Err(super::last_error()), + _ => Ok(inner) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + priv inner: UnsafeArc, +} + +impl UnixStream { + pub fn connect(addr: &CString) -> IoResult { + connect(addr, libc::SOCK_STREAM).map(|inner| { + UnixStream { inner: UnsafeArc::new(inner) } + }) + } + + fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } +} + +impl rtio::RtioPipe for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + let ret = retry(|| unsafe { + libc::recv(self.fd(), + buf.as_ptr() as *mut libc::c_void, + buf.len() as libc::size_t, + 0) as libc::c_int + }); + if ret == 0 { + Err(io::standard_error(io::EndOfFile)) + } else if ret < 0 { + Err(super::last_error()) + } else { + Ok(ret as uint) + } + } + + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let ret = keep_going(buf, |buf, len| unsafe { + libc::send(self.fd(), + buf as *mut libc::c_void, + len as libc::size_t, + 0) as i64 + }); + if ret < 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } + + fn clone(&self) -> ~rtio::RtioPipe { + ~UnixStream { inner: self.inner.clone() } as ~rtio::RtioPipe + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Datagram +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixDatagram { + priv inner: UnsafeArc, +} + +impl UnixDatagram { + pub fn connect(addr: &CString) -> IoResult { + connect(addr, libc::SOCK_DGRAM).map(|inner| { + UnixDatagram { inner: UnsafeArc::new(inner) } + }) + } + + pub fn bind(addr: &CString) -> IoResult { + bind(addr, libc::SOCK_DGRAM).map(|inner| { + UnixDatagram { inner: UnsafeArc::new(inner) } + }) + } + + fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } + + pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let storagep = &mut storage as *mut libc::sockaddr_storage; + let mut addrlen: libc::socklen_t = + mem::size_of::() as libc::socklen_t; + let ret = retry(|| unsafe { + libc::recvfrom(self.fd(), + buf.as_ptr() as *mut libc::c_void, + buf.len() as libc::size_t, + 0, + storagep as *mut libc::sockaddr, + &mut addrlen) as libc::c_int + }); + if ret < 0 { return Err(super::last_error()) } + sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { + Ok((ret as uint, addr)) + }) + } + + pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { + let (dst, len) = if_ok!(addr_to_sockaddr_un(dst)); + let dstp = &dst as *libc::sockaddr_storage; + let ret = retry(|| unsafe { + libc::sendto(self.fd(), + buf.as_ptr() as *libc::c_void, + buf.len() as libc::size_t, + 0, + dstp as *libc::sockaddr, + len as libc::socklen_t) as libc::c_int + }); + match ret { + -1 => Err(super::last_error()), + n if n as uint != buf.len() => { + Err(io::IoError { + kind: io::OtherIoError, + desc: "couldn't send entire packet at once", + detail: None, + }) + } + _ => Ok(()) + } + } + + pub fn clone(&mut self) -> UnixDatagram { + UnixDatagram { inner: self.inner.clone() } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + priv inner: Inner, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult { + bind(addr, libc::SOCK_STREAM).map(|fd| UnixListener { inner: fd }) + } + + fn fd(&self) -> fd_t { self.inner.fd } + + pub fn native_listen(self, backlog: int) -> IoResult { + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(super::last_error()), + _ => Ok(UnixAcceptor { listener: self }) + } + } +} + +impl rtio::RtioUnixListener for UnixListener { + fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { + self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor) + } +} + +pub struct UnixAcceptor { + priv listener: UnixListener, +} + +impl UnixAcceptor { + fn fd(&self) -> fd_t { self.listener.fd() } + + pub fn native_accept(&mut self) -> IoResult { + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::(); + let mut size = size as libc::socklen_t; + match retry(|| unsafe { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) { + -1 => Err(super::last_error()), + fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) }) + } + } +} + +impl rtio::RtioUnixAcceptor for UnixAcceptor { + fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { + self.native_accept().map(|s| ~s as ~rtio::RtioPipe) + } +} diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs new file mode 100644 index 0000000000000..83731cc02a6b6 --- /dev/null +++ b/src/libnative/io/pipe_win32.rs @@ -0,0 +1,492 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Named pipes implementation for windows +//! +//! If are unfortunate enough to be reading this code, I would like to first +//! apologize. This was my first encounter with windows named pipes, and it +//! didn't exactly turn out very cleanly. If you, too, are new to named pipes, +//! read on as I'll try to explain some fun things that I ran into. +//! +//! # Unix pipes vs Named pipes +//! +//! As with everything else, named pipes on windows are pretty different from +//! unix pipes on unix. On unix, you use one "server pipe" to accept new client +//! pipes. So long as this server pipe is active, new children pipes can +//! connect. On windows, you instead have a number of "server pipes", and each +//! of these server pipes can throughout their lifetime be attached to a client +//! or not. Once attached to a client, a server pipe may then disconnect at a +//! later date. +//! +//! # Accepting clients +//! +//! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces +//! are built around the unix flavors. This means that we have one "server +//! pipe" to which many clients can connect. In order to make this compatible +//! with the windows model, each connected client consumes ownership of a server +//! pipe, and then a new server pipe is created for the next client. +//! +//! Note that the server pipes attached to clients are never given back to the +//! listener for recycling. This could possibly be implemented with a channel so +//! the listener half can re-use server pipes, but for now I err'd on the simple +//! side of things. Each stream accepted by a listener will destroy the server +//! pipe after the stream is dropped. +//! +//! This model ends up having a small race or two, and you can find more details +//! on the `native_accept` method. +//! +//! # Simultaneous reads and writes +//! +//! In testing, I found that two simultaneous writes and two simultaneous reads +//! on a pipe ended up working out just fine, but problems were encountered when +//! a read was executed simultaneously with a write. After some googling around, +//! it sounded like named pipes just weren't built for this kind of interaction, +//! and the suggested solution was to use overlapped I/O. +//! +//! I don't realy know what overlapped I/O is, but my basic understanding after +//! reading about it is that you have an external Event which is used to signal +//! I/O completion, passed around in some OVERLAPPED structures. As to what this +//! is, I'm not exactly sure. +//! +//! This problem implies that all named pipes are created with the +//! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is +//! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and +//! inside of this structure is a HANDLE from CreateEvent. After the I/O is +//! determined to be pending (may complete in the future), the +//! GetOverlappedResult function is used to block on the event, waiting for the +//! I/O to finish. +//! +//! This scheme ended up working well enough. There were two snags that I ran +//! into, however: +//! +//! * Each UnixStream instance needs its own read/write events to wait on. These +//! can't be shared among clones of the same stream because the documentation +//! states that it unsets the event when the I/O is started (would possibly +//! corrupt other events simultaneously waiting). For convenience's sake, +//! these events are lazily initialized. +//! +//! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition +//! to all pipes created through `connect`. Notably this means that the +//! ConnectNamedPipe function is nonblocking, implying that the Listener needs +//! to have yet another event to do the actual blocking. +//! +//! # Conclusion +//! +//! The conclusion here is that I probably don't know the best way to work with +//! windows named pipes, but the solution here seems to work well enough to get +//! the test suite passing (the suite is in libstd), and that's good enough for +//! me! + +use std::c_str::CString; +use std::libc; +use std::os::win32::as_utf16_p; +use std::ptr; +use std::rt::rtio; +use std::sync::arc::UnsafeArc; +use std::unstable::intrinsics; + +use super::IoResult; + +struct Event(libc::HANDLE); + +impl Event { + fn new(manual_reset: bool, initial_state: bool) -> IoResult { + let event = unsafe { + libc::CreateEventW(ptr::mut_null(), + manual_reset as libc::BOOL, + initial_state as libc::BOOL, + ptr::null()) + }; + if event as uint == 0 { + Err(super::last_error()) + } else { + Ok(Event(event)) + } + } + + fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle } +} + +impl Drop for Event { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle()); } + } +} + +struct Inner { + handle: libc::HANDLE, +} + +impl Drop for Inner { + fn drop(&mut self) { + unsafe { + let _ = libc::FlushFileBuffers(self.handle); + let _ = libc::CloseHandle(self.handle); + } + } +} + +unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE { + libc::CreateNamedPipeW( + name, + libc::PIPE_ACCESS_DUPLEX | + if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} | + libc::FILE_FLAG_OVERLAPPED, + libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT, + libc::PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + ptr::mut_null() + ) +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + priv inner: UnsafeArc, + priv write: Option, + priv read: Option, +} + +impl UnixStream { + fn try_connect(p: *u16) -> Option { + // Note that most of this is lifted from the libuv implementation. + // The idea is that if we fail to open a pipe in read/write mode + // that we try afterwards in just read or just write + let mut result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::GENERIC_WRITE, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + } + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + } + None + } + + pub fn connect(addr: &CString) -> IoResult { + as_utf16_p(addr.as_str().unwrap(), |p| { + loop { + match UnixStream::try_connect(p) { + Some(handle) => { + let inner = Inner { handle: handle }; + let mut mode = libc::PIPE_TYPE_BYTE | + libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT; + let ret = unsafe { + libc::SetNamedPipeHandleState(inner.handle, + &mut mode, + ptr::mut_null(), + ptr::mut_null()) + }; + return if ret == 0 { + Err(super::last_error()) + } else { + Ok(UnixStream { + inner: UnsafeArc::new(inner), + read: None, + write: None, + }) + } + } + None => {} + } + + // On windows, if you fail to connect, you may need to call the + // `WaitNamedPipe` function, and this is indicated with an error + // code of ERROR_PIPE_BUSY. + let code = unsafe { libc::GetLastError() }; + if code as int != libc::ERROR_PIPE_BUSY as int { + return Err(super::last_error()) + } + + // An example I found on microsoft's website used 20 seconds, + // libuv uses 30 seconds, hence we make the obvious choice of + // waiting for 25 seconds. + if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { + return Err(super::last_error()) + } + } + }) + } + + fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } +} + +impl rtio::RtioPipe for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + if self.read.is_none() { + self.read = Some(if_ok!(Event::new(true, false))); + } + + let mut bytes_read = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.read.get_ref().handle(); + + let ret = unsafe { + libc::ReadFile(self.handle(), + buf.as_ptr() as libc::LPVOID, + buf.len() as libc::DWORD, + &mut bytes_read, + &mut overlapped) + }; + if ret == 0 { + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_read, + libc::TRUE) + }; + if ret == 0 { + return Err(super::last_error()) + } + } else { + return Err(super::last_error()) + } + } + + Ok(bytes_read as uint) + } + + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + if self.write.is_none() { + self.write = Some(if_ok!(Event::new(true, false))); + } + + let mut offset = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.write.get_ref().handle(); + + while offset < buf.len() { + let mut bytes_written = 0; + let ret = unsafe { + libc::WriteFile(self.handle(), + buf.slice_from(offset).as_ptr() as libc::LPVOID, + (buf.len() - offset) as libc::DWORD, + &mut bytes_written, + &mut overlapped) + }; + if ret == 0 { + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_written, + libc::TRUE) + }; + if ret == 0 { + return Err(super::last_error()) + } + } else { + return Err(super::last_error()) + } + } + offset += bytes_written as uint; + } + Ok(()) + } + + fn clone(&self) -> ~rtio::RtioPipe { + ~UnixStream { + inner: self.inner.clone(), + read: None, + write: None, + } as ~rtio::RtioPipe + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + priv handle: libc::HANDLE, + priv name: CString, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult { + // Although we technically don't need the pipe until much later, we + // create the initial handle up front to test the validity of the name + // and such. + as_utf16_p(addr.as_str().unwrap(), |p| { + let ret = unsafe { pipe(p, true) }; + if ret == libc::INVALID_HANDLE_VALUE as libc::HANDLE { + Err(super::last_error()) + } else { + Ok(UnixListener { handle: ret, name: addr.clone() }) + } + }) + } + + pub fn native_listen(self) -> IoResult { + Ok(UnixAcceptor { + listener: self, + event: if_ok!(Event::new(true, false)), + }) + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle); } + } +} + +impl rtio::RtioUnixListener for UnixListener { + fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { + self.native_listen().map(|a| ~a as ~rtio::RtioUnixAcceptor) + } +} + +pub struct UnixAcceptor { + priv listener: UnixListener, + priv event: Event, +} + +impl UnixAcceptor { + pub fn native_accept(&mut self) -> IoResult { + // This function has some funky implementation details when working with + // unix pipes. On windows, each server named pipe handle can be + // connected to a one or zero clients. To the best of my knowledge, a + // named server is considered active and present if there exists at + // least one server named pipe for it. + // + // The model of this function is to take the current known server + // handle, connect a client to it, and then transfer ownership to the + // UnixStream instance. The next time accept() is invoked, it'll need a + // different server handle to connect a client to. + // + // Note that there is a possible race here. Once our server pipe is + // handed off to a `UnixStream` object, the stream could be closed, + // meaning that there would be no active server pipes, hence even though + // we have a valid `UnixAcceptor`, no one can connect to it. For this + // reason, we generate the next accept call's server pipe at the end of + // this function call. + // + // This provides us an invariant that we always have at least one server + // connection open at a time, meaning that all connects to this acceptor + // should succeed while this is active. + // + // The actual implementation of doing this is a little tricky. Once a + // server pipe is created, a client can connect to it at any time. I + // assume that which server a client connects to is nondeterministic, so + // we also need to guarantee that the only server able to be connected + // to is the one that we're calling ConnectNamedPipe on. This means that + // we have to create the second server pipe *after* we've already + // accepted a connection. In order to at least somewhat gracefully + // handle errors, this means that if the second server pipe creation + // fails that we disconnect the connected client and then just keep + // using the original server pipe. + let handle = self.listener.handle; + + // Once we've got a "server handle", we need to wait for a client to + // connect. The ConnectNamedPipe function will block this thread until + // someone on the other end connects. This function can "fail" if a + // client connects after we created the pipe but before we got down + // here. Thanks windows. + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.event.handle(); + if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { + let mut err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + let mut transfer = 0; + libc::GetOverlappedResult(handle, + &mut overlapped, + &mut transfer, + libc::TRUE) + }; + if ret == 0 { + err = unsafe { libc::GetLastError() }; + } else { + // we succeeded, bypass the check below + err = libc::ERROR_PIPE_CONNECTED as libc::DWORD; + } + } + if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD { + return Err(super::last_error()) + } + } + + // Now that we've got a connected client to our handle, we need to + // create a second server pipe. If this fails, we disconnect the + // connected client and return an error (see comments above). + let new_handle = as_utf16_p(self.listener.name.as_str().unwrap(), |p| { + unsafe { pipe(p, false) } + }); + if new_handle == libc::INVALID_HANDLE_VALUE as libc::HANDLE { + let ret = Err(super::last_error()); + // If our disconnection fails, then there's not really a whole lot + // that we can do, so fail the task. + let err = unsafe { libc::DisconnectNamedPipe(handle) }; + assert!(err != 0); + return ret; + } else { + self.listener.handle = new_handle; + } + + // Transfer ownership of our handle into this stream + Ok(UnixStream { + inner: UnsafeArc::new(Inner { handle: handle }), + read: None, + write: None, + }) + } +} + +impl rtio::RtioUnixAcceptor for UnixAcceptor { + fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { + self.native_accept().map(|s| ~s as ~rtio::RtioPipe) + } +} + diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs index cf109167089d4..436156a12190b 100644 --- a/src/libstd/io/net/mod.rs +++ b/src/libstd/io/net/mod.rs @@ -14,5 +14,5 @@ pub mod addrinfo; pub mod tcp; pub mod udp; pub mod ip; -#[cfg(unix)] +// FIXME(#12093) - this should not be called unix pub mod unix; diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index 23c01aa635444..a1f3cbbe32643 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -134,7 +134,7 @@ mod tests { use io::*; use io::test::*; - fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { + pub fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { let path1 = next_test_unix(); let path2 = path1.clone(); let (port, chan) = Chan::new(); @@ -149,25 +149,32 @@ mod tests { server(acceptor.accept().unwrap()); } - #[test] - fn bind_error() { - match UnixListener::bind(&("path/to/nowhere")) { + iotest!(fn bind_error() { + let path = "path/to/nowhere"; + match UnixListener::bind(&path) { Ok(..) => fail!(), - Err(e) => assert_eq!(e.kind, PermissionDenied), + Err(e) => { + assert!(e.kind == PermissionDenied || e.kind == FileNotFound || + e.kind == InvalidInput); + } } - } - - #[test] - fn connect_error() { - match UnixStream::connect(&("path/to/nowhere")) { + }) + + iotest!(fn connect_error() { + let path = if cfg!(windows) { + r"\\.\pipe\this_should_not_exist_ever" + } else { + "path/to/nowhere" + }; + match UnixStream::connect(&path) { Ok(..) => fail!(), - Err(e) => assert_eq!(e.kind, - if cfg!(windows) {OtherIoError} else {FileNotFound}) + Err(e) => { + assert!(e.kind == FileNotFound || e.kind == OtherIoError); + } } - } + }) - #[test] - fn smoke() { + iotest!(fn smoke() { smalltest(proc(mut server) { let mut buf = [0]; server.read(buf).unwrap(); @@ -175,10 +182,9 @@ mod tests { }, proc(mut client) { client.write([99]).unwrap(); }) - } + }) - #[test] - fn read_eof() { + iotest!(fn read_eof() { smalltest(proc(mut server) { let mut buf = [0]; assert!(server.read(buf).is_err()); @@ -186,17 +192,18 @@ mod tests { }, proc(_client) { // drop the client }) - } + }) - #[test] - fn write_begone() { + iotest!(fn write_begone() { smalltest(proc(mut server) { let buf = [0]; loop { match server.write(buf) { Ok(..) => {} Err(e) => { - assert!(e.kind == BrokenPipe || e.kind == NotConnected, + assert!(e.kind == BrokenPipe || + e.kind == NotConnected || + e.kind == ConnectionReset, "unknown error {:?}", e); break; } @@ -205,10 +212,9 @@ mod tests { }, proc(_client) { // drop the client }) - } + }) - #[test] - fn accept_lots() { + iotest!(fn accept_lots() { let times = 10; let path1 = next_test_unix(); let path2 = path1.clone(); @@ -218,38 +224,49 @@ mod tests { port.recv(); for _ in range(0, times) { let mut stream = UnixStream::connect(&path2); - stream.write([100]).unwrap(); + match stream.write([100]) { + Ok(..) => {} + Err(e) => fail!("failed write: {}", e) + } } }); - let mut acceptor = UnixListener::bind(&path1).listen(); + let mut acceptor = match UnixListener::bind(&path1).listen() { + Ok(a) => a, + Err(e) => fail!("failed listen: {}", e), + }; chan.send(()); for _ in range(0, times) { let mut client = acceptor.accept(); let mut buf = [0]; - client.read(buf).unwrap(); + match client.read(buf) { + Ok(..) => {} + Err(e) => fail!("failed read/accept: {}", e), + } assert_eq!(buf[0], 100); } - } + }) - #[test] - fn path_exists() { + #[cfg(unix)] + iotest!(fn path_exists() { let path = next_test_unix(); let _acceptor = UnixListener::bind(&path).listen(); assert!(path.exists()); - } + }) - #[test] - fn unix_clone_smoke() { + iotest!(fn unix_clone_smoke() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); spawn(proc() { let mut s = UnixStream::connect(&addr); let mut buf = [0, 0]; + debug!("client reading"); assert_eq!(s.read(buf), Ok(1)); assert_eq!(buf[0], 1); + debug!("client writing"); s.write([2]).unwrap(); + debug!("client dropping"); }); let mut s1 = acceptor.accept().unwrap(); @@ -260,17 +277,20 @@ mod tests { spawn(proc() { let mut s2 = s2; p1.recv(); + debug!("writer writing"); s2.write([1]).unwrap(); + debug!("writer done"); c2.send(()); }); c1.send(()); let mut buf = [0, 0]; + debug!("reader reading"); assert_eq!(s1.read(buf), Ok(1)); + debug!("reader done"); p2.recv(); - } + }) - #[test] - fn unix_clone_two_read() { + iotest!(fn unix_clone_two_read() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); let (p, c) = Chan::new(); @@ -300,10 +320,9 @@ mod tests { c.send(()); p.recv(); - } + }) - #[test] - fn unix_clone_two_write() { + iotest!(fn unix_clone_two_write() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); @@ -326,5 +345,5 @@ mod tests { s1.write([2]).unwrap(); p.recv(); - } + }) } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 7dc4c692f6319..73bf4a1e69a88 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -319,6 +319,10 @@ pub mod types { ai_canonname: *c_char, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_family: sa_family_t, + sun_path: [c_char, ..108] + } } } @@ -691,6 +695,11 @@ pub mod types { ai_addr: *sockaddr, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_len: u8, + sun_family: sa_family_t, + sun_path: [c_char, ..104] + } } } @@ -884,6 +893,10 @@ pub mod types { ai_addr: *sockaddr, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_family: sa_family_t, + sun_path: [c_char, ..108] + } } } @@ -1252,6 +1265,11 @@ pub mod types { ai_addr: *sockaddr, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_len: u8, + sun_family: sa_family_t, + sun_path: [c_char, ..104] + } } } @@ -1605,11 +1623,19 @@ pub mod consts { pub static O_NOINHERIT: c_int = 128; pub static ERROR_SUCCESS : c_int = 0; + pub static ERROR_FILE_NOT_FOUND: c_int = 2; + pub static ERROR_ACCESS_DENIED: c_int = 5; pub static ERROR_INVALID_HANDLE : c_int = 6; + pub static ERROR_BROKEN_PIPE: c_int = 109; pub static ERROR_DISK_FULL : c_int = 112; pub static ERROR_INSUFFICIENT_BUFFER : c_int = 122; + pub static ERROR_INVALID_NAME : c_int = 123; pub static ERROR_ALREADY_EXISTS : c_int = 183; + pub static ERROR_PIPE_BUSY: c_int = 231; + pub static ERROR_NO_DATA: c_int = 232; pub static ERROR_INVALID_ADDRESS : c_int = 487; + pub static ERROR_PIPE_CONNECTED: c_int = 535; + pub static ERROR_IO_PENDING: c_int = 997; pub static ERROR_FILE_INVALID : c_int = 1006; pub static INVALID_HANDLE_VALUE : c_int = -1; @@ -1748,6 +1774,7 @@ pub mod consts { pub static FILE_FLAG_SESSION_AWARE: DWORD = 0x00800000; pub static FILE_FLAG_SEQUENTIAL_SCAN: DWORD = 0x08000000; pub static FILE_FLAG_WRITE_THROUGH: DWORD = 0x80000000; + pub static FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000; pub static FILE_NAME_NORMALIZED: DWORD = 0x0; pub static FILE_NAME_OPENED: DWORD = 0x8; @@ -1761,6 +1788,8 @@ pub mod consts { pub static GENERIC_WRITE: DWORD = 0x40000000; pub static GENERIC_EXECUTE: DWORD = 0x20000000; pub static GENERIC_ALL: DWORD = 0x10000000; + pub static FILE_WRITE_ATTRIBUTES: DWORD = 0x00000100; + pub static FILE_READ_ATTRIBUTES: DWORD = 0x00000080; pub static FILE_BEGIN: DWORD = 0; pub static FILE_CURRENT: DWORD = 1; @@ -1772,6 +1801,19 @@ pub mod consts { pub static DETACHED_PROCESS: DWORD = 0x00000008; pub static CREATE_NEW_PROCESS_GROUP: DWORD = 0x00000200; + + pub static PIPE_ACCESS_DUPLEX: DWORD = 0x00000003; + pub static PIPE_ACCESS_INBOUND: DWORD = 0x00000001; + pub static PIPE_ACCESS_OUTBOUND: DWORD = 0x00000002; + pub static PIPE_TYPE_BYTE: DWORD = 0x00000000; + pub static PIPE_TYPE_MESSAGE: DWORD = 0x00000004; + pub static PIPE_READMODE_BYTE: DWORD = 0x00000000; + pub static PIPE_READMODE_MESSAGE: DWORD = 0x00000002; + pub static PIPE_WAIT: DWORD = 0x00000000; + pub static PIPE_NOWAIT: DWORD = 0x00000001; + pub static PIPE_ACCEPT_REMOTE_CLIENTS: DWORD = 0x00000000; + pub static PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008; + pub static PIPE_UNLIMITED_INSTANCES: DWORD = 255; } pub mod sysconf { } @@ -2310,6 +2352,7 @@ pub mod consts { pub static MADV_UNMERGEABLE : c_int = 13; pub static MADV_HWPOISON : c_int = 100; + pub static AF_UNIX: c_int = 1; pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 10; pub static SOCK_STREAM: c_int = 1; @@ -2761,6 +2804,7 @@ pub mod consts { pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 28; + pub static AF_UNIX: c_int = 1; pub static SOCK_STREAM: c_int = 1; pub static SOCK_DGRAM: c_int = 2; pub static IPPROTO_TCP: c_int = 6; @@ -3137,6 +3181,7 @@ pub mod consts { pub static MINCORE_REFERENCED_OTHER : c_int = 0x8; pub static MINCORE_MODIFIED_OTHER : c_int = 0x10; + pub static AF_UNIX: c_int = 1; pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 30; pub static SOCK_STREAM: c_int = 1; @@ -4153,6 +4198,34 @@ pub mod funcs { lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL; pub fn GetCurrentProcessId() -> DWORD; + pub fn CreateNamedPipeW( + lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPSECURITY_ATTRIBUTES + ) -> HANDLE; + pub fn ConnectNamedPipe(hNamedPipe: HANDLE, + lpOverlapped: LPOVERLAPPED) -> BOOL; + pub fn WaitNamedPipeW(lpNamedPipeName: LPCWSTR, + nTimeOut: DWORD) -> BOOL; + pub fn SetNamedPipeHandleState(hNamedPipe: HANDLE, + lpMode: LPDWORD, + lpMaxCollectionCount: LPDWORD, + lpCollectDataTimeout: LPDWORD) + -> BOOL; + pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR) -> HANDLE; + pub fn GetOverlappedResult(hFile: HANDLE, + lpOverlapped: LPOVERLAPPED, + lpNumberOfBytesTransferred: LPDWORD, + bWait: BOOL) -> BOOL; + pub fn DisconnectNamedPipe(hNamedPipe: HANDLE) -> BOOL; } }