Skip to content

std: Add an experimental connect_timeout function #13604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 19, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/liblibc/lib.rs
Original file line number Diff line number Diff line change
@@ -87,13 +87,14 @@ pub use types::common::c95::{FILE, c_void, fpos_t};
pub use types::common::c99::{int8_t, int16_t, int32_t, int64_t};
pub use types::common::c99::{uint8_t, uint16_t, uint32_t, uint64_t};
pub use types::common::posix88::{DIR, dirent_t};
pub use types::os::common::posix01::{timeval};
pub use types::os::common::bsd44::{addrinfo, in_addr, in6_addr, sockaddr_storage};
pub use types::os::common::bsd44::{ip_mreq, ip6_mreq, sockaddr, sockaddr_un};
pub use types::os::common::bsd44::{sa_family_t, sockaddr_in, sockaddr_in6, socklen_t};
pub use types::os::arch::c95::{c_char, c_double, c_float, c_int, c_uint};
pub use types::os::arch::c95::{c_long, c_short, c_uchar, c_ulong};
pub use types::os::arch::c95::{c_ushort, clock_t, ptrdiff_t};
pub use types::os::arch::c95::{size_t, time_t};
pub use types::os::arch::c95::{size_t, time_t, suseconds_t};
pub use types::os::arch::c99::{c_longlong, c_ulonglong};
pub use types::os::arch::c99::{intptr_t, uintptr_t};
pub use types::os::arch::posix88::{dev_t, ino_t, mode_t};
@@ -113,7 +114,7 @@ pub use consts::os::posix88::{STDERR_FILENO, STDIN_FILENO, S_IXUSR};
pub use consts::os::posix88::{STDOUT_FILENO, W_OK, X_OK};
pub use consts::os::bsd44::{AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM};
pub use consts::os::bsd44::{IPPROTO_IP, IPPROTO_IPV6, IPPROTO_TCP, TCP_NODELAY};
pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE};
pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR};
pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP};
pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP};
@@ -170,14 +171,13 @@ pub use funcs::bsd43::{shutdown};
#[cfg(unix)] pub use consts::os::posix88::{ECONNREFUSED, ECONNRESET, EPERM, EPIPE};
#[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR};
#[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT};
#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
#[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN, WNOHANG};
#[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};

#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone, timeval};
#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone};

#[cfg(unix)] pub use types::os::arch::c95::{suseconds_t};
#[cfg(unix)] pub use types::os::arch::posix88::{uid_t, gid_t};
#[cfg(unix)] pub use types::os::arch::posix01::{pthread_attr_t};
#[cfg(unix)] pub use types::os::arch::posix01::{stat, utimbuf};
@@ -195,6 +195,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES};
#[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED};
#[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR};
#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS};
#[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER};
#[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS};
#[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE};
@@ -1708,6 +1709,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 8;
pub static SO_BROADCAST: c_int = 32;
pub static SO_REUSEADDR: c_int = 4;
pub static SO_ERROR: c_int = 0x1007;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
@@ -2496,6 +2498,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 9;
pub static SO_BROADCAST: c_int = 6;
pub static SO_REUSEADDR: c_int = 2;
pub static SO_ERROR: c_int = 4;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
@@ -2954,6 +2957,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;
pub static SO_ERROR: c_int = 0x1007;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
@@ -3340,6 +3344,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;
pub static SO_ERROR: c_int = 0x1007;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
76 changes: 76 additions & 0 deletions src/libnative/io/c_unix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! C definitions used by libnative that don't belong in liblibc
pub use self::select::fd_set;

use libc;

#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
pub static FIONBIO: libc::c_ulong = 0x8004667e;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
pub static FIONBIO: libc::c_ulong = 0x5421;
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
pub static FIOCLEX: libc::c_ulong = 0x20006601;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
pub static FIOCLEX: libc::c_ulong = 0x5451;

extern {
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn getsockopt(sockfd: libc::c_int,
level: libc::c_int,
optname: libc::c_int,
optval: *mut libc::c_void,
optlen: *mut libc::socklen_t) -> libc::c_int;
pub fn ioctl(fd: libc::c_int, req: libc::c_ulong, ...) -> libc::c_int;

}

#[cfg(target_os = "macos")]
mod select {
pub static FD_SETSIZE: uint = 1024;

pub struct fd_set {
fds_bits: [i32, ..(FD_SETSIZE / 32)]
}

pub fn fd_set(set: &mut fd_set, fd: i32) {
set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32);
}
}

#[cfg(target_os = "android")]
#[cfg(target_os = "freebsd")]
#[cfg(target_os = "linux")]
mod select {
use std::uint;

pub static FD_SETSIZE: uint = 1024;

pub struct fd_set {
fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)]
}

pub fn fd_set(set: &mut fd_set, fd: i32) {
let fd = fd as uint;
set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
}
}
62 changes: 62 additions & 0 deletions src/libnative/io/c_win32.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! C definitions used by libnative that don't belong in liblibc
#![allow(type_overflow)]

use libc;

pub static WSADESCRIPTION_LEN: uint = 256;
pub static WSASYS_STATUS_LEN: uint = 128;
pub static FIONBIO: libc::c_long = 0x8004667e;
static FD_SETSIZE: uint = 64;

pub struct WSADATA {
pub wVersion: libc::WORD,
pub wHighVersion: libc::WORD,
pub szDescription: [u8, ..WSADESCRIPTION_LEN + 1],
pub szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1],
pub iMaxSockets: u16,
pub iMaxUdpDg: u16,
pub lpVendorInfo: *u8,
}

pub type LPWSADATA = *mut WSADATA;

pub struct fd_set {
fd_count: libc::c_uint,
fd_array: [libc::SOCKET, ..FD_SETSIZE],
}

pub fn fd_set(set: &mut fd_set, s: libc::SOCKET) {
set.fd_array[set.fd_count as uint] = s;
set.fd_count += 1;
}

#[link(name = "ws2_32")]
extern "system" {
pub fn WSAStartup(wVersionRequested: libc::WORD,
lpWSAData: LPWSADATA) -> libc::c_int;
pub fn WSAGetLastError() -> libc::c_int;

pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
argp: *mut libc::c_ulong) -> libc::c_int;
pub fn select(nfds: libc::c_int,
readfds: *mut fd_set,
writefds: *mut fd_set,
exceptfds: *mut fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn getsockopt(sockfd: libc::SOCKET,
level: libc::c_int,
optname: libc::c_int,
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;
}
8 changes: 6 additions & 2 deletions src/libnative/io/mod.rs
Original file line number Diff line number Diff line change
@@ -71,6 +71,9 @@ pub mod pipe;
#[path = "pipe_win32.rs"]
pub mod pipe;

#[cfg(unix)] #[path = "c_unix.rs"] mod c;
#[cfg(windows)] #[path = "c_win32.rs"] mod c;

mod timer_helper;

pub type IoResult<T> = Result<T, IoError>;
@@ -161,8 +164,9 @@ impl IoFactory {

impl rtio::IoFactory for IoFactory {
// networking
fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send> {
net::TcpStream::connect(addr).map(|s| ~s as ~RtioTcpStream:Send)
fn tcp_connect(&mut self, addr: SocketAddr,
timeout: Option<u64>) -> IoResult<~RtioTcpStream:Send> {
net::TcpStream::connect(addr, timeout).map(|s| ~s as ~RtioTcpStream:Send)
}
fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send> {
net::TcpListener::bind(addr).map(|s| ~s as ~RtioTcpListener:Send)
168 changes: 131 additions & 37 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
@@ -8,15 +8,17 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use libc;
use std::cast;
use std::io::net::ip;
use std::io;
use libc;
use std::mem;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;

use super::{IoResult, retry, keep_going};
use super::c;

////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
@@ -115,12 +117,26 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
}
}

fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
unsafe {
let mut slot: T = mem::init();
let mut len = mem::size_of::<T>() as libc::socklen_t;
let ret = c::getsockopt(fd, opt, val,
&mut slot as *mut _ as *mut _,
&mut len);
if ret != 0 {
Err(last_error())
} else {
assert!(len as uint == mem::size_of::<T>());
Ok(slot)
}
}
}

#[cfg(windows)]
fn last_error() -> io::IoError {
extern "system" {
fn WSAGetLastError() -> libc::c_int;
}
io::IoError::from_errno(unsafe { WSAGetLastError() } as uint, true)
io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
}

#[cfg(not(windows))]
@@ -197,24 +213,6 @@ pub fn init() {}

#[cfg(windows)]
pub fn init() {
static WSADESCRIPTION_LEN: uint = 256;
static WSASYS_STATUS_LEN: uint = 128;
struct WSADATA {
wVersion: libc::WORD,
wHighVersion: libc::WORD,
szDescription: [u8, ..WSADESCRIPTION_LEN + 1],
szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1],
iMaxSockets: u16,
iMaxUdpDg: u16,
lpVendorInfo: *u8,
}
type LPWSADATA = *mut WSADATA;

#[link(name = "ws2_32")]
extern "system" {
fn WSAStartup(wVersionRequested: libc::WORD,
lpWSAData: LPWSADATA) -> libc::c_int;
}

unsafe {
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
@@ -223,9 +221,9 @@ pub fn init() {

let _guard = LOCK.lock();
if !INITIALIZED {
let mut data: WSADATA = mem::init();
let ret = WSAStartup(0x202, // version 2.2
&mut data);
let mut data: c::WSADATA = mem::init();
let ret = c::WSAStartup(0x202, // version 2.2
&mut data);
assert_eq!(ret, 0);
INITIALIZED = true;
}
@@ -245,22 +243,118 @@ struct Inner {
}

impl TcpStream {
pub fn connect(addr: ip::SocketAddr) -> IoResult<TcpStream> {
unsafe {
socket(addr, libc::SOCK_STREAM).and_then(|fd| {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let inner = Inner { fd: fd };
let ret = TcpStream { inner: UnsafeArc::new(inner) };
match retry(|| {
libc::connect(fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
}) {
pub fn connect(addr: ip::SocketAddr,
timeout: Option<u64>) -> IoResult<TcpStream> {
let fd = try!(socket(addr, libc::SOCK_STREAM));
let (addr, len) = addr_to_sockaddr(addr);
let inner = Inner { fd: fd };
let ret = TcpStream { inner: UnsafeArc::new(inner) };

let len = len as libc::socklen_t;
let addrp = &addr as *_ as *libc::sockaddr;
match timeout {
Some(timeout) => {
try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
Ok(ret)
},
None => {
match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
-1 => Err(last_error()),
_ => Ok(ret),
}
}
}
}

// See http://developerweb.net/viewtopic.php?id=3196 for where this is
// derived from.
fn connect_timeout(fd: sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout: u64) -> IoResult<()> {
use std::os;
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;

// Make sure the call to connect() doesn't block
try!(set_nonblocking(fd, true));

let ret = match unsafe { libc::connect(fd, addrp, len) } {
// If the connection is in progress, then we need to wait for it to
// finish (with a timeout). The current strategy for doing this is
// to use select() with a timeout.
-1 if os::errno() as int == INPROGRESS as int ||
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout) {
0 => Err(io::IoError {
kind: io::TimedOut,
desc: "connection timed out",
detail: None,
}),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
if err == 0 {
Ok(())
} else {
Err(io::IoError::from_errno(err as uint, true))
}
}
}
}

-1 => Err(last_error()),
_ => Ok(()),
};

// be sure to turn blocking I/O back on
try!(set_nonblocking(fd, false));
return ret;

#[cfg(unix)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}

#[cfg(unix)]
fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let start = ::io::timer::now();
retry(|| unsafe {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let timeout = timeout - (::io::timer::now() - start);
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
c::select(fd + 1, ptr::null(), set as *mut _ as *_,
ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) }
}
}

pub fn fd(&self) -> sock_t {
13 changes: 2 additions & 11 deletions src/libnative/io/process.rs
Original file line number Diff line number Diff line change
@@ -454,7 +454,7 @@ fn spawn_process_os(config: p::ProcessConfig,
err_fd: c_int) -> IoResult<SpawnProcessResult> {
use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
use libc::funcs::bsd44::getdtablesize;
use libc::c_ulong;
use io::c;

mod rustrt {
extern {
@@ -475,16 +475,7 @@ fn spawn_process_os(config: p::ProcessConfig,
}

unsafe fn set_cloexec(fd: c_int) {
extern { fn ioctl(fd: c_int, req: c_ulong) -> c_int; }

#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
static FIOCLEX: c_ulong = 0x20006601;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
static FIOCLEX: c_ulong = 0x5451;

let ret = ioctl(fd, FIOCLEX);
let ret = c::ioctl(fd, c::FIOCLEX);
assert_eq!(ret, 0);
}

69 changes: 7 additions & 62 deletions src/libnative/io/timer_unix.rs
Original file line number Diff line number Diff line change
@@ -53,8 +53,9 @@ use std::ptr;
use std::rt::rtio;
use std::sync::atomics;

use io::file::FileDesc;
use io::IoResult;
use io::c;
use io::file::FileDesc;
use io::timer_helper;

pub struct Timer {
@@ -84,16 +85,16 @@ pub enum Req {
}

// returns the current time (in milliseconds)
fn now() -> u64 {
pub fn now() -> u64 {
unsafe {
let mut now: libc::timeval = mem::init();
assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0);
assert_eq!(c::gettimeofday(&mut now, ptr::null()), 0);
return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
}
}

fn helper(input: libc::c_int, messages: Receiver<Req>) {
let mut set: imp::fd_set = unsafe { mem::init() };
let mut set: c::fd_set = unsafe { mem::init() };

let mut fd = FileDesc::new(input, true);
let mut timeout: libc::timeval = unsafe { mem::init() };
@@ -150,9 +151,9 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
&timeout as *libc::timeval
};

imp::fd_set(&mut set, input);
c::fd_set(&mut set, input);
match unsafe {
imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
c::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
} {
// timed out
0 => signal(&mut active, &mut dead),
@@ -283,59 +284,3 @@ impl Drop for Timer {
self.inner = Some(self.inner());
}
}

#[cfg(target_os = "macos")]
mod imp {
use libc;

pub static FD_SETSIZE: uint = 1024;

pub struct fd_set {
fds_bits: [i32, ..(FD_SETSIZE / 32)]
}

pub fn fd_set(set: &mut fd_set, fd: i32) {
set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32);
}

extern {
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;

pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
}
}

#[cfg(target_os = "android")]
#[cfg(target_os = "freebsd")]
#[cfg(target_os = "linux")]
mod imp {
use libc;
use std::uint;

pub static FD_SETSIZE: uint = 1024;

pub struct fd_set {
fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)]
}

pub fn fd_set(set: &mut fd_set, fd: i32) {
let fd = fd as uint;
set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
}

extern {
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;

pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
}
}
1 change: 1 addition & 0 deletions src/librustuv/lib.rs
Original file line number Diff line number Diff line change
@@ -412,6 +412,7 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
uvll::EPIPE => io::BrokenPipe,
uvll::ECONNABORTED => io::ConnectionAborted,
uvll::EADDRNOTAVAIL => io::ConnectionRefused,
uvll::ECANCELED => io::TimedOut,
err => {
uvdebug!("uverr.code {}", err as int);
// FIXME: Need to map remaining uv error types
89 changes: 71 additions & 18 deletions src/librustuv/net.rs
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
wait_until_woken_after, wakeup};
use timer::TimerWatcher;
use uvio::UvIoFactory;
use uvll;

@@ -198,10 +199,14 @@ impl TcpWatcher {
}
}

pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr)
-> Result<TcpWatcher, UvError>
{
struct Ctx { status: c_int, task: Option<BlockedTask> }
pub fn connect(io: &mut UvIoFactory,
address: ip::SocketAddr,
timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
struct Ctx {
status: c_int,
task: Option<BlockedTask>,
timer: Option<~TimerWatcher>,
}

let tcp = TcpWatcher::new(io);
let (addr, _len) = addr_to_sockaddr(address);
@@ -215,24 +220,72 @@ impl TcpWatcher {
return match result {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: 0, task: None };
let mut cx = Ctx { status: -1, task: None, timer: None };
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
cx.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut cx.task, &io.loop_, || {
req.set_data(&cx);
let data = &cx as *_;
match cx.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);

// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match cx.status {
0 => Ok(tcp),
n => Err(UvError(n)),
n => { drop(tcp); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};

extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
assert_eq!(status, 0);
let cx: &mut Ctx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}

extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
if status == uvll::ECANCELED { return }

let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.task);
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
@@ -741,15 +794,15 @@ mod test {

#[test]
fn connect_close_ip4() {
match TcpWatcher::connect(local_loop(), next_test_ip4()) {
match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
}

#[test]
fn connect_close_ip6() {
match TcpWatcher::connect(local_loop(), next_test_ip6()) {
match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
@@ -799,7 +852,7 @@ mod test {
});

rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr) {
let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
@@ -835,7 +888,7 @@ mod test {
});

rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr) {
let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
@@ -928,7 +981,7 @@ mod test {
});

rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
@@ -1036,7 +1089,7 @@ mod test {

spawn(proc() {
let rx = rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
rx.recv();
@@ -1088,9 +1141,9 @@ mod test {
}
});

let mut stream = TcpWatcher::connect(local_loop(), addr);
let mut stream = TcpWatcher::connect(local_loop(), addr, None);
while stream.is_err() {
stream = TcpWatcher::connect(local_loop(), addr);
stream = TcpWatcher::connect(local_loop(), addr, None);
}
stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
}
@@ -1115,7 +1168,7 @@ mod test {
drop(w.accept().unwrap());
});
rx.recv();
let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
fail!();
}

16 changes: 10 additions & 6 deletions src/librustuv/timer.rs
Original file line number Diff line number Diff line change
@@ -48,15 +48,19 @@ impl TimerWatcher {
return me.install();
}

fn start(&mut self, msecs: u64, period: u64) {
pub fn start(&mut self, f: uvll::uv_timer_cb, msecs: u64, period: u64) {
assert_eq!(unsafe {
uvll::uv_timer_start(self.handle, timer_cb, msecs, period)
uvll::uv_timer_start(self.handle, f, msecs, period)
}, 0)
}

fn stop(&mut self) {
pub fn stop(&mut self) {
assert_eq!(unsafe { uvll::uv_timer_stop(self.handle) }, 0)
}

pub unsafe fn set_data<T>(&mut self, data: *T) {
uvll::set_data_for_uv_handle(self.handle, data);
}
}

impl HomingIO for TimerWatcher {
@@ -92,7 +96,7 @@ impl RtioTimer for TimerWatcher {

self.action = Some(WakeTask);
wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
self.start(msecs, 0);
self.start(timer_cb, msecs, 0);
});
self.stop();
}
@@ -106,7 +110,7 @@ impl RtioTimer for TimerWatcher {
let _m = self.fire_homing_missile();
self.id += 1;
self.stop();
self.start(msecs, 0);
self.start(timer_cb, msecs, 0);
mem::replace(&mut self.action, Some(SendOnce(tx)))
};

@@ -122,7 +126,7 @@ impl RtioTimer for TimerWatcher {
let _m = self.fire_homing_missile();
self.id += 1;
self.stop();
self.start(msecs, msecs);
self.start(timer_cb, msecs, msecs);
mem::replace(&mut self.action, Some(SendMany(tx, self.id)))
};

4 changes: 2 additions & 2 deletions src/librustuv/uvio.rs
Original file line number Diff line number Diff line change
@@ -143,10 +143,10 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn tcp_connect(&mut self, addr: SocketAddr)
fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>)
-> Result<~rtio::RtioTcpStream:Send, IoError>
{
match TcpWatcher::connect(self, addr) {
match TcpWatcher::connect(self, addr, timeout) {
Ok(t) => Ok(~t as ~rtio::RtioTcpStream:Send),
Err(e) => Err(uv_error_to_io_error(e)),
}
2 changes: 2 additions & 0 deletions src/libstd/io/mod.rs
Original file line number Diff line number Diff line change
@@ -430,6 +430,8 @@ pub enum IoErrorKind {
IoUnavailable,
/// A parameter was incorrect in a way that caused an I/O error not part of this list.
InvalidInput,
/// The I/O operation's timeout expired, causing it to be canceled.
TimedOut,
}

/// A trait for objects which are byte-oriented streams. Readers are defined by
17 changes: 16 additions & 1 deletion src/libstd/io/net/tcp.rs
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ use io::IoResult;
use io::net::ip::SocketAddr;
use io::{Reader, Writer, Listener, Acceptor};
use kinds::Send;
use option::{None, Some};
use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};

@@ -57,7 +58,21 @@ impl TcpStream {
/// If no error is encountered, then `Ok(stream)` is returned.
pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
LocalIo::maybe_raise(|io| {
io.tcp_connect(addr).map(TcpStream::new)
io.tcp_connect(addr, None).map(TcpStream::new)
})
}

/// Creates a TCP connection to a remote socket address, timing out after
/// the specified number of milliseconds.
///
/// This is the same as the `connect` method, except that if the timeout
/// specified (in milliseconds) elapses before a connection is made an error
/// will be returned. The error's kind will be `TimedOut`.
#[experimental = "the timeout argument may eventually change types"]
pub fn connect_timeout(addr: SocketAddr,
timeout_ms: u64) -> IoResult<TcpStream> {
LocalIo::maybe_raise(|io| {
io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
})
}

3 changes: 2 additions & 1 deletion src/libstd/rt/rtio.rs
Original file line number Diff line number Diff line change
@@ -146,7 +146,8 @@ impl<'a> LocalIo<'a> {

pub trait IoFactory {
// networking
fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send>;
fn tcp_connect(&mut self, addr: SocketAddr,
timeout: Option<u64>) -> IoResult<~RtioTcpStream:Send>;
fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send>;
fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>;
fn unix_bind(&mut self, path: &CString)
92 changes: 92 additions & 0 deletions src/test/run-pass/tcp-connect-timeouts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// ignore-pretty
// compile-flags:--test
// exec-env:RUST_TEST_TASKS=1

// Tests for the connect_timeout() function on a TcpStream. This runs with only
// one test task to ensure that errors are timeouts, not file descriptor
// exhaustion.

#![feature(macro_rules, globs)]
#![allow(experimental)]

extern crate native;
extern crate green;
extern crate rustuv;

#[cfg(test)] #[start]
fn start(argc: int, argv: **u8) -> int {
green::start(argc, argv, rustuv::event_loop, __test::main)
}

macro_rules! iotest (
{ fn $name:ident() $b:block $($a:attr)* } => (
mod $name {
#![allow(unused_imports)]

use std::io::*;
use std::io::net::tcp::*;
use std::io::test::*;
use std::io;

fn f() $b

$($a)* #[test] fn green() { f() }
$($a)* #[test] fn native() {
use native;
let (tx, rx) = channel();
native::task::spawn(proc() { tx.send(f()) });
rx.recv();
}
}
)
)

iotest!(fn eventual_timeout() {
use native;
let addr = next_test_ip4();

// Use a native task to receive connections because it turns out libuv is
// really good at accepting connections and will likely run out of file
// descriptors before timing out.
let (tx1, rx1) = channel();
let (_tx2, rx2) = channel::<()>();
native::task::spawn(proc() {
let _l = TcpListener::bind(addr).unwrap().listen();
tx1.send(());
let _ = rx2.recv_opt();
});
rx1.recv();

let mut v = Vec::new();
for _ in range(0, 10000) {
match TcpStream::connect_timeout(addr, 100) {
Ok(e) => v.push(e),
Err(ref e) if e.kind == io::TimedOut => return,
Err(e) => fail!("other error: {}", e),
}
}
fail!("never timed out!");
})

iotest!(fn timeout_success() {
let addr = next_test_ip4();
let _l = TcpListener::bind(addr).unwrap().listen();

assert!(TcpStream::connect_timeout(addr, 1000).is_ok());
})

iotest!(fn timeout_error() {
let addr = next_test_ip4();

assert!(TcpStream::connect_timeout(addr, 1000).is_err());
})