diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index 4ec41e9488a0a..98613f885cd45 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -87,13 +87,14 @@ pub use types::common::c95::{FILE, c_void, fpos_t}; pub use types::common::c99::{int8_t, int16_t, int32_t, int64_t}; pub use types::common::c99::{uint8_t, uint16_t, uint32_t, uint64_t}; pub use types::common::posix88::{DIR, dirent_t}; +pub use types::os::common::posix01::{timeval}; pub use types::os::common::bsd44::{addrinfo, in_addr, in6_addr, sockaddr_storage}; pub use types::os::common::bsd44::{ip_mreq, ip6_mreq, sockaddr, sockaddr_un}; pub use types::os::common::bsd44::{sa_family_t, sockaddr_in, sockaddr_in6, socklen_t}; pub use types::os::arch::c95::{c_char, c_double, c_float, c_int, c_uint}; pub use types::os::arch::c95::{c_long, c_short, c_uchar, c_ulong}; pub use types::os::arch::c95::{c_ushort, clock_t, ptrdiff_t}; -pub use types::os::arch::c95::{size_t, time_t}; +pub use types::os::arch::c95::{size_t, time_t, suseconds_t}; pub use types::os::arch::c99::{c_longlong, c_ulonglong}; pub use types::os::arch::c99::{intptr_t, uintptr_t}; pub use types::os::arch::posix88::{dev_t, ino_t, mode_t}; @@ -113,7 +114,7 @@ pub use consts::os::posix88::{STDERR_FILENO, STDIN_FILENO, S_IXUSR}; pub use consts::os::posix88::{STDOUT_FILENO, W_OK, X_OK}; pub use consts::os::bsd44::{AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM}; pub use consts::os::bsd44::{IPPROTO_IP, IPPROTO_IPV6, IPPROTO_TCP, TCP_NODELAY}; -pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE}; +pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR}; pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP}; pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP}; pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP}; @@ -170,14 +171,13 @@ pub use funcs::bsd43::{shutdown}; #[cfg(unix)] pub use consts::os::posix88::{ECONNREFUSED, ECONNRESET, EPERM, EPIPE}; #[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR}; #[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK}; -#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT}; +#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS}; #[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE}; #[cfg(unix)] pub use consts::os::posix01::{SIG_IGN, WNOHANG}; #[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX}; -#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone, timeval}; +#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone}; -#[cfg(unix)] pub use types::os::arch::c95::{suseconds_t}; #[cfg(unix)] pub use types::os::arch::posix88::{uid_t, gid_t}; #[cfg(unix)] pub use types::os::arch::posix01::{pthread_attr_t}; #[cfg(unix)] pub use types::os::arch::posix01::{stat, utimbuf}; @@ -195,6 +195,7 @@ pub use funcs::bsd43::{shutdown}; #[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES}; #[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED}; #[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR}; +#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS}; #[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER}; #[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS}; #[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE}; @@ -1708,6 +1709,7 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 8; pub static SO_BROADCAST: c_int = 32; pub static SO_REUSEADDR: c_int = 4; + pub static SO_ERROR: c_int = 0x1007; pub static SHUT_RD: c_int = 0; pub static SHUT_WR: c_int = 1; @@ -2496,6 +2498,7 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 9; pub static SO_BROADCAST: c_int = 6; pub static SO_REUSEADDR: c_int = 2; + pub static SO_ERROR: c_int = 4; pub static SHUT_RD: c_int = 0; pub static SHUT_WR: c_int = 1; @@ -2954,6 +2957,7 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 0x0008; pub static SO_BROADCAST: c_int = 0x0020; pub static SO_REUSEADDR: c_int = 0x0004; + pub static SO_ERROR: c_int = 0x1007; pub static SHUT_RD: c_int = 0; pub static SHUT_WR: c_int = 1; @@ -3340,6 +3344,7 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 0x0008; pub static SO_BROADCAST: c_int = 0x0020; pub static SO_REUSEADDR: c_int = 0x0004; + pub static SO_ERROR: c_int = 0x1007; pub static SHUT_RD: c_int = 0; pub static SHUT_WR: c_int = 1; diff --git a/src/libnative/io/c_unix.rs b/src/libnative/io/c_unix.rs new file mode 100644 index 0000000000000..e2bf515a1e523 --- /dev/null +++ b/src/libnative/io/c_unix.rs @@ -0,0 +1,76 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! C definitions used by libnative that don't belong in liblibc + +pub use self::select::fd_set; + +use libc; + +#[cfg(target_os = "macos")] +#[cfg(target_os = "freebsd")] +pub static FIONBIO: libc::c_ulong = 0x8004667e; +#[cfg(target_os = "linux")] +#[cfg(target_os = "android")] +pub static FIONBIO: libc::c_ulong = 0x5421; +#[cfg(target_os = "macos")] +#[cfg(target_os = "freebsd")] +pub static FIOCLEX: libc::c_ulong = 0x20006601; +#[cfg(target_os = "linux")] +#[cfg(target_os = "android")] +pub static FIOCLEX: libc::c_ulong = 0x5451; + +extern { + pub fn gettimeofday(timeval: *mut libc::timeval, + tzp: *libc::c_void) -> libc::c_int; + pub fn select(nfds: libc::c_int, + readfds: *fd_set, + writefds: *fd_set, + errorfds: *fd_set, + timeout: *libc::timeval) -> libc::c_int; + pub fn getsockopt(sockfd: libc::c_int, + level: libc::c_int, + optname: libc::c_int, + optval: *mut libc::c_void, + optlen: *mut libc::socklen_t) -> libc::c_int; + pub fn ioctl(fd: libc::c_int, req: libc::c_ulong, ...) -> libc::c_int; + +} + +#[cfg(target_os = "macos")] +mod select { + pub static FD_SETSIZE: uint = 1024; + + pub struct fd_set { + fds_bits: [i32, ..(FD_SETSIZE / 32)] + } + + pub fn fd_set(set: &mut fd_set, fd: i32) { + set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32); + } +} + +#[cfg(target_os = "android")] +#[cfg(target_os = "freebsd")] +#[cfg(target_os = "linux")] +mod select { + use std::uint; + + pub static FD_SETSIZE: uint = 1024; + + pub struct fd_set { + fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)] + } + + pub fn fd_set(set: &mut fd_set, fd: i32) { + let fd = fd as uint; + set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS); + } +} diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs new file mode 100644 index 0000000000000..8d75a6739146d --- /dev/null +++ b/src/libnative/io/c_win32.rs @@ -0,0 +1,62 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! C definitions used by libnative that don't belong in liblibc + +#![allow(type_overflow)] + +use libc; + +pub static WSADESCRIPTION_LEN: uint = 256; +pub static WSASYS_STATUS_LEN: uint = 128; +pub static FIONBIO: libc::c_long = 0x8004667e; +static FD_SETSIZE: uint = 64; + +pub struct WSADATA { + pub wVersion: libc::WORD, + pub wHighVersion: libc::WORD, + pub szDescription: [u8, ..WSADESCRIPTION_LEN + 1], + pub szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1], + pub iMaxSockets: u16, + pub iMaxUdpDg: u16, + pub lpVendorInfo: *u8, +} + +pub type LPWSADATA = *mut WSADATA; + +pub struct fd_set { + fd_count: libc::c_uint, + fd_array: [libc::SOCKET, ..FD_SETSIZE], +} + +pub fn fd_set(set: &mut fd_set, s: libc::SOCKET) { + set.fd_array[set.fd_count as uint] = s; + set.fd_count += 1; +} + +#[link(name = "ws2_32")] +extern "system" { + pub fn WSAStartup(wVersionRequested: libc::WORD, + lpWSAData: LPWSADATA) -> libc::c_int; + pub fn WSAGetLastError() -> libc::c_int; + + pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long, + argp: *mut libc::c_ulong) -> libc::c_int; + pub fn select(nfds: libc::c_int, + readfds: *mut fd_set, + writefds: *mut fd_set, + exceptfds: *mut fd_set, + timeout: *libc::timeval) -> libc::c_int; + pub fn getsockopt(sockfd: libc::SOCKET, + level: libc::c_int, + optname: libc::c_int, + optval: *mut libc::c_char, + optlen: *mut libc::c_int) -> libc::c_int; +} diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 78d17bc8d747f..19cb5c5f1d4f0 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -71,6 +71,9 @@ pub mod pipe; #[path = "pipe_win32.rs"] pub mod pipe; +#[cfg(unix)] #[path = "c_unix.rs"] mod c; +#[cfg(windows)] #[path = "c_win32.rs"] mod c; + mod timer_helper; pub type IoResult = Result; @@ -161,8 +164,9 @@ impl IoFactory { impl rtio::IoFactory for IoFactory { // networking - fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send> { - net::TcpStream::connect(addr).map(|s| ~s as ~RtioTcpStream:Send) + fn tcp_connect(&mut self, addr: SocketAddr, + timeout: Option) -> IoResult<~RtioTcpStream:Send> { + net::TcpStream::connect(addr, timeout).map(|s| ~s as ~RtioTcpStream:Send) } fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send> { net::TcpListener::bind(addr).map(|s| ~s as ~RtioTcpListener:Send) diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 2e64b82a84a31..be597761b1a8f 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -8,15 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc; use std::cast; use std::io::net::ip; use std::io; -use libc; use std::mem; +use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; use super::{IoResult, retry, keep_going}; +use super::c; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings @@ -115,12 +117,26 @@ fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, } } +fn getsockopt(fd: sock_t, opt: libc::c_int, + val: libc::c_int) -> IoResult { + unsafe { + let mut slot: T = mem::init(); + let mut len = mem::size_of::() as libc::socklen_t; + let ret = c::getsockopt(fd, opt, val, + &mut slot as *mut _ as *mut _, + &mut len); + if ret != 0 { + Err(last_error()) + } else { + assert!(len as uint == mem::size_of::()); + Ok(slot) + } + } +} + #[cfg(windows)] fn last_error() -> io::IoError { - extern "system" { - fn WSAGetLastError() -> libc::c_int; - } - io::IoError::from_errno(unsafe { WSAGetLastError() } as uint, true) + io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true) } #[cfg(not(windows))] @@ -197,24 +213,6 @@ pub fn init() {} #[cfg(windows)] pub fn init() { - static WSADESCRIPTION_LEN: uint = 256; - static WSASYS_STATUS_LEN: uint = 128; - struct WSADATA { - wVersion: libc::WORD, - wHighVersion: libc::WORD, - szDescription: [u8, ..WSADESCRIPTION_LEN + 1], - szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1], - iMaxSockets: u16, - iMaxUdpDg: u16, - lpVendorInfo: *u8, - } - type LPWSADATA = *mut WSADATA; - - #[link(name = "ws2_32")] - extern "system" { - fn WSAStartup(wVersionRequested: libc::WORD, - lpWSAData: LPWSADATA) -> libc::c_int; - } unsafe { use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; @@ -223,9 +221,9 @@ pub fn init() { let _guard = LOCK.lock(); if !INITIALIZED { - let mut data: WSADATA = mem::init(); - let ret = WSAStartup(0x202, // version 2.2 - &mut data); + let mut data: c::WSADATA = mem::init(); + let ret = c::WSAStartup(0x202, // version 2.2 + &mut data); assert_eq!(ret, 0); INITIALIZED = true; } @@ -245,22 +243,118 @@ struct Inner { } impl TcpStream { - pub fn connect(addr: ip::SocketAddr) -> IoResult { - unsafe { - socket(addr, libc::SOCK_STREAM).and_then(|fd| { - let (addr, len) = addr_to_sockaddr(addr); - let addrp = &addr as *libc::sockaddr_storage; - let inner = Inner { fd: fd }; - let ret = TcpStream { inner: UnsafeArc::new(inner) }; - match retry(|| { - libc::connect(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { + pub fn connect(addr: ip::SocketAddr, + timeout: Option) -> IoResult { + let fd = try!(socket(addr, libc::SOCK_STREAM)); + let (addr, len) = addr_to_sockaddr(addr); + let inner = Inner { fd: fd }; + let ret = TcpStream { inner: UnsafeArc::new(inner) }; + + let len = len as libc::socklen_t; + let addrp = &addr as *_ as *libc::sockaddr; + match timeout { + Some(timeout) => { + try!(TcpStream::connect_timeout(fd, addrp, len, timeout)); + Ok(ret) + }, + None => { + match retry(|| unsafe { libc::connect(fd, addrp, len) }) { -1 => Err(last_error()), _ => Ok(ret), } + } + } + } + + // See http://developerweb.net/viewtopic.php?id=3196 for where this is + // derived from. + fn connect_timeout(fd: sock_t, + addrp: *libc::sockaddr, + len: libc::socklen_t, + timeout: u64) -> IoResult<()> { + use std::os; + #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; + #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; + #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; + #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; + + // Make sure the call to connect() doesn't block + try!(set_nonblocking(fd, true)); + + let ret = match unsafe { libc::connect(fd, addrp, len) } { + // If the connection is in progress, then we need to wait for it to + // finish (with a timeout). The current strategy for doing this is + // to use select() with a timeout. + -1 if os::errno() as int == INPROGRESS as int || + os::errno() as int == WOULDBLOCK as int => { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + match await(fd, &mut set, timeout) { + 0 => Err(io::IoError { + kind: io::TimedOut, + desc: "connection timed out", + detail: None, + }), + -1 => Err(last_error()), + _ => { + let err: libc::c_int = try!( + getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); + if err == 0 { + Ok(()) + } else { + Err(io::IoError::from_errno(err as uint, true)) + } + } + } + } + + -1 => Err(last_error()), + _ => Ok(()), + }; + + // be sure to turn blocking I/O back on + try!(set_nonblocking(fd, false)); + return ret; + + #[cfg(unix)] + fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { + let set = nb as libc::c_int; + super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) + } + #[cfg(windows)] + fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { + let mut set = nb as libc::c_ulong; + if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { + Err(last_error()) + } else { + Ok(()) + } + } + + #[cfg(unix)] + fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { + let start = ::io::timer::now(); + retry(|| unsafe { + // Recalculate the timeout each iteration (it is generally + // undefined what the value of the 'tv' is after select + // returns EINTR). + let timeout = timeout - (::io::timer::now() - start); + let tv = libc::timeval { + tv_sec: (timeout / 1000) as libc::time_t, + tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t, + }; + c::select(fd + 1, ptr::null(), set as *mut _ as *_, + ptr::null(), &tv) }) } + #[cfg(windows)] + fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { + let tv = libc::timeval { + tv_sec: (timeout / 1000) as libc::time_t, + tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t, + }; + unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) } + } } pub fn fd(&self) -> sock_t { diff --git a/src/libnative/io/process.rs b/src/libnative/io/process.rs index a29a5b631c660..efdab990d1822 100644 --- a/src/libnative/io/process.rs +++ b/src/libnative/io/process.rs @@ -454,7 +454,7 @@ fn spawn_process_os(config: p::ProcessConfig, err_fd: c_int) -> IoResult { use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp}; use libc::funcs::bsd44::getdtablesize; - use libc::c_ulong; + use io::c; mod rustrt { extern { @@ -475,16 +475,7 @@ fn spawn_process_os(config: p::ProcessConfig, } unsafe fn set_cloexec(fd: c_int) { - extern { fn ioctl(fd: c_int, req: c_ulong) -> c_int; } - - #[cfg(target_os = "macos")] - #[cfg(target_os = "freebsd")] - static FIOCLEX: c_ulong = 0x20006601; - #[cfg(target_os = "linux")] - #[cfg(target_os = "android")] - static FIOCLEX: c_ulong = 0x5451; - - let ret = ioctl(fd, FIOCLEX); + let ret = c::ioctl(fd, c::FIOCLEX); assert_eq!(ret, 0); } diff --git a/src/libnative/io/timer_unix.rs b/src/libnative/io/timer_unix.rs index 0a38a6ff0be35..e5d4a6bb02b86 100644 --- a/src/libnative/io/timer_unix.rs +++ b/src/libnative/io/timer_unix.rs @@ -53,8 +53,9 @@ use std::ptr; use std::rt::rtio; use std::sync::atomics; -use io::file::FileDesc; use io::IoResult; +use io::c; +use io::file::FileDesc; use io::timer_helper; pub struct Timer { @@ -84,16 +85,16 @@ pub enum Req { } // returns the current time (in milliseconds) -fn now() -> u64 { +pub fn now() -> u64 { unsafe { let mut now: libc::timeval = mem::init(); - assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0); + assert_eq!(c::gettimeofday(&mut now, ptr::null()), 0); return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000; } } fn helper(input: libc::c_int, messages: Receiver) { - let mut set: imp::fd_set = unsafe { mem::init() }; + let mut set: c::fd_set = unsafe { mem::init() }; let mut fd = FileDesc::new(input, true); let mut timeout: libc::timeval = unsafe { mem::init() }; @@ -150,9 +151,9 @@ fn helper(input: libc::c_int, messages: Receiver) { &timeout as *libc::timeval }; - imp::fd_set(&mut set, input); + c::fd_set(&mut set, input); match unsafe { - imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout) + c::select(input + 1, &set, ptr::null(), ptr::null(), timeout) } { // timed out 0 => signal(&mut active, &mut dead), @@ -283,59 +284,3 @@ impl Drop for Timer { self.inner = Some(self.inner()); } } - -#[cfg(target_os = "macos")] -mod imp { - use libc; - - pub static FD_SETSIZE: uint = 1024; - - pub struct fd_set { - fds_bits: [i32, ..(FD_SETSIZE / 32)] - } - - pub fn fd_set(set: &mut fd_set, fd: i32) { - set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32); - } - - extern { - pub fn select(nfds: libc::c_int, - readfds: *fd_set, - writefds: *fd_set, - errorfds: *fd_set, - timeout: *libc::timeval) -> libc::c_int; - - pub fn gettimeofday(timeval: *mut libc::timeval, - tzp: *libc::c_void) -> libc::c_int; - } -} - -#[cfg(target_os = "android")] -#[cfg(target_os = "freebsd")] -#[cfg(target_os = "linux")] -mod imp { - use libc; - use std::uint; - - pub static FD_SETSIZE: uint = 1024; - - pub struct fd_set { - fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)] - } - - pub fn fd_set(set: &mut fd_set, fd: i32) { - let fd = fd as uint; - set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS); - } - - extern { - pub fn select(nfds: libc::c_int, - readfds: *fd_set, - writefds: *fd_set, - errorfds: *fd_set, - timeout: *libc::timeval) -> libc::c_int; - - pub fn gettimeofday(timeval: *mut libc::timeval, - tzp: *libc::c_void) -> libc::c_int; - } -} diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index f30c04b405b3f..4f1ca0b02d3d3 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -412,6 +412,7 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { uvll::EPIPE => io::BrokenPipe, uvll::ECONNABORTED => io::ConnectionAborted, uvll::EADDRNOTAVAIL => io::ConnectionRefused, + uvll::ECANCELED => io::TimedOut, err => { uvdebug!("uverr.code {}", err as int); // FIXME: Need to map remaining uv error types diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 73454aaf13f7d..cbda25485c737 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -25,6 +25,7 @@ use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, wait_until_woken_after, wakeup}; +use timer::TimerWatcher; use uvio::UvIoFactory; use uvll; @@ -198,10 +199,14 @@ impl TcpWatcher { } } - pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr) - -> Result - { - struct Ctx { status: c_int, task: Option } + pub fn connect(io: &mut UvIoFactory, + address: ip::SocketAddr, + timeout: Option) -> Result { + struct Ctx { + status: c_int, + task: Option, + timer: Option<~TimerWatcher>, + } let tcp = TcpWatcher::new(io); let (addr, _len) = addr_to_sockaddr(address); @@ -215,24 +220,72 @@ impl TcpWatcher { return match result { 0 => { req.defuse(); // uv callback now owns this request - let mut cx = Ctx { status: 0, task: None }; + let mut cx = Ctx { status: -1, task: None, timer: None }; + match timeout { + Some(t) => { + let mut timer = TimerWatcher::new(io); + timer.start(timer_cb, t, 0); + cx.timer = Some(timer); + } + None => {} + } wait_until_woken_after(&mut cx.task, &io.loop_, || { - req.set_data(&cx); + let data = &cx as *_; + match cx.timer { + Some(ref mut timer) => unsafe { timer.set_data(data) }, + None => {} + } + req.set_data(data); }); + // Make sure an erroneously fired callback doesn't have access + // to the context any more. + req.set_data(0 as *int); + + // If we failed because of a timeout, drop the TcpWatcher as + // soon as possible because it's data is now set to null and we + // want to cancel the callback ASAP. match cx.status { 0 => Ok(tcp), - n => Err(UvError(n)), + n => { drop(tcp); Err(UvError(n)) } } } n => Err(UvError(n)) }; + extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { + // Don't close the corresponding tcp request, just wake up the task + // and let RAII take care of the pending watcher. + assert_eq!(status, 0); + let cx: &mut Ctx = unsafe { + &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx) + }; + cx.status = uvll::ECANCELED; + wakeup(&mut cx.task); + } + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + // This callback can be invoked with ECANCELED if the watcher is + // closed by the timeout callback. In that case we just want to free + // the request and be along our merry way. let req = Request::wrap(req); - assert!(status != uvll::ECANCELED); + if status == uvll::ECANCELED { return } + let cx: &mut Ctx = unsafe { req.get_data() }; cx.status = status; - wakeup(&mut cx.task); + match cx.timer { + Some(ref mut t) => t.stop(), + None => {} + } + // Note that the timer callback doesn't cancel the connect request + // (that's the job of uv_close()), so it's possible for this + // callback to get triggered after the timeout callback fires, but + // before the task wakes up. In that case, we did indeed + // successfully connect, but we don't need to wake someone up. We + // updated the status above (correctly so), and the task will pick + // up on this when it wakes up. + if cx.task.is_some() { + wakeup(&mut cx.task); + } } } } @@ -741,7 +794,7 @@ mod test { #[test] fn connect_close_ip4() { - match TcpWatcher::connect(local_loop(), next_test_ip4()) { + match TcpWatcher::connect(local_loop(), next_test_ip4(), None) { Ok(..) => fail!(), Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()), } @@ -749,7 +802,7 @@ mod test { #[test] fn connect_close_ip6() { - match TcpWatcher::connect(local_loop(), next_test_ip6()) { + match TcpWatcher::connect(local_loop(), next_test_ip6(), None) { Ok(..) => fail!(), Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()), } @@ -799,7 +852,7 @@ mod test { }); rx.recv(); - let mut w = match TcpWatcher::connect(local_loop(), addr) { + let mut w = match TcpWatcher::connect(local_loop(), addr, None) { Ok(w) => w, Err(e) => fail!("{:?}", e) }; match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { @@ -835,7 +888,7 @@ mod test { }); rx.recv(); - let mut w = match TcpWatcher::connect(local_loop(), addr) { + let mut w = match TcpWatcher::connect(local_loop(), addr, None) { Ok(w) => w, Err(e) => fail!("{:?}", e) }; match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { @@ -928,7 +981,7 @@ mod test { }); rx.recv(); - let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); + let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap(); let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; while total_bytes_read < MAX { @@ -1036,7 +1089,7 @@ mod test { spawn(proc() { let rx = rx.recv(); - let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); + let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); rx.recv(); @@ -1088,9 +1141,9 @@ mod test { } }); - let mut stream = TcpWatcher::connect(local_loop(), addr); + let mut stream = TcpWatcher::connect(local_loop(), addr, None); while stream.is_err() { - stream = TcpWatcher::connect(local_loop(), addr); + stream = TcpWatcher::connect(local_loop(), addr, None); } stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); } @@ -1115,7 +1168,7 @@ mod test { drop(w.accept().unwrap()); }); rx.recv(); - let _w = TcpWatcher::connect(local_loop(), addr).unwrap(); + let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap(); fail!(); } diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 58008002837d1..3710d97827f28 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -48,15 +48,19 @@ impl TimerWatcher { return me.install(); } - fn start(&mut self, msecs: u64, period: u64) { + pub fn start(&mut self, f: uvll::uv_timer_cb, msecs: u64, period: u64) { assert_eq!(unsafe { - uvll::uv_timer_start(self.handle, timer_cb, msecs, period) + uvll::uv_timer_start(self.handle, f, msecs, period) }, 0) } - fn stop(&mut self) { + pub fn stop(&mut self) { assert_eq!(unsafe { uvll::uv_timer_stop(self.handle) }, 0) } + + pub unsafe fn set_data(&mut self, data: *T) { + uvll::set_data_for_uv_handle(self.handle, data); + } } impl HomingIO for TimerWatcher { @@ -92,7 +96,7 @@ impl RtioTimer for TimerWatcher { self.action = Some(WakeTask); wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || { - self.start(msecs, 0); + self.start(timer_cb, msecs, 0); }); self.stop(); } @@ -106,7 +110,7 @@ impl RtioTimer for TimerWatcher { let _m = self.fire_homing_missile(); self.id += 1; self.stop(); - self.start(msecs, 0); + self.start(timer_cb, msecs, 0); mem::replace(&mut self.action, Some(SendOnce(tx))) }; @@ -122,7 +126,7 @@ impl RtioTimer for TimerWatcher { let _m = self.fire_homing_missile(); self.id += 1; self.stop(); - self.start(msecs, msecs); + self.start(timer_cb, msecs, msecs); mem::replace(&mut self.action, Some(SendMany(tx, self.id))) }; diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 55456bb548e91..3769a1b8d6db4 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -143,10 +143,10 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn tcp_connect(&mut self, addr: SocketAddr) + fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option) -> Result<~rtio::RtioTcpStream:Send, IoError> { - match TcpWatcher::connect(self, addr) { + match TcpWatcher::connect(self, addr, timeout) { Ok(t) => Ok(~t as ~rtio::RtioTcpStream:Send), Err(e) => Err(uv_error_to_io_error(e)), } diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index d8267e472bd0a..9c163523abef5 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -430,6 +430,8 @@ pub enum IoErrorKind { IoUnavailable, /// A parameter was incorrect in a way that caused an I/O error not part of this list. InvalidInput, + /// The I/O operation's timeout expired, causing it to be canceled. + TimedOut, } /// A trait for objects which are byte-oriented streams. Readers are defined by diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 49e6bcff8eb27..4f1e6bd741817 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -22,6 +22,7 @@ use io::IoResult; use io::net::ip::SocketAddr; use io::{Reader, Writer, Listener, Acceptor}; use kinds::Send; +use option::{None, Some}; use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener}; use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; @@ -57,7 +58,21 @@ impl TcpStream { /// If no error is encountered, then `Ok(stream)` is returned. pub fn connect(addr: SocketAddr) -> IoResult { LocalIo::maybe_raise(|io| { - io.tcp_connect(addr).map(TcpStream::new) + io.tcp_connect(addr, None).map(TcpStream::new) + }) + } + + /// Creates a TCP connection to a remote socket address, timing out after + /// the specified number of milliseconds. + /// + /// This is the same as the `connect` method, except that if the timeout + /// specified (in milliseconds) elapses before a connection is made an error + /// will be returned. The error's kind will be `TimedOut`. + #[experimental = "the timeout argument may eventually change types"] + pub fn connect_timeout(addr: SocketAddr, + timeout_ms: u64) -> IoResult { + LocalIo::maybe_raise(|io| { + io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new) }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index cc8356d2b9a04..0f3fc9c21ced0 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -146,7 +146,8 @@ impl<'a> LocalIo<'a> { pub trait IoFactory { // networking - fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send>; + fn tcp_connect(&mut self, addr: SocketAddr, + timeout: Option) -> IoResult<~RtioTcpStream:Send>; fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send>; fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>; fn unix_bind(&mut self, path: &CString) diff --git a/src/test/run-pass/tcp-connect-timeouts.rs b/src/test/run-pass/tcp-connect-timeouts.rs new file mode 100644 index 0000000000000..26f9b2ea6b7b4 --- /dev/null +++ b/src/test/run-pass/tcp-connect-timeouts.rs @@ -0,0 +1,92 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// ignore-pretty +// compile-flags:--test +// exec-env:RUST_TEST_TASKS=1 + +// Tests for the connect_timeout() function on a TcpStream. This runs with only +// one test task to ensure that errors are timeouts, not file descriptor +// exhaustion. + +#![feature(macro_rules, globs)] +#![allow(experimental)] + +extern crate native; +extern crate green; +extern crate rustuv; + +#[cfg(test)] #[start] +fn start(argc: int, argv: **u8) -> int { + green::start(argc, argv, rustuv::event_loop, __test::main) +} + +macro_rules! iotest ( + { fn $name:ident() $b:block $($a:attr)* } => ( + mod $name { + #![allow(unused_imports)] + + use std::io::*; + use std::io::net::tcp::*; + use std::io::test::*; + use std::io; + + fn f() $b + + $($a)* #[test] fn green() { f() } + $($a)* #[test] fn native() { + use native; + let (tx, rx) = channel(); + native::task::spawn(proc() { tx.send(f()) }); + rx.recv(); + } + } + ) +) + +iotest!(fn eventual_timeout() { + use native; + let addr = next_test_ip4(); + + // Use a native task to receive connections because it turns out libuv is + // really good at accepting connections and will likely run out of file + // descriptors before timing out. + let (tx1, rx1) = channel(); + let (_tx2, rx2) = channel::<()>(); + native::task::spawn(proc() { + let _l = TcpListener::bind(addr).unwrap().listen(); + tx1.send(()); + let _ = rx2.recv_opt(); + }); + rx1.recv(); + + let mut v = Vec::new(); + for _ in range(0, 10000) { + match TcpStream::connect_timeout(addr, 100) { + Ok(e) => v.push(e), + Err(ref e) if e.kind == io::TimedOut => return, + Err(e) => fail!("other error: {}", e), + } + } + fail!("never timed out!"); +}) + +iotest!(fn timeout_success() { + let addr = next_test_ip4(); + let _l = TcpListener::bind(addr).unwrap().listen(); + + assert!(TcpStream::connect_timeout(addr, 1000).is_ok()); +}) + +iotest!(fn timeout_error() { + let addr = next_test_ip4(); + + assert!(TcpStream::connect_timeout(addr, 1000).is_err()); +})