diff --git a/src/libnative/io/file.rs b/src/libnative/io/file.rs index cc5b0770d4d20..25fb2809e764a 100644 --- a/src/libnative/io/file.rs +++ b/src/libnative/io/file.rs @@ -10,6 +10,7 @@ //! Blocking posix-based file I/O +use std::sync::arc::UnsafeArc; use std::c_str::CString; use std::io::IoError; use std::io; @@ -55,9 +56,13 @@ pub fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 { pub type fd_t = libc::c_int; +struct Inner { + fd: fd_t, + close_on_drop: bool, +} + pub struct FileDesc { - priv fd: fd_t, - priv close_on_drop: bool, + priv inner: UnsafeArc } impl FileDesc { @@ -70,7 +75,10 @@ impl FileDesc { /// Note that all I/O operations done on this object will be *blocking*, but /// they do not require the runtime to be active. pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc { - FileDesc { fd: fd, close_on_drop: close_on_drop } + FileDesc { inner: UnsafeArc::new(Inner { + fd: fd, + close_on_drop: close_on_drop + }) } } // FIXME(#10465) these functions should not be public, but anything in @@ -80,7 +88,7 @@ impl FileDesc { #[cfg(windows)] type rlen = libc::c_uint; #[cfg(not(windows))] type rlen = libc::size_t; let ret = retry(|| unsafe { - libc::read(self.fd, + libc::read(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as rlen) as libc::c_int }); @@ -97,7 +105,7 @@ impl FileDesc { #[cfg(not(windows))] type wlen = libc::size_t; let ret = keep_going(buf, |buf, len| { unsafe { - libc::write(self.fd, buf as *libc::c_void, len as wlen) as i64 + libc::write(self.fd(), buf as *libc::c_void, len as wlen) as i64 } }); if ret < 0 { @@ -107,7 +115,11 @@ impl FileDesc { } } - pub fn fd(&self) -> fd_t { self.fd } + pub fn fd(&self) -> fd_t { + // This unsafety is fine because we're just reading off the file + // descriptor, no one is modifying this. + unsafe { (*self.inner.get()).fd } + } } impl io::Reader for FileDesc { @@ -130,7 +142,7 @@ impl rtio::RtioFileStream for FileDesc { self.inner_write(buf) } fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result { - return os_pread(self.fd, buf.as_ptr(), buf.len(), offset); + return os_pread(self.fd(), buf.as_ptr(), buf.len(), offset); #[cfg(windows)] fn os_pread(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult { @@ -162,7 +174,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> { - return os_pwrite(self.fd, buf.as_ptr(), buf.len(), offset); + return os_pwrite(self.fd(), buf.as_ptr(), buf.len(), offset); #[cfg(windows)] fn os_pwrite(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<()> { @@ -197,7 +209,7 @@ impl rtio::RtioFileStream for FileDesc { io::SeekCur => libc::FILE_CURRENT, }; unsafe { - let handle = libc::get_osfhandle(self.fd) as libc::HANDLE; + let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE; let mut newpos = 0; match libc::SetFilePointerEx(handle, pos, &mut newpos, whence) { 0 => Err(super::last_error()), @@ -212,7 +224,7 @@ impl rtio::RtioFileStream for FileDesc { io::SeekEnd => libc::SEEK_END, io::SeekCur => libc::SEEK_CUR, }; - let n = unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) }; + let n = unsafe { libc::lseek(self.fd(), pos as libc::off_t, whence) }; if n < 0 { Err(super::last_error()) } else { @@ -220,7 +232,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn tell(&self) -> Result { - let n = unsafe { libc::lseek(self.fd, 0, libc::SEEK_CUR) }; + let n = unsafe { libc::lseek(self.fd(), 0, libc::SEEK_CUR) }; if n < 0 { Err(super::last_error()) } else { @@ -228,7 +240,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn fsync(&mut self) -> Result<(), IoError> { - return os_fsync(self.fd); + return os_fsync(self.fd()); #[cfg(windows)] fn os_fsync(fd: c_int) -> IoResult<()> { @@ -247,7 +259,7 @@ impl rtio::RtioFileStream for FileDesc { #[cfg(not(windows))] fn datasync(&mut self) -> Result<(), IoError> { - return super::mkerr_libc(os_datasync(self.fd)); + return super::mkerr_libc(os_datasync(self.fd())); #[cfg(target_os = "macos")] fn os_datasync(fd: c_int) -> c_int { @@ -270,7 +282,7 @@ impl rtio::RtioFileStream for FileDesc { Ok(_) => {}, Err(e) => return Err(e), }; let ret = unsafe { - let handle = libc::get_osfhandle(self.fd) as libc::HANDLE; + let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE; match libc::SetEndOfFile(handle) { 0 => Err(super::last_error()), _ => Ok(()) @@ -282,7 +294,7 @@ impl rtio::RtioFileStream for FileDesc { #[cfg(unix)] fn truncate(&mut self, offset: i64) -> Result<(), IoError> { super::mkerr_libc(retry(|| unsafe { - libc::ftruncate(self.fd, offset as libc::off_t) + libc::ftruncate(self.fd(), offset as libc::off_t) })) } } @@ -294,6 +306,9 @@ impl rtio::RtioPipe for FileDesc { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { self.inner_write(buf) } + fn clone(&self) -> ~rtio::RtioPipe { + ~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe + } } impl rtio::RtioTTY for FileDesc { @@ -312,7 +327,7 @@ impl rtio::RtioTTY for FileDesc { fn isatty(&self) -> bool { false } } -impl Drop for FileDesc { +impl Drop for Inner { fn drop(&mut self) { // closing stdio file handles makes no sense, so never do it. Also, note // that errors are ignored when closing a file descriptor. The reason diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index dd916c8f3c4b9..32cd6337f993d 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -14,6 +14,7 @@ use std::io; use std::libc; use std::mem; use std::rt::rtio; +use std::sync::arc::UnsafeArc; use std::unstable::intrinsics; use super::{IoResult, retry}; @@ -108,10 +109,27 @@ fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, let ret = libc::setsockopt(fd, opt, val, payload, mem::size_of::() as libc::socklen_t); - super::mkerr_libc(ret) + if ret != 0 { + Err(last_error()) + } else { + Ok(()) + } } } +#[cfg(windows)] +fn last_error() -> io::IoError { + extern "system" { + fn WSAGetLastError() -> libc::c_int; + } + super::translate_error(unsafe { WSAGetLastError() }, true) +} + +#[cfg(not(windows))] +fn last_error() -> io::IoError { + super::last_error() +} + #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } @@ -128,7 +146,7 @@ fn sockname(fd: sock_t, storage as *mut libc::sockaddr, &mut len as *mut libc::socklen_t); if ret != 0 { - return Err(super::last_error()) + return Err(last_error()) } } return sockaddr_to_addr(&storage, len as uint); @@ -222,7 +240,11 @@ pub fn init() { //////////////////////////////////////////////////////////////////////////////// pub struct TcpStream { - priv fd: sock_t, + priv inner: UnsafeArc, +} + +struct Inner { + fd: sock_t, } impl TcpStream { @@ -231,27 +253,31 @@ impl TcpStream { socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = TcpStream { fd: fd }; + let inner = Inner { fd: fd }; + let ret = TcpStream { inner: UnsafeArc::new(inner) }; match retry(|| { libc::connect(fd, addrp as *libc::sockaddr, len as libc::socklen_t) }) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // This unsafety is fine because it's just a read-only arc + unsafe { (*self.inner.get()).fd } + } fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_NODELAY, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, nodelay as libc::c_int) } fn set_keepalive(&mut self, seconds: Option) -> IoResult<()> { - let ret = setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE, + let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE, seconds.is_some() as libc::c_int); match seconds { Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)), @@ -261,12 +287,12 @@ impl TcpStream { #[cfg(target_os = "macos")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, seconds as libc::c_int) } #[cfg(target_os = "freebsd")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, seconds as libc::c_int) } #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))] @@ -282,7 +308,7 @@ impl rtio::RtioTcpStream for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { let ret = retry(|| { unsafe { - libc::recv(self.fd, + libc::recv(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as wrlen, 0) as libc::c_int @@ -291,7 +317,7 @@ impl rtio::RtioTcpStream for TcpStream { if ret == 0 { Err(io::standard_error(io::EndOfFile)) } else if ret < 0 { - Err(super::last_error()) + Err(last_error()) } else { Ok(ret as uint) } @@ -299,20 +325,20 @@ impl rtio::RtioTcpStream for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { let ret = keep_going(buf, |buf, len| { unsafe { - libc::send(self.fd, + libc::send(self.fd(), buf as *mut libc::c_void, len as wrlen, 0) as i64 } }); if ret < 0 { - Err(super::last_error()) + Err(last_error()) } else { Ok(()) } } fn peer_name(&mut self) -> IoResult { - sockname(self.fd, libc::getpeername) + sockname(self.fd(), libc::getpeername) } fn control_congestion(&mut self) -> IoResult<()> { self.set_nodelay(false) @@ -326,15 +352,19 @@ impl rtio::RtioTcpStream for TcpStream { fn letdie(&mut self) -> IoResult<()> { self.set_keepalive(None) } + + fn clone(&self) -> ~rtio::RtioTcpStream { + ~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream + } } impl rtio::RtioSocket for TcpStream { fn socket_name(&mut self) -> IoResult { - sockname(self.fd, libc::getsockname) + sockname(self.fd(), libc::getsockname) } } -impl Drop for TcpStream { +impl Drop for Inner { fn drop(&mut self) { unsafe { close(self.fd); } } } @@ -343,7 +373,7 @@ impl Drop for TcpStream { //////////////////////////////////////////////////////////////////////////////// pub struct TcpListener { - priv fd: sock_t, + priv inner: UnsafeArc, } impl TcpListener { @@ -352,7 +382,8 @@ impl TcpListener { socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = TcpListener { fd: fd }; + let inner = Inner { fd: fd }; + let ret = TcpListener { inner: UnsafeArc::new(inner) }; // On platforms with Berkeley-derived sockets, this allows // to quickly rebind a socket, without needing to wait for // the OS to clean up the previous one. @@ -366,18 +397,21 @@ impl TcpListener { } match libc::bind(fd, addrp as *libc::sockaddr, len as libc::socklen_t) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // This is just a read-only arc so the unsafety is fine + unsafe { (*self.inner.get()).fd } + } pub fn native_listen(self, backlog: int) -> IoResult { - match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { - -1 => Err(super::last_error()), + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(last_error()), _ => Ok(TcpAcceptor { listener: self }) } } @@ -391,20 +425,16 @@ impl rtio::RtioTcpListener for TcpListener { impl rtio::RtioSocket for TcpListener { fn socket_name(&mut self) -> IoResult { - sockname(self.fd, libc::getsockname) + sockname(self.fd(), libc::getsockname) } } -impl Drop for TcpListener { - fn drop(&mut self) { unsafe { close(self.fd); } } -} - pub struct TcpAcceptor { priv listener: TcpListener, } impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd } + pub fn fd(&self) -> sock_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult { unsafe { @@ -417,8 +447,8 @@ impl TcpAcceptor { storagep as *mut libc::sockaddr, &mut size as *mut libc::socklen_t) as libc::c_int }) as sock_t { - -1 => Err(super::last_error()), - fd => Ok(TcpStream { fd: fd }) + -1 => Err(last_error()), + fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })}) } } } @@ -444,7 +474,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { //////////////////////////////////////////////////////////////////////////////// pub struct UdpSocket { - priv fd: sock_t, + priv inner: UnsafeArc, } impl UdpSocket { @@ -453,25 +483,29 @@ impl UdpSocket { socket(addr, libc::SOCK_DGRAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = UdpSocket { fd: fd }; + let inner = Inner { fd: fd }; + let ret = UdpSocket { inner: UnsafeArc::new(inner) }; match libc::bind(fd, addrp as *libc::sockaddr, len as libc::socklen_t) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // unsafety is fine because it's just a read-only arc + unsafe { (*self.inner.get()).fd } + } pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_BROADCAST, + setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, on as libc::c_int) } pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, on as libc::c_int) } @@ -484,14 +518,14 @@ impl UdpSocket { // interface == INADDR_ANY imr_interface: libc::in_addr { s_addr: 0x0 }, }; - setsockopt(self.fd, libc::IPPROTO_IP, opt, mreq) + setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq) } In6Addr(addr) => { let mreq = libc::ip6_mreq { ipv6mr_multiaddr: addr, ipv6mr_interface: 0, }; - setsockopt(self.fd, libc::IPPROTO_IPV6, opt, mreq) + setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq) } } } @@ -514,14 +548,14 @@ impl rtio::RtioUdpSocket for UdpSocket { let mut addrlen: libc::socklen_t = mem::size_of::() as libc::socklen_t; let ret = retry(|| { - libc::recvfrom(self.fd, + libc::recvfrom(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as msglen_t, 0, storagep as *mut libc::sockaddr, &mut addrlen) as libc::c_int }); - if ret < 0 { return Err(super::last_error()) } + if ret < 0 { return Err(last_error()) } sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { Ok((ret as uint, addr)) }) @@ -532,7 +566,7 @@ impl rtio::RtioUdpSocket for UdpSocket { let dstp = &dst as *libc::sockaddr_storage; unsafe { let ret = retry(|| { - libc::sendto(self.fd, + libc::sendto(self.fd(), buf.as_ptr() as *libc::c_void, buf.len() as msglen_t, 0, @@ -540,7 +574,7 @@ impl rtio::RtioUdpSocket for UdpSocket { len as libc::socklen_t) as libc::c_int }); match ret { - -1 => Err(super::last_error()), + -1 => Err(last_error()), n if n as uint != buf.len() => { Err(io::IoError { kind: io::OtherIoError, @@ -582,11 +616,11 @@ impl rtio::RtioUdpSocket for UdpSocket { } fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, ttl as libc::c_int) } fn time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) } fn hear_broadcasts(&mut self) -> IoResult<()> { @@ -595,8 +629,8 @@ impl rtio::RtioUdpSocket for UdpSocket { fn ignore_broadcasts(&mut self) -> IoResult<()> { self.set_broadcast(false) } -} -impl Drop for UdpSocket { - fn drop(&mut self) { unsafe { close(self.fd) } } + fn clone(&self) -> ~rtio::RtioUdpSocket { + ~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket + } } diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs new file mode 100644 index 0000000000000..9d06593a6eafd --- /dev/null +++ b/src/librustuv/access.rs @@ -0,0 +1,109 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// An exclusive access primitive +/// +/// This primitive is used to gain exclusive access to read() and write() in uv. +/// It is assumed that all invocations of this struct happen on the same thread +/// (the uv event loop). + +use std::cast; +use std::sync::arc::UnsafeArc; +use std::rt::task::{BlockedTask, Task}; +use std::rt::local::Local; + +use homing::HomingMissile; + +pub struct Access { + priv inner: UnsafeArc, +} + +pub struct Guard<'a> { + priv access: &'a mut Access, + priv missile: Option, +} + +struct Inner { + queue: ~[BlockedTask], + held: bool, +} + +impl Access { + pub fn new() -> Access { + Access { + inner: UnsafeArc::new(Inner { + queue: ~[], + held: false, + }) + } + } + + pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> { + // This unsafety is actually OK because the homing missile argument + // guarantees that we're on the same event loop as all the other objects + // attempting to get access granted. + let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) }; + + if inner.held { + let t: ~Task = Local::take(); + t.deschedule(1, |task| { + inner.queue.push(task); + Ok(()) + }); + assert!(inner.held); + } else { + inner.held = true; + } + + Guard { access: self, missile: Some(missile) } + } +} + +impl Clone for Access { + fn clone(&self) -> Access { + Access { inner: self.inner.clone() } + } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + // This guard's homing missile is still armed, so we're guaranteed to be + // on the same I/O event loop, so this unsafety should be ok. + assert!(self.missile.is_some()); + let inner: &mut Inner = unsafe { + cast::transmute(self.access.inner.get()) + }; + + match inner.queue.shift() { + // Here we have found a task that was waiting for access, and we + // current have the "access lock" we need to relinquish access to + // this sleeping task. + // + // To do so, we first drop out homing missile and we then reawaken + // the task. In reawakening the task, it will be immediately + // scheduled on this scheduler. Because we might be woken up on some + // other scheduler, we drop our homing missile before we reawaken + // the task. + Some(task) => { + drop(self.missile.take()); + let _ = task.wake().map(|t| t.reawaken()); + } + None => { inner.held = false; } + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + assert!(!self.held); + assert_eq!(self.queue.len(), 0); + } +} diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs index a2f3457a9430c..25c929c995de7 100644 --- a/src/librustuv/homing.rs +++ b/src/librustuv/homing.rs @@ -125,8 +125,8 @@ pub trait HomingIO { /// After a homing operation has been completed, this will return the current /// task back to its appropriate home (if applicable). The field is used to /// assert that we are where we think we are. -struct HomingMissile { - io_home: uint, +pub struct HomingMissile { + priv io_home: uint, } impl HomingMissile { diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 39d6f851e1722..b463bb7fd733d 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -68,8 +68,10 @@ pub use self::tty::TtyWatcher; mod macros; -mod queue; +mod access; mod homing; +mod queue; +mod rc; /// The implementation of `rtio` for libuv pub mod uvio; diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 5461fc6272d35..7660d2c4f2b3e 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -19,7 +19,9 @@ use std::rt::rtio; use std::rt::task::BlockedTask; use std::unstable::intrinsics; +use access::Access; use homing::{HomingIO, HomeHandle}; +use rc::Refcount; use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, @@ -152,6 +154,14 @@ pub struct TcpWatcher { handle: *uvll::uv_tcp_t, stream: StreamWatcher, home: HomeHandle, + priv refcount: Refcount, + + // libuv can't support concurrent reads and concurrent writes of the same + // stream object, so we use these access guards in order to arbitrate among + // multiple concurrent reads and writes. Note that libuv *can* read and + // write simultaneously, it just can't read and read simultaneously. + priv read_access: Access, + priv write_access: Access, } pub struct TcpListener { @@ -183,6 +193,9 @@ impl TcpWatcher { home: home, handle: handle, stream: StreamWatcher::new(handle), + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), } } @@ -238,12 +251,14 @@ impl rtio::RtioSocket for TcpWatcher { impl rtio::RtioTcpStream for TcpWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.read_access.grant(m); self.stream.read(buf).map_err(uv_error_to_io_error) } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.write_access.grant(m); self.stream.write(buf).map_err(uv_error_to_io_error) } @@ -280,6 +295,17 @@ impl rtio::RtioTcpStream for TcpWatcher { uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint) }) } + + fn clone(&self) -> ~rtio::RtioTcpStream { + ~TcpWatcher { + handle: self.handle, + stream: StreamWatcher::new(self.handle), + home: self.home.clone(), + refcount: self.refcount.clone(), + write_access: self.write_access.clone(), + read_access: self.read_access.clone(), + } as ~rtio::RtioTcpStream + } } impl UvHandle for TcpWatcher { @@ -289,7 +315,9 @@ impl UvHandle for TcpWatcher { impl Drop for TcpWatcher { fn drop(&mut self) { let _m = self.fire_homing_missile(); - self.close(); + if self.refcount.decrement() { + self.close(); + } } } @@ -415,6 +443,11 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { pub struct UdpWatcher { handle: *uvll::uv_udp_t, home: HomeHandle, + + // See above for what these fields are + priv refcount: Refcount, + priv read_access: Access, + priv write_access: Access, } impl UdpWatcher { @@ -423,6 +456,9 @@ impl UdpWatcher { let udp = UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, home: io.make_handle(), + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), }; assert_eq!(unsafe { uvll::uv_udp_init(io.uv_loop(), udp.handle) @@ -463,7 +499,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { buf: Option, result: Option<(ssize_t, Option)>, } - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.read_access.grant(m); let a = match unsafe { uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) @@ -533,7 +570,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> { struct Ctx { task: Option, result: c_int } - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.write_access.grant(m); let mut req = Request::new(uvll::UV_UDP_SEND); let buf = slice_to_uv_buf(buf); @@ -636,13 +674,25 @@ impl rtio::RtioUdpSocket for UdpWatcher { 0 as c_int) }) } + + fn clone(&self) -> ~rtio::RtioUdpSocket { + ~UdpWatcher { + handle: self.handle, + home: self.home.clone(), + refcount: self.refcount.clone(), + write_access: self.write_access.clone(), + read_access: self.read_access.clone(), + } as ~rtio::RtioUdpSocket + } } impl Drop for UdpWatcher { fn drop(&mut self) { // Send ourselves home to close this handle (blocking while doing so). let _m = self.fire_homing_missile(); - self.close(); + if self.refcount.decrement() { + self.close(); + } } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index a021a13e2d98d..c312f112d28b4 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -14,7 +14,9 @@ use std::libc; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; use std::rt::task::BlockedTask; +use access::Access; use homing::{HomingIO, HomeHandle}; +use rc::Refcount; use stream::StreamWatcher; use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, wait_until_woken_after, wakeup}; @@ -25,6 +27,11 @@ pub struct PipeWatcher { stream: StreamWatcher, home: HomeHandle, priv defused: bool, + priv refcount: Refcount, + + // see comments in TcpWatcher for why these exist + priv write_access: Access, + priv read_access: Access, } pub struct PipeListener { @@ -61,6 +68,9 @@ impl PipeWatcher { stream: StreamWatcher::new(handle), home: home, defused: false, + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), } } @@ -118,14 +128,27 @@ impl PipeWatcher { impl RtioPipe for PipeWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.read_access.grant(m); self.stream.read(buf).map_err(uv_error_to_io_error) } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); + let m = self.fire_homing_missile(); + let _g = self.write_access.grant(m); self.stream.write(buf).map_err(uv_error_to_io_error) } + + fn clone(&self) -> ~RtioPipe { + ~PipeWatcher { + stream: StreamWatcher::new(self.stream.handle), + defused: false, + home: self.home.clone(), + refcount: self.refcount.clone(), + read_access: self.read_access.clone(), + write_access: self.write_access.clone(), + } as ~RtioPipe + } } impl HomingIO for PipeWatcher { @@ -138,8 +161,8 @@ impl UvHandle for PipeWatcher { impl Drop for PipeWatcher { fn drop(&mut self) { - if !self.defused { - let _m = self.fire_homing_missile(); + let _m = self.fire_homing_missile(); + if !self.defused && self.refcount.decrement() { self.close(); } } diff --git a/src/librustuv/rc.rs b/src/librustuv/rc.rs new file mode 100644 index 0000000000000..f43cf72236109 --- /dev/null +++ b/src/librustuv/rc.rs @@ -0,0 +1,49 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Simple refcount structure for cloning handles +/// +/// This is meant to be an unintrusive solution to cloning handles in rustuv. +/// The handles themselves shouldn't be sharing memory because there are bits of +/// state in the rust objects which shouldn't be shared across multiple users of +/// the same underlying uv object, hence Rc is not used and this simple counter +/// should suffice. + +use std::sync::arc::UnsafeArc; + +pub struct Refcount { + priv rc: UnsafeArc, +} + +impl Refcount { + /// Creates a new refcount of 1 + pub fn new() -> Refcount { + Refcount { rc: UnsafeArc::new(1) } + } + + fn increment(&self) { + unsafe { *self.rc.get() += 1; } + } + + /// Returns whether the refcount just hit 0 or not + pub fn decrement(&self) -> bool { + unsafe { + *self.rc.get() -= 1; + *self.rc.get() == 0 + } + } +} + +impl Clone for Refcount { + fn clone(&self) -> Refcount { + self.increment(); + Refcount { rc: self.rc.clone() } + } +} diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index a0bdc193d980c..66ceb03082f4b 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -8,11 +8,42 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! TCP network connections +//! +//! This module contains the ability to open a TCP stream to a socket address, +//! as well as creating a socket server to accept incoming connections. The +//! destination and binding addresses can either be an IPv4 or IPv6 address. +//! +//! A TCP connection implements the `Reader` and `Writer` traits, while the TCP +//! listener (socket server) implements the `Listener` and `Acceptor` traits. + +#[deny(missing_doc)]; + +use clone::Clone; use io::net::ip::SocketAddr; -use io::{Reader, Writer, Listener, Acceptor, IoResult}; +use io::{Reader, Writer, Listener, Acceptor}; +use io::IoResult; use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener}; use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; +/// A structure which represents a TCP stream between a local socket and a +/// remote socket. +/// +/// # Example +/// +/// ```rust +/// # #[allow(unused_must_use)]; +/// use std::io::net::tcp::TcpStream; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 }; +/// let mut stream = TcpStream::connect(addr); +/// +/// stream.write([1]); +/// let mut buf = [0]; +/// stream.read(buf); +/// drop(stream); // close the connection +/// ``` pub struct TcpStream { priv obj: ~RtioTcpStream } @@ -22,21 +53,40 @@ impl TcpStream { TcpStream { obj: s } } + /// Creates a TCP connection to a remote socket address. + /// + /// If no error is encountered, then `Ok(stream)` is returned. pub fn connect(addr: SocketAddr) -> IoResult { LocalIo::maybe_raise(|io| { io.tcp_connect(addr).map(TcpStream::new) }) } + /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_name(&mut self) -> IoResult { self.obj.peer_name() } + /// Returns the socket address of the local half of this TCP connection. pub fn socket_name(&mut self) -> IoResult { self.obj.socket_name() } } +impl Clone for TcpStream { + /// Creates a new handle to this TCP stream, allowing for simultaneous reads + /// and writes of this connection. + /// + /// The underlying TCP stream will not be closed until all handles to the + /// stream have been deallocated. All handles will also follow the same + /// stream, but two concurrent reads will not receive the same data. + /// Instead, the first read will receive the first packet received, and the + /// second read will receive the second packet. + fn clone(&self) -> TcpStream { + TcpStream { obj: self.obj.clone() } + } +} + impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.obj.read(buf) } } @@ -45,17 +95,56 @@ impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) } } +/// A structure representing a socket server. This listener is used to create a +/// `TcpAcceptor` which can be used to accept sockets on a local port. +/// +/// # Example +/// +/// ```rust +/// # fn main() {} +/// # fn foo() { +/// # #[allow(unused_must_use, dead_code)]; +/// use std::io::net::tcp::TcpListener; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// use std::io::{Acceptor, Listener}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 }; +/// let listener = TcpListener::bind(addr); +/// +/// // bind the listener to the specified address +/// let mut acceptor = listener.listen(); +/// +/// // accept connections and process them +/// # fn handle_client(_: T) {} +/// for stream in acceptor.incoming() { +/// spawn(proc() { +/// handle_client(stream); +/// }); +/// } +/// +/// // close the socket server +/// drop(acceptor); +/// # } +/// ``` pub struct TcpListener { priv obj: ~RtioTcpListener } impl TcpListener { + /// Creates a new `TcpListener` which will be bound to the specified local + /// socket address. This listener is not ready for accepting connections, + /// `listen` must be called on it before that's possible. + /// + /// Binding with a port number of 0 will request that the OS assigns a port + /// to this listener. The port allocated can be queried via the + /// `socket_name` function. pub fn bind(addr: SocketAddr) -> IoResult { LocalIo::maybe_raise(|io| { io.tcp_bind(addr).map(|l| TcpListener { obj: l }) }) } + /// Returns the local socket address of this listener. pub fn socket_name(&mut self) -> IoResult { self.obj.socket_name() } @@ -67,6 +156,9 @@ impl Listener for TcpListener { } } +/// The accepting half of a TCP socket server. This structure is created through +/// a `TcpListener`'s `listen` method, and this object can be used to accept new +/// `TcpStream` instances. pub struct TcpAcceptor { priv obj: ~RtioTcpAcceptor } @@ -573,4 +665,91 @@ mod test { } let _listener = TcpListener::bind(addr); }) + + iotest!(fn tcp_clone_smoke() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + }) + + iotest!(fn tcp_clone_two_read() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn tcp_clone_two_write() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + }) } + diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 0ef62648afcb7..3c02f56384792 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use clone::Clone; use result::{Ok, Err}; use io::net::ip::SocketAddr; use io::{Reader, Writer, IoResult}; @@ -41,6 +42,19 @@ impl UdpSocket { } } +impl Clone for UdpSocket { + /// Creates a new handle to this UDP socket, allowing for simultaneous reads + /// and writes of the socket. + /// + /// The underlying UDP socket will not be closed until all handles to the + /// socket have been deallocated. Two concurrent reads will not receive the + /// same data. Instead, the first read will receive the first packet + /// received, and the second read will receive the second packet. + fn clone(&self) -> UdpSocket { + UdpSocket { obj: self.obj.clone() } + } +} + pub struct UdpStream { priv socket: UdpSocket, priv connectedTo: SocketAddr @@ -250,4 +264,107 @@ mod test { iotest!(fn socket_name_ip6() { socket_name(next_test_ip6()); }) + + iotest!(fn udp_clone_smoke() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 0]; + assert_eq!(sock2.recvfrom(buf), Ok((1, addr1))); + assert_eq!(buf[0], 1); + sock2.sendto([2], addr1).unwrap(); + }); + + let sock3 = sock1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + p1.recv(); + sock3.sendto([1], addr2).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(sock1.recvfrom(buf), Ok((1, addr2))); + p2.recv(); + }) + + iotest!(fn udp_clone_two_read() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut sock2 = sock2; + sock2.sendto([1], addr1).unwrap(); + p.recv(); + sock2.sendto([2], addr1).unwrap(); + p.recv(); + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + let mut buf = [0, 0]; + sock3.recvfrom(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + sock1.recvfrom(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn udp_clone_two_write() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + let (p, c) = SharedChan::new(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 1]; + + for _ in p.iter() { + match sock2.recvfrom(buf) { + Ok(..) => {} + Err(e) => fail!("failed receive: {}", e), + } + } + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + let c2 = c.clone(); + spawn(proc() { + let mut sock3 = sock3; + match sock3.sendto([1], addr2) { + Ok(..) => c2.send(()), + Err(..) => {} + } + done.send(()); + }); + match sock1.sendto([2], addr2) { + Ok(..) => c.send(()), + Err(..) => {} + } + + p.recv(); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index ce95b987663f7..3c7db9c868618 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -25,6 +25,7 @@ instances as clients. use prelude::*; use c_str::ToCStr; +use clone::Clone; use rt::rtio::{IoFactory, LocalIo, RtioUnixListener}; use rt::rtio::{RtioUnixAcceptor, RtioPipe}; use io::pipe::PipeStream; @@ -62,6 +63,12 @@ impl UnixStream { } } +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream { obj: self.obj.clone() } + } +} + impl Reader for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.obj.read(buf) } } @@ -228,4 +235,93 @@ mod tests { let _acceptor = UnixListener::bind(&path).listen(); assert!(path.exists()); } + + #[test] + fn unix_clone_smoke() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + } + + #[test] + fn unix_clone_two_read() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + } + + #[test] + fn unix_clone_two_write() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + } } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index ca85707149b92..83250bdae7361 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -51,6 +51,12 @@ impl PipeStream { } } +impl Clone for PipeStream { + fn clone(&self) -> PipeStream { + PipeStream { obj: self.obj.clone() } + } +} + impl Reader for PipeStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.obj.read(buf) } } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 11a7b5dd19171..057d618f44490 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -960,6 +960,8 @@ pub mod types { } pub mod extra { use ptr; + use libc::consts::os::extra::{MAX_PROTOCOL_CHAIN, + WSAPROTOCOL_LEN}; use libc::types::common::c95::c_void; use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t}; use libc::types::os::arch::c95::{c_long, c_ulong}; @@ -1106,6 +1108,47 @@ pub mod types { } pub type LPFILETIME = *mut FILETIME; + + pub struct GUID { + Data1: DWORD, + Data2: DWORD, + Data3: DWORD, + Data4: [BYTE, ..8], + } + + struct WSAPROTOCOLCHAIN { + ChainLen: c_int, + ChainEntries: [DWORD, ..MAX_PROTOCOL_CHAIN], + } + + pub type LPWSAPROTOCOLCHAIN = *mut WSAPROTOCOLCHAIN; + + pub struct WSAPROTOCOL_INFO { + dwServiceFlags1: DWORD, + dwServiceFlags2: DWORD, + dwServiceFlags3: DWORD, + dwServiceFlags4: DWORD, + dwProviderFlags: DWORD, + ProviderId: GUID, + dwCatalogEntryId: DWORD, + ProtocolChain: WSAPROTOCOLCHAIN, + iVersion: c_int, + iAddressFamily: c_int, + iMaxSockAddr: c_int, + iMinSockAddr: c_int, + iSocketType: c_int, + iProtocol: c_int, + iProtocolMaxOffset: c_int, + iNetworkByteOrder: c_int, + iSecurityScheme: c_int, + dwMessageSize: DWORD, + dwProviderReserved: DWORD, + szProtocol: [u8, ..WSAPROTOCOL_LEN+1], + } + + pub type LPWSAPROTOCOL_INFO = *mut WSAPROTOCOL_INFO; + + pub type GROUP = c_uint; } } } @@ -1721,6 +1764,10 @@ pub mod consts { pub static FILE_BEGIN: DWORD = 0; pub static FILE_CURRENT: DWORD = 1; pub static FILE_END: DWORD = 2; + + pub static MAX_PROTOCOL_CHAIN: DWORD = 7; + pub static WSAPROTOCOL_LEN: DWORD = 255; + pub static INVALID_SOCKET: DWORD = !0; } pub mod sysconf { } @@ -4098,6 +4145,8 @@ pub mod funcs { lpFrequency: *mut LARGE_INTEGER) -> BOOL; pub fn QueryPerformanceCounter( lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL; + + pub fn GetCurrentProcessId() -> DWORD; } } diff --git a/src/libstd/option.rs b/src/libstd/option.rs index 39b516aeb12a7..7bb29fdfacf65 100644 --- a/src/libstd/option.rs +++ b/src/libstd/option.rs @@ -480,7 +480,6 @@ mod tests { use iter::range; use str::StrSlice; - use util; use kinds::marker; use vec::ImmutableVector; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 35b1e21df0677..8d02048d55cf0 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -203,6 +203,7 @@ pub trait RtioTcpStream : RtioSocket { fn nodelay(&mut self) -> Result<(), IoError>; fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>; fn letdie(&mut self) -> Result<(), IoError>; + fn clone(&self) -> ~RtioTcpStream; } pub trait RtioSocket { @@ -224,6 +225,8 @@ pub trait RtioUdpSocket : RtioSocket { fn hear_broadcasts(&mut self) -> Result<(), IoError>; fn ignore_broadcasts(&mut self) -> Result<(), IoError>; + + fn clone(&self) -> ~RtioUdpSocket; } pub trait RtioTimer { @@ -253,6 +256,7 @@ pub trait RtioProcess { pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> Result; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; + fn clone(&self) -> ~RtioPipe; } pub trait RtioUnixListener { diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 4804de756876f..82957cd93ceb8 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -380,7 +380,6 @@ mod test { use super::{Mutex, MUTEX_INIT}; use rt::thread::Thread; - use task; #[test] fn somke_lock() { diff --git a/src/libstd/util.rs b/src/libstd/util.rs index c075f9b4ba84f..715a10b9112f9 100644 --- a/src/libstd/util.rs +++ b/src/libstd/util.rs @@ -69,7 +69,6 @@ impl Void { mod tests { use super::*; use prelude::*; - use mem::size_of; #[test] fn identity_crisis() { diff --git a/src/libstd/vec.rs b/src/libstd/vec.rs index 4a6a4d54ae3e4..d53c2dceba248 100644 --- a/src/libstd/vec.rs +++ b/src/libstd/vec.rs @@ -4253,7 +4253,7 @@ mod tests { let h = x.mut_last(); assert_eq!(*h.unwrap(), 5); - let mut y: &mut [int] = []; + let y: &mut [int] = []; assert!(y.mut_last().is_none()); } }