diff --git a/src/libcore/core.rc b/src/libcore/core.rc index 96b5e1b781de4..eb94e9ca02869 100644 --- a/src/libcore/core.rc +++ b/src/libcore/core.rc @@ -205,8 +205,11 @@ mod unicode; #[path = "num/cmath.rs"] mod cmath; mod stackwalk; + +// XXX: This shouldn't be pub, and it should be reexported under 'unstable' +// but name resolution doesn't work without it being pub. #[path = "rt/mod.rs"] -mod rt; +pub mod rt; // A curious inner-module that's not exported that contains the binding // 'core' so that macro-expanded references to core::error and such diff --git a/src/libcore/logging.rs b/src/libcore/logging.rs index cea827298af06..b192333999ac4 100644 --- a/src/libcore/logging.rs +++ b/src/libcore/logging.rs @@ -10,17 +10,16 @@ //! Logging -pub mod rustrt { - use libc; - - pub extern { - unsafe fn rust_log_console_on(); - unsafe fn rust_log_console_off(); - unsafe fn rust_log_str(level: u32, - string: *libc::c_char, - size: libc::size_t); - } -} +use option::*; +use either::*; +use rt; +use rt::logging::{Logger, StdErrLogger}; +use io; +use libc; +use repr; +use vec; +use cast; +use str; /// Turns on logging to stdout globally pub fn console_on() { @@ -55,8 +54,46 @@ pub fn log_type(level: u32, object: &T) { let bytes = do io::with_bytes_writer |writer| { repr::write_repr(writer, object); }; + + match rt::context() { + rt::OldTaskContext => { + unsafe { + let len = bytes.len() as libc::size_t; + rustrt::rust_log_str(level, cast::transmute(vec::raw::to_ptr(bytes)), len); + } + } + _ => { + // XXX: Bad allocation + let msg = str::from_bytes(bytes); + newsched_log_str(msg); + } + } +} + +fn newsched_log_str(msg: ~str) { unsafe { - let len = bytes.len() as libc::size_t; - rustrt::rust_log_str(level, transmute(vec::raw::to_ptr(bytes)), len); + match rt::local_services::unsafe_try_borrow_local_services() { + Some(local) => { + // Use the available logger + (*local).logger.log(Left(msg)); + } + None => { + // There is no logger anywhere, just write to stderr + let mut logger = StdErrLogger; + logger.log(Left(msg)); + } + } + } +} + +pub mod rustrt { + use libc; + + pub extern { + unsafe fn rust_log_console_on(); + unsafe fn rust_log_console_off(); + unsafe fn rust_log_str(level: u32, + string: *libc::c_char, + size: libc::size_t); } } diff --git a/src/libcore/macros.rs b/src/libcore/macros.rs index b19a753b71577..fda48b6ffb7d9 100644 --- a/src/libcore/macros.rs +++ b/src/libcore/macros.rs @@ -30,10 +30,24 @@ macro_rules! rtdebug ( ($( $arg:expr),+) => ( $(let _ = $arg)*; ) ) +macro_rules! rtassert ( + ( $arg:expr ) => ( { + if !$arg { + abort!("assertion failed: %s", stringify!($arg)); + } + } ) +) + macro_rules! abort( ($( $msg:expr),+) => ( { rtdebug!($($msg),+); - unsafe { ::libc::abort(); } + do_abort(); + + // NB: This is in a fn to avoid putting the `unsafe` block in a macro, + // which causes spurious 'unnecessary unsafe block' warnings. + fn do_abort() -> ! { + unsafe { ::libc::abort(); } + } } ) ) diff --git a/src/libcore/os.rs b/src/libcore/os.rs index 9129b33fff545..93319efa3b761 100644 --- a/src/libcore/os.rs +++ b/src/libcore/os.rs @@ -722,7 +722,7 @@ pub fn list_dir(p: &Path) -> ~[~str] { use os::win32::{ as_utf16_p }; - use unstable::exchange_alloc::{malloc_raw, free_raw}; + use rt::global_heap::{malloc_raw, free_raw}; #[nolink] extern { unsafe fn rust_list_dir_wfd_size() -> libc::size_t; diff --git a/src/libcore/rt/context.rs b/src/libcore/rt/context.rs index 9c1e566f218f6..9c1612884f044 100644 --- a/src/libcore/rt/context.rs +++ b/src/libcore/rt/context.rs @@ -111,9 +111,9 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: let sp = align_down(sp); let sp = mut_offset(sp, -4); - unsafe { *sp = arg as uint; } + unsafe { *sp = arg as uint }; let sp = mut_offset(sp, -1); - unsafe { *sp = 0; } // The final return address + unsafe { *sp = 0 }; // The final return address regs.esp = sp as u32; regs.eip = fptr as u32; @@ -195,7 +195,7 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: fn align_down(sp: *mut uint) -> *mut uint { unsafe { - let sp = transmute::<*mut uint, uint>(sp); + let sp: uint = transmute(sp); let sp = sp & !(16 - 1); transmute::(sp) } diff --git a/src/libcore/unstable/exchange_alloc.rs b/src/libcore/rt/global_heap.rs similarity index 100% rename from src/libcore/unstable/exchange_alloc.rs rename to src/libcore/rt/global_heap.rs diff --git a/src/libcore/rt/io/file.rs b/src/libcore/rt/io/file.rs index 85dc180452ffc..1f61cf25fbdd4 100644 --- a/src/libcore/rt/io/file.rs +++ b/src/libcore/rt/io/file.rs @@ -10,7 +10,7 @@ use prelude::*; use super::support::PathLike; -use super::{Reader, Writer, Seek, Close}; +use super::{Reader, Writer, Seek}; use super::SeekStyle; /// # XXX @@ -69,10 +69,6 @@ impl Seek for FileStream { fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() } } -impl Close for FileStream { - fn close(&mut self) { fail!() } -} - #[test] #[ignore] fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() { diff --git a/src/libcore/rt/io/mod.rs b/src/libcore/rt/io/mod.rs index fea32bc5b7509..8f56005d0a4f6 100644 --- a/src/libcore/rt/io/mod.rs +++ b/src/libcore/rt/io/mod.rs @@ -238,6 +238,7 @@ Out of scope * How does I/O relate to the Iterator trait? * std::base64 filters * Using conditions is a big unknown since we don't have much experience with them +* Too many uses of OtherIoError */ @@ -252,7 +253,9 @@ pub use self::stdio::println; pub use self::file::FileStream; pub use self::net::ip::IpAddr; +#[cfg(not(stage0))] pub use self::net::tcp::TcpListener; +#[cfg(not(stage0))] pub use self::net::tcp::TcpStream; pub use self::net::udp::UdpStream; @@ -266,6 +269,7 @@ pub mod file; /// Synchronous, non-blocking network I/O. pub mod net { + #[cfg(not(stage0))] pub mod tcp; pub mod udp; pub mod ip; @@ -326,12 +330,14 @@ pub struct IoError { #[deriving(Eq)] pub enum IoErrorKind { + PreviousIoError, + OtherIoError, + EndOfFile, FileNotFound, - FilePermission, + PermissionDenied, ConnectionFailed, Closed, - OtherIoError, - PreviousIoError + ConnectionRefused, } // XXX: Can't put doc comments on macros @@ -383,16 +389,7 @@ pub trait Writer { fn flush(&mut self); } -/// I/O types that may be closed -/// -/// Any further operations performed on a closed resource will raise -/// on `io_error` -pub trait Close { - /// Close the I/O resource - fn close(&mut self); -} - -pub trait Stream: Reader + Writer + Close { } +pub trait Stream: Reader + Writer { } pub enum SeekStyle { /// Seek from the beginning of the stream diff --git a/src/libcore/rt/io/native/file.rs b/src/libcore/rt/io/native/file.rs index e203df815f2f4..31c90336a24c2 100644 --- a/src/libcore/rt/io/native/file.rs +++ b/src/libcore/rt/io/native/file.rs @@ -40,10 +40,6 @@ impl Writer for FileDesc { fn flush(&mut self) { fail!() } } -impl Close for FileDesc { - fn close(&mut self) { fail!() } -} - impl Seek for FileDesc { fn tell(&self) -> u64 { fail!() } @@ -72,10 +68,6 @@ impl Writer for CFile { fn flush(&mut self) { fail!() } } -impl Close for CFile { - fn close(&mut self) { fail!() } -} - impl Seek for CFile { fn tell(&self) -> u64 { fail!() } fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() } diff --git a/src/libcore/rt/io/net/tcp.rs b/src/libcore/rt/io/net/tcp.rs index c95b4344fe75d..addba40b31ebc 100644 --- a/src/libcore/rt/io/net/tcp.rs +++ b/src/libcore/rt/io/net/tcp.rs @@ -8,67 +8,273 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use prelude::*; -use super::super::*; -use super::ip::IpAddr; +use option::{Option, Some, None}; +use result::{Ok, Err}; +use rt::sched::local_sched::unsafe_borrow_io; +use rt::io::net::ip::IpAddr; +use rt::io::{Reader, Writer, Listener}; +use rt::io::io_error; +use rt::rtio::{IoFactory, + RtioTcpListener, RtioTcpListenerObject, + RtioTcpStream, RtioTcpStreamObject}; -pub struct TcpStream; +pub struct TcpStream { + rtstream: ~RtioTcpStreamObject +} impl TcpStream { - pub fn connect(_addr: IpAddr) -> Option { - fail!() + fn new(s: ~RtioTcpStreamObject) -> TcpStream { + TcpStream { + rtstream: s + } + } + + pub fn connect(addr: IpAddr) -> Option { + let stream = unsafe { + rtdebug!("borrowing io to connect"); + let io = unsafe_borrow_io(); + rtdebug!("about to connect"); + (*io).tcp_connect(addr) + }; + + match stream { + Ok(s) => { + Some(TcpStream::new(s)) + } + Err(ioerr) => { + rtdebug!("failed to connect: %?", ioerr); + io_error::cond.raise(ioerr); + return None; + } + } } } impl Reader for TcpStream { - fn read(&mut self, _buf: &mut [u8]) -> Option { fail!() } + fn read(&mut self, buf: &mut [u8]) -> Option { + let bytes_read = self.rtstream.read(buf); + match bytes_read { + Ok(read) => Some(read), + Err(_) => { + abort!("XXX"); + } + } + } fn eof(&mut self) -> bool { fail!() } } impl Writer for TcpStream { - fn write(&mut self, _buf: &[u8]) { fail!() } + fn write(&mut self, buf: &[u8]) { + let res = self.rtstream.write(buf); + match res { + Ok(_) => (), + Err(_) => { + abort!("XXX"); + } + } + } fn flush(&mut self) { fail!() } } -impl Close for TcpStream { - fn close(&mut self) { fail!() } +pub struct TcpListener { + rtlistener: ~RtioTcpListenerObject, } -pub struct TcpListener; - impl TcpListener { - pub fn bind(_addr: IpAddr) -> Option { - fail!() + pub fn bind(addr: IpAddr) -> Option { + let listener = unsafe { (*unsafe_borrow_io()).tcp_bind(addr) }; + match listener { + Ok(l) => { + Some(TcpListener { + rtlistener: l + }) + } + Err(ioerr) => { + io_error::cond.raise(ioerr); + return None; + } + } } } impl Listener for TcpListener { - fn accept(&mut self) -> Option { fail!() } + fn accept(&mut self) -> Option { + let rtstream = self.rtlistener.accept(); + match rtstream { + Ok(s) => { + Some(TcpStream::new(s)) + } + Err(_) => { + abort!("XXX"); + } + } + } } #[cfg(test)] mod test { + use super::*; + use int; + use cell::Cell; + use rt::test::*; + use rt::io::net::ip::Ipv4; + use rt::io::*; #[test] #[ignore] + fn bind_error() { + do run_in_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert!(e.kind == PermissionDenied); + called = true; + }).in { + let addr = Ipv4(0, 0, 0, 0, 1); + let listener = TcpListener::bind(addr); + assert!(listener.is_none()); + } + assert!(called); + } + } + + #[test] + fn connect_error() { + do run_in_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert!(e.kind == ConnectionRefused); + called = true; + }).in { + let addr = Ipv4(0, 0, 0, 0, 1); + let stream = TcpStream::connect(addr); + assert!(stream.is_none()); + } + assert!(called); + } + } + + #[test] fn smoke_test() { - /*do run_in_newsched_task { + do run_in_newsched_task { let addr = next_test_ip4(); - do spawn_immediately { - let listener = TcpListener::bind(addr); - do listener.accept() { + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + let mut stream = listener.accept(); + let mut buf = [0]; + stream.read(buf); + assert!(buf[0] == 99); + } + + do spawntask_immediately { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + } + } + } + + #[test] + fn multiple_connect_serial() { + do run_in_newsched_task { + let addr = next_test_ip4(); + let max = 10; + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + for max.times { + let mut stream = listener.accept(); let mut buf = [0]; - listener.read(buf); + stream.read(buf); assert!(buf[0] == 99); } } - do spawn_immediately { - let stream = TcpStream::connect(addr); - stream.write([99]); + do spawntask_immediately { + for max.times { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + } } - }*/ + } } + + #[test] + fn multiple_connect_interleaved_greedy_schedule() { + do run_in_newsched_task { + let addr = next_test_ip4(); + static MAX: int = 10; + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + for int::range(0, MAX) |i| { + let stream = Cell(listener.accept()); + rtdebug!("accepted"); + // Start another task to handle the connection + do spawntask_immediately { + let mut stream = stream.take(); + let mut buf = [0]; + stream.read(buf); + assert!(buf[0] == i as u8); + rtdebug!("read"); + } + } + } + + connect(0, addr); + + fn connect(i: int, addr: IpAddr) { + if i == MAX { return } + + do spawntask_immediately { + rtdebug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + rtdebug!("writing"); + stream.write([i as u8]); + } + } + } + } + + #[test] #[ignore(reason = "hangs on mac")] + fn multiple_connect_interleaved_lazy_schedule() { + do run_in_newsched_task { + let addr = next_test_ip4(); + static MAX: int = 10; + + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + for int::range(0, MAX) |_| { + let stream = Cell(listener.accept()); + rtdebug!("accepted"); + // Start another task to handle the connection + do spawntask_later { + let mut stream = stream.take(); + let mut buf = [0]; + stream.read(buf); + assert!(buf[0] == 99); + rtdebug!("read"); + } + } + } + + connect(0, addr); + + fn connect(i: int, addr: IpAddr) { + if i == MAX { return } + + do spawntask_later { + rtdebug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + rtdebug!("writing"); + stream.write([99]); + } + } + } + } + } diff --git a/src/libcore/rt/io/net/udp.rs b/src/libcore/rt/io/net/udp.rs index 1f1254a7029f0..bb5457e334dda 100644 --- a/src/libcore/rt/io/net/udp.rs +++ b/src/libcore/rt/io/net/udp.rs @@ -32,10 +32,6 @@ impl Writer for UdpStream { fn flush(&mut self) { fail!() } } -impl Close for UdpStream { - fn close(&mut self) { fail!() } -} - pub struct UdpListener; impl UdpListener { diff --git a/src/libcore/rt/io/net/unix.rs b/src/libcore/rt/io/net/unix.rs index f449a857467cc..b85b7dd059d82 100644 --- a/src/libcore/rt/io/net/unix.rs +++ b/src/libcore/rt/io/net/unix.rs @@ -32,10 +32,6 @@ impl Writer for UnixStream { fn flush(&mut self) { fail!() } } -impl Close for UnixStream { - fn close(&mut self) { fail!() } -} - pub struct UnixListener; impl UnixListener { diff --git a/src/libcore/rt/io/stdio.rs b/src/libcore/rt/io/stdio.rs index 26950986f7a09..247fe9544088b 100644 --- a/src/libcore/rt/io/stdio.rs +++ b/src/libcore/rt/io/stdio.rs @@ -9,7 +9,7 @@ // except according to those terms. use prelude::*; -use super::{Reader, Writer, Close}; +use super::{Reader, Writer}; pub fn stdin() -> StdReader { fail!() } @@ -39,10 +39,6 @@ impl Reader for StdReader { fn eof(&mut self) -> bool { fail!() } } -impl Close for StdReader { - fn close(&mut self) { fail!() } -} - pub struct StdWriter; impl StdWriter { @@ -55,6 +51,3 @@ impl Writer for StdWriter { fn flush(&mut self) { fail!() } } -impl Close for StdWriter { - fn close(&mut self) { fail!() } -} diff --git a/src/libcore/rt/sched/local_sched.rs b/src/libcore/rt/local_sched.rs similarity index 74% rename from src/libcore/rt/sched/local_sched.rs rename to src/libcore/rt/local_sched.rs index a7e02f30e0167..eb35eb7881d39 100644 --- a/src/libcore/rt/sched/local_sched.rs +++ b/src/libcore/rt/local_sched.rs @@ -13,18 +13,21 @@ use prelude::*; use ptr::mut_null; use libc::c_void; -use cast::transmute; +use cast; +use cell::Cell; -use super::Scheduler; -use super::super::rtio::IoFactoryObject; -use tls = super::super::thread_local_storage; -#[cfg(test)] use super::super::uvio::UvEventLoop; +use rt::sched::Scheduler; +use rt::rtio::{EventLoop, IoFactoryObject}; +use tls = rt::thread_local_storage; +use unstable::finally::Finally; + +#[cfg(test)] use rt::uv::uvio::UvEventLoop; /// Give the Scheduler to thread-local storage pub fn put(sched: ~Scheduler) { unsafe { let key = tls_key(); - let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched); + let void_sched: *mut c_void = cast::transmute(sched); tls::set(key, void_sched); } } @@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler { unsafe { let key = tls_key(); let void_sched: *mut c_void = tls::get(key); - assert!(void_sched.is_not_null()); - let sched = transmute::<*mut c_void, ~Scheduler>(void_sched); + rtassert!(void_sched.is_not_null()); + let sched: ~Scheduler = cast::transmute(void_sched); tls::set(key, mut_null()); return sched; } @@ -55,8 +58,18 @@ pub fn exists() -> bool { /// While the scheduler is borrowed it is not available in TLS. pub fn borrow(f: &fn(&mut Scheduler)) { let mut sched = take(); - f(sched); - put(sched); + + // XXX: Need a different abstraction from 'finally' here to avoid unsafety + unsafe { + let unsafe_sched = cast::transmute_mut_region(&mut *sched); + let sched = Cell(sched); + + do (|| { + f(unsafe_sched); + }).finally { + put(sched.take()); + } + } } /// Borrow a mutable reference to the thread-local Scheduler @@ -65,23 +78,22 @@ pub fn borrow(f: &fn(&mut Scheduler)) { /// /// Because this leaves the Scheduler in thread-local storage it is possible /// For the Scheduler pointer to be aliased -pub unsafe fn unsafe_borrow() -> &mut Scheduler { +pub unsafe fn unsafe_borrow() -> *mut Scheduler { let key = tls_key(); let mut void_sched: *mut c_void = tls::get(key); - assert!(void_sched.is_not_null()); + rtassert!(void_sched.is_not_null()); { - let void_sched_ptr = &mut void_sched; - let sched: &mut ~Scheduler = { - transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) - }; - let sched: &mut Scheduler = &mut **sched; + let sched: *mut *mut c_void = &mut void_sched; + let sched: *mut ~Scheduler = sched as *mut ~Scheduler; + let sched: *mut Scheduler = &mut **sched; return sched; } } -pub unsafe fn unsafe_borrow_io() -> &mut IoFactoryObject { +pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject { let sched = unsafe_borrow(); - return sched.event_loop.io().unwrap(); + let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap(); + return io; } fn tls_key() -> tls::Key { @@ -91,7 +103,7 @@ fn tls_key() -> tls::Key { fn maybe_tls_key() -> Option { unsafe { let key: *mut c_void = rust_get_sched_tls_key(); - let key: &mut tls::Key = transmute(key); + let key: &mut tls::Key = cast::transmute(key); let key = *key; // Check that the key has been initialized. @@ -105,7 +117,7 @@ fn maybe_tls_key() -> Option { // another thread. I think this is fine since the only action // they could take if it was initialized would be to check the // thread-local value and see that it's not set. - if key != 0 { + if key != -1 { return Some(key); } else { return None; diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs index bc945707e624f..98bfc2fa1686f 100644 --- a/src/libcore/rt/local_services.rs +++ b/src/libcore/rt/local_services.rs @@ -23,19 +23,19 @@ use libc::{c_void, uintptr_t}; use cast::transmute; use super::sched::local_sched; use super::local_heap::LocalHeap; +use rt::logging::StdErrLogger; pub struct LocalServices { heap: LocalHeap, gc: GarbageCollector, storage: LocalStorage, - logger: Logger, + logger: StdErrLogger, unwinder: Option, destroyed: bool } pub struct GarbageCollector; pub struct LocalStorage(*c_void, Option<~fn(*c_void)>); -pub struct Logger; pub struct Unwinder { unwinding: bool, @@ -47,7 +47,7 @@ impl LocalServices { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), - logger: Logger, + logger: StdErrLogger, unwinder: Some(Unwinder { unwinding: false }), destroyed: false } @@ -58,7 +58,7 @@ impl LocalServices { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), - logger: Logger, + logger: StdErrLogger, unwinder: None, destroyed: false } @@ -169,19 +169,27 @@ pub fn borrow_local_services(f: &fn(&mut LocalServices)) { } } -pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices { - use cast::transmute_mut_region; - - match local_sched::unsafe_borrow().current_task { +pub unsafe fn unsafe_borrow_local_services() -> *mut LocalServices { + match (*local_sched::unsafe_borrow()).current_task { Some(~ref mut task) => { - transmute_mut_region(&mut task.local_services) + let s: *mut LocalServices = &mut task.local_services; + return s; } None => { - fail!("no local services for schedulers yet") + // Don't fail. Infinite recursion + abort!("no local services for schedulers yet") } } } +pub unsafe fn unsafe_try_borrow_local_services() -> Option<*mut LocalServices> { + if local_sched::exists() { + Some(unsafe_borrow_local_services()) + } else { + None + } +} + #[cfg(test)] mod test { use rt::test::*; @@ -229,4 +237,12 @@ mod test { let _ = r.next(); } } + + #[test] + fn logging() { + do run_in_newsched_task() { + info!("here i am. logging in a newsched task"); + } + } } + diff --git a/src/libcore/rt/logging.rs b/src/libcore/rt/logging.rs new file mode 100644 index 0000000000000..4ed09fd829f22 --- /dev/null +++ b/src/libcore/rt/logging.rs @@ -0,0 +1,38 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use either::*; + +pub trait Logger { + fn log(&mut self, msg: Either<~str, &'static str>); +} + +pub struct StdErrLogger; + +impl Logger for StdErrLogger { + fn log(&mut self, msg: Either<~str, &'static str>) { + use io::{Writer, WriterUtil}; + + let s: &str = match msg { + Left(ref s) => { + let s: &str = *s; + s + } + Right(ref s) => { + let s: &str = *s; + s + } + }; + let dbg = ::libc::STDERR_FILENO as ::io::fd_t; + dbg.write_str(s); + dbg.write_str("\n"); + dbg.flush(); + } +} \ No newline at end of file diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index fbbc82743407c..f04c38f79e800 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -8,40 +8,143 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -/*! The Rust runtime, including the scheduler and I/O interface */ +/*! The Rust Runtime, including the task scheduler and I/O + +The `rt` module provides the private runtime infrastructure necessary +to support core language features like the exchange and local heap, +the garbage collector, logging, local data and unwinding. It also +implements the default task scheduler and task model. Initialization +routines are provided for setting up runtime resources in common +configurations, including that used by `rustc` when generating +executables. + +It is intended that the features provided by `rt` can be factored in a +way such that the core library can be built with different 'profiles' +for different use cases, e.g. excluding the task scheduler. A number +of runtime features though are critical to the functioning of the +language and an implementation must be provided regardless of the +execution environment. + +Of foremost importance is the global exchange heap, in the module +`global_heap`. Very little practical Rust code can be written without +access to the global heap. Unlike most of `rt` the global heap is +truly a global resource and generally operates independently of the +rest of the runtime. + +All other runtime features are 'local', either thread-local or +task-local. Those critical to the functioning of the language are +defined in the module `local_services`. Local services are those which +are expected to be available to Rust code generally but rely on +thread- or task-local state. These currently include the local heap, +the garbage collector, local storage, logging and the stack unwinder. +Local services are primarily implemented for tasks, but may also +be implemented for use outside of tasks. + +The relationship between `rt` and the rest of the core library is +not entirely clear yet and some modules will be moving into or +out of `rt` as development proceeds. + +Several modules in `core` are clients of `rt`: + +* `core::task` - The user-facing interface to the Rust task model. +* `core::task::local_data` - The interface to local data. +* `core::gc` - The garbage collector. +* `core::unstable::lang` - Miscellaneous lang items, some of which rely on `core::rt`. +* `core::condition` - Uses local data. +* `core::cleanup` - Local heap destruction. +* `core::io` - In the future `core::io` will use an `rt` implementation. +* `core::logging` +* `core::pipes` +* `core::comm` +* `core::stackwalk` + +*/ #[doc(hidden)]; use libc::c_char; use ptr::Ptr; -#[path = "sched/mod.rs"] +/// The global (exchange) heap. +pub mod global_heap; + +/// The Scheduler and Task types. mod sched; + +/// Thread-local access to the current Scheduler. +pub mod local_sched; + +/// Synchronous I/O. +#[path = "io/mod.rs"] +pub mod io; + +/// Thread-local implementations of language-critical runtime features like @. +pub mod local_services; + +/// The EventLoop and internal synchronous I/O interface. mod rtio; -pub mod uvll; -mod uvio; + +/// libuv and default rtio implementation. #[path = "uv/mod.rs"] -mod uv; -#[path = "io/mod.rs"] -mod io; +pub mod uv; + // FIXME #5248: The import in `sched` doesn't resolve unless this is pub! +/// Bindings to pthread/windows thread-local storage. pub mod thread_local_storage; + +/// A parallel work-stealing dequeue. mod work_queue; + +/// Stack segments and caching. mod stack; + +/// CPU context swapping. mod context; + +/// Bindings to system threading libraries. mod thread; + +/// The runtime configuration, read from environment variables pub mod env; -pub mod local_services; + +/// The local, managed heap mod local_heap; +/// The Logger trait and implementations +pub mod logging; + /// Tools for testing the runtime #[cfg(test)] pub mod test; +/// Reference counting +pub mod rc; + +/// A simple single-threaded channel type for passing buffered data between +/// scheduler and task context +pub mod tube; + +/// Set up a default runtime configuration, given compiler-supplied arguments. +/// +/// This is invoked by the `start` _language item_ (unstable::lang) to +/// run a Rust executable. +/// +/// # Arguments +/// +/// * `main` - A C-abi function that takes no arguments and returns `c_void`. +/// It is a wrapper around the user-defined `main` function, and will be run +/// in a task. +/// * `argc` & `argv` - The argument vector. On Unix this information is used +/// by os::args. +/// * `crate_map` - Runtime information about the executing crate, mostly for logging +/// +/// # Return value +/// +/// The return value is used as the process return code. 0 on success, 101 on error. pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int { use self::sched::{Scheduler, Task}; - use self::uvio::UvEventLoop; + use self::uv::uvio::UvEventLoop; use sys::Closure; use ptr; use cast; @@ -72,6 +175,8 @@ pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int { /// Possible contexts in which Rust code may be executing. /// Different runtime services are available depending on context. +/// Mostly used for determining if we're using the new scheduler +/// or the old scheduler. #[deriving(Eq)] pub enum RuntimeContext { // Only the exchange heap is available @@ -84,6 +189,7 @@ pub enum RuntimeContext { OldTaskContext } +/// Determine the current RuntimeContext pub fn context() -> RuntimeContext { use task::rt::rust_task; @@ -119,7 +225,7 @@ pub fn context() -> RuntimeContext { fn test_context() { use unstable::run_in_bare_thread; use self::sched::{local_sched, Task}; - use self::uvio::UvEventLoop; + use rt::uv::uvio::UvEventLoop; use cell::Cell; assert!(context() == OldTaskContext); diff --git a/src/libcore/rt/rc.rs b/src/libcore/rt/rc.rs new file mode 100644 index 0000000000000..1c0c8c14fdfa6 --- /dev/null +++ b/src/libcore/rt/rc.rs @@ -0,0 +1,142 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! An owned, task-local, reference counted type +//! +//! # Safety note +//! +//! XXX There is currently no type-system mechanism for enforcing that +//! reference counted types are both allocated on the exchange heap +//! and also non-sendable +//! +//! This doesn't prevent borrowing multiple aliasable mutable pointers + +use ops::Drop; +use clone::Clone; +use libc::c_void; +use cast; + +pub struct RC { + p: *c_void // ~(uint, T) +} + +impl RC { + pub fn new(val: T) -> RC { + unsafe { + let v = ~(1, val); + let p: *c_void = cast::transmute(v); + RC { p: p } + } + } + + fn get_mut_state(&mut self) -> *mut (uint, T) { + unsafe { + let p: &mut ~(uint, T) = cast::transmute(&mut self.p); + let p: *mut (uint, T) = &mut **p; + return p; + } + } + + fn get_state(&self) -> *(uint, T) { + unsafe { + let p: &~(uint, T) = cast::transmute(&self.p); + let p: *(uint, T) = &**p; + return p; + } + } + + pub fn unsafe_borrow_mut(&mut self) -> *mut T { + unsafe { + match *self.get_mut_state() { + (_, ref mut p) => { + let p: *mut T = p; + return p; + } + } + } + } + + pub fn refcount(&self) -> uint { + unsafe { + match *self.get_state() { + (count, _) => count + } + } + } +} + +#[unsafe_destructor] +impl Drop for RC { + fn finalize(&self) { + assert!(self.refcount() > 0); + + unsafe { + // XXX: Mutable finalizer + let this: &mut RC = cast::transmute_mut(self); + + match *this.get_mut_state() { + (ref mut count, _) => { + *count = *count - 1 + } + } + + if this.refcount() == 0 { + let _: ~(uint, T) = cast::transmute(this.p); + } + } + } +} + +impl Clone for RC { + fn clone(&self) -> RC { + unsafe { + // XXX: Mutable clone + let this: &mut RC = cast::transmute_mut(self); + + match *this.get_mut_state() { + (ref mut count, _) => { + *count = *count + 1; + } + } + } + + RC { p: self.p } + } +} + +#[cfg(test)] +mod test { + use super::RC; + + #[test] + fn smoke_test() { + unsafe { + let mut v1 = RC::new(100); + assert!(*v1.unsafe_borrow_mut() == 100); + assert!(v1.refcount() == 1); + + let mut v2 = v1.clone(); + assert!(*v2.unsafe_borrow_mut() == 100); + assert!(v2.refcount() == 2); + + *v2.unsafe_borrow_mut() = 200; + assert!(*v2.unsafe_borrow_mut() == 200); + assert!(*v1.unsafe_borrow_mut() == 200); + + let v3 = v2.clone(); + assert!(v3.refcount() == 3); + { + let _v1 = v1; + let _v2 = v2; + } + assert!(v3.refcount() == 1); + } + } +} diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs index fd64438c61b46..497ff8841b6bd 100644 --- a/src/libcore/rt/rtio.rs +++ b/src/libcore/rt/rtio.rs @@ -11,14 +11,16 @@ use option::*; use result::*; +use rt::io::IoError; use super::io::net::ip::IpAddr; +use rt::uv::uvio; // XXX: ~object doesn't work currently so these are some placeholder // types to use instead -pub type EventLoopObject = super::uvio::UvEventLoop; -pub type IoFactoryObject = super::uvio::UvIoFactory; -pub type StreamObject = super::uvio::UvStream; -pub type TcpListenerObject = super::uvio::UvTcpListener; +pub type EventLoopObject = uvio::UvEventLoop; +pub type IoFactoryObject = uvio::UvIoFactory; +pub type RtioTcpStreamObject = uvio::UvTcpStream; +pub type RtioTcpListenerObject = uvio::UvTcpListener; pub trait EventLoop { fn run(&mut self); @@ -28,15 +30,15 @@ pub trait EventLoop { } pub trait IoFactory { - fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>; - fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>; + fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; + fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; } -pub trait TcpListener { - fn listen(&mut self) -> Option<~StreamObject>; +pub trait RtioTcpListener { + fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; } -pub trait Stream { - fn read(&mut self, buf: &mut [u8]) -> Result; - fn write(&mut self, buf: &[u8]) -> Result<(), ()>; +pub trait RtioTcpStream { + fn read(&mut self, buf: &mut [u8]) -> Result; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched.rs similarity index 92% rename from src/libcore/rt/sched/mod.rs rename to src/libcore/rt/sched.rs index dda1f27550f08..395f9099571a0 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched.rs @@ -19,7 +19,7 @@ use super::context::Context; use super::local_services::LocalServices; use cell::Cell; -#[cfg(test)] use super::uvio::UvEventLoop; +#[cfg(test)] use rt::uv::uvio::UvEventLoop; #[cfg(test)] use unstable::run_in_bare_thread; #[cfg(test)] use int; @@ -106,6 +106,7 @@ pub impl Scheduler { } } + let scheduler = &mut *scheduler; scheduler.event_loop.callback(run_scheduler_once); scheduler.event_loop.run(); } @@ -179,7 +180,7 @@ pub impl Scheduler { // Take pointers to both the task and scheduler's saved registers. unsafe { let sched = local_sched::unsafe_borrow(); - let (sched_context, _, next_task_context) = sched.get_contexts(); + let (sched_context, _, next_task_context) = (*sched).get_contexts(); let next_task_context = next_task_context.unwrap(); // Context switch to the task, restoring it's registers // and saving the scheduler's @@ -187,10 +188,10 @@ pub impl Scheduler { let sched = local_sched::unsafe_borrow(); // The running task should have passed ownership elsewhere - assert!(sched.current_task.is_none()); + assert!((*sched).current_task.is_none()); // Running tasks may have asked us to do some cleanup - sched.run_cleanup_job(); + (*sched).run_cleanup_job(); } } @@ -208,21 +209,25 @@ pub impl Scheduler { rtdebug!("blocking task"); - let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); + unsafe { + let blocked_task = this.current_task.swap_unwrap(); + let f_fake_region = transmute::<&fn(~Task), &fn(~Task)>(f); + let f_opaque = ClosureConverter::from_fn(f_fake_region); + this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); + } local_sched::put(this); - let sched = unsafe { local_sched::unsafe_borrow() }; - let (sched_context, last_task_context, _) = sched.get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); + unsafe { + let sched = local_sched::unsafe_borrow(); + let (sched_context, last_task_context, _) = (*sched).get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); - // We could be executing in a different thread now - let sched = unsafe { local_sched::unsafe_borrow() }; - sched.run_cleanup_job(); + // We could be executing in a different thread now + let sched = local_sched::unsafe_borrow(); + (*sched).run_cleanup_job(); + } } /// Switch directly to another task, without going through the scheduler. @@ -244,14 +249,14 @@ pub impl Scheduler { unsafe { let sched = local_sched::unsafe_borrow(); - let (_, last_task_context, next_task_context) = sched.get_contexts(); + let (_, last_task_context, next_task_context) = (*sched).get_contexts(); let last_task_context = last_task_context.unwrap(); let next_task_context = next_task_context.unwrap(); Context::swap(last_task_context, next_task_context); // We could be executing in a different thread now let sched = local_sched::unsafe_borrow(); - sched.run_cleanup_job(); + (*sched).run_cleanup_job(); } } @@ -356,10 +361,10 @@ pub impl Task { // have asked us to do some cleanup. unsafe { let sched = local_sched::unsafe_borrow(); - sched.run_cleanup_job(); + (*sched).run_cleanup_job(); let sched = local_sched::unsafe_borrow(); - let task = sched.current_task.get_mut_ref(); + let task = (*sched).current_task.get_mut_ref(); // FIXME #6141: shouldn't neet to put `start()` in another closure task.local_services.run(||start()); } diff --git a/src/libcore/rt/stack.rs b/src/libcore/rt/stack.rs index 3a4e9307d3b50..019540ce76b3f 100644 --- a/src/libcore/rt/stack.rs +++ b/src/libcore/rt/stack.rs @@ -11,21 +11,36 @@ use container::Container; use ptr::Ptr; use vec; +use ops::Drop; +use libc::{c_uint, uintptr_t}; pub struct StackSegment { - buf: ~[u8] + buf: ~[u8], + valgrind_id: c_uint } pub impl StackSegment { fn new(size: uint) -> StackSegment { - // Crate a block of uninitialized values - let mut stack = vec::with_capacity(size); unsafe { + // Crate a block of uninitialized values + let mut stack = vec::with_capacity(size); vec::raw::set_len(&mut stack, size); + + let mut stk = StackSegment { + buf: stack, + valgrind_id: 0 + }; + + // XXX: Using the FFI to call a C macro. Slow + stk.valgrind_id = rust_valgrind_stack_register(stk.start(), stk.end()); + return stk; } + } - StackSegment { - buf: stack + /// Point to the low end of the allocated stack + fn start(&self) -> *uint { + unsafe { + vec::raw::to_ptr(self.buf) as *uint } } @@ -37,6 +52,15 @@ pub impl StackSegment { } } +impl Drop for StackSegment { + fn finalize(&self) { + unsafe { + // XXX: Using the FFI to call a C macro. Slow + rust_valgrind_stack_deregister(self.valgrind_id); + } + } +} + pub struct StackPool(()); impl StackPool { @@ -49,3 +73,8 @@ impl StackPool { fn give_segment(&self, _stack: StackSegment) { } } + +extern { + fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint; + fn rust_valgrind_stack_deregister(id: c_uint); +} diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index 0c6843c605d15..8d0ae0caf4d62 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -19,7 +19,7 @@ use rt::local_services::LocalServices; pub fn run_in_newsched_task(f: ~fn()) { use unstable::run_in_bare_thread; use super::sched::Task; - use super::uvio::UvEventLoop; + use rt::uv::uvio::UvEventLoop; let f = Cell(f); @@ -64,6 +64,46 @@ pub fn spawntask_immediately(f: ~fn()) { } } +/// Create a new task and run it right now. Aborts on failure +pub fn spawntask_later(f: ~fn()) { + use super::sched::*; + + let mut sched = local_sched::take(); + let task = ~Task::with_local(&mut sched.stack_pool, + LocalServices::without_unwinding(), + f); + + sched.task_queue.push_front(task); + local_sched::put(sched); +} + +/// Spawn a task and either run it immediately or run it later +pub fn spawntask_random(f: ~fn()) { + use super::sched::*; + use rand::{Rand, rng}; + + let mut rng = rng(); + let run_now: bool = Rand::rand(&mut rng); + + let mut sched = local_sched::take(); + let task = ~Task::with_local(&mut sched.stack_pool, + LocalServices::without_unwinding(), + f); + + if run_now { + do sched.switch_running_tasks_and_then(task) |task| { + let task = Cell(task); + do local_sched::borrow |sched| { + sched.task_queue.push_front(task.take()); + } + } + } else { + sched.task_queue.push_front(task); + local_sched::put(sched); + } +} + + /// Spawn a task and wait for it to finish, returning whether it completed successfully or failed pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { use cell::Cell; diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs new file mode 100644 index 0000000000000..8e7bf72fa6308 --- /dev/null +++ b/src/libcore/rt/tube.rs @@ -0,0 +1,184 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A very simple unsynchronized channel type for sending buffered data from +//! scheduler context to task context. +//! +//! XXX: This would be safer to use if split into two types like Port/Chan + +use option::*; +use clone::Clone; +use super::rc::RC; +use rt::sched::Task; +use rt::{context, TaskContext, SchedulerContext}; +use rt::local_sched; +use vec::OwnedVector; +use container::Container; + +struct TubeState { + blocked_task: Option<~Task>, + buf: ~[T] +} + +pub struct Tube { + p: RC> +} + +impl Tube { + pub fn new() -> Tube { + Tube { + p: RC::new(TubeState { + blocked_task: None, + buf: ~[] + }) + } + } + + pub fn send(&mut self, val: T) { + rtdebug!("tube send"); + assert!(context() == SchedulerContext); + + unsafe { + let state = self.p.unsafe_borrow_mut(); + (*state).buf.push(val); + + if (*state).blocked_task.is_some() { + // There's a waiting task. Wake it up + rtdebug!("waking blocked tube"); + let task = (*state).blocked_task.swap_unwrap(); + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + } + } + + pub fn recv(&mut self) -> T { + assert!(context() == TaskContext); + + unsafe { + let state = self.p.unsafe_borrow_mut(); + if !(*state).buf.is_empty() { + return (*state).buf.shift(); + } else { + // Block and wait for the next message + rtdebug!("blocking on tube recv"); + assert!(self.p.refcount() > 1); // There better be somebody to wake us up + assert!((*state).blocked_task.is_none()); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + (*state).blocked_task = Some(task); + } + rtdebug!("waking after tube recv"); + let buf = &mut (*state).buf; + assert!(!buf.is_empty()); + return buf.shift(); + } + } + } +} + +impl Clone for Tube { + fn clone(&self) -> Tube { + Tube { p: self.p.clone() } + } +} + +#[cfg(test)] +mod test { + use int; + use cell::Cell; + use rt::local_sched; + use rt::test::*; + use rt::rtio::EventLoop; + use super::*; + + #[test] + fn simple_test() { + do run_in_newsched_task { + let mut tube: Tube = Tube::new(); + let tube_clone = tube.clone(); + let tube_clone_cell = Cell(tube_clone); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + let mut tube_clone = tube_clone_cell.take(); + tube_clone.send(1); + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + + assert!(tube.recv() == 1); + } + } + + #[test] + fn blocking_test() { + do run_in_newsched_task { + let mut tube: Tube = Tube::new(); + let tube_clone = tube.clone(); + let tube_clone = Cell(Cell(Cell(tube_clone))); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + let tube_clone = tube_clone.take(); + do local_sched::borrow |sched| { + let tube_clone = tube_clone.take(); + do sched.event_loop.callback { + let mut tube_clone = tube_clone.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube_clone.send(1); + } + } + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + + assert!(tube.recv() == 1); + } + } + + #[test] + fn many_blocking_test() { + static MAX: int = 100; + + do run_in_newsched_task { + let mut tube: Tube = Tube::new(); + let tube_clone = tube.clone(); + let tube_clone = Cell(tube_clone); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + callback_send(tube_clone.take(), 0); + + fn callback_send(tube: Tube, i: int) { + if i == 100 { return; } + + let tube = Cell(Cell(tube)); + do local_sched::borrow |sched| { + let tube = tube.take(); + do sched.event_loop.callback { + let mut tube = tube.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube.send(i); + callback_send(tube, i + 1); + } + } + } + + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + + for int::range(0, MAX) |i| { + let j = tube.recv(); + assert!(j == i); + } + } + } +} diff --git a/src/libcore/rt/uv/file.rs b/src/libcore/rt/uv/file.rs index a4aef7485d737..2d14505509759 100644 --- a/src/libcore/rt/uv/file.rs +++ b/src/libcore/rt/uv/file.rs @@ -11,15 +11,11 @@ use prelude::*; use ptr::null; use libc::c_void; -use super::{UvError, Callback, Request, NativeHandle, Loop}; -use super::super::uvll; -use super::super::uvll::*; - -pub type FsCallback = ~fn(FsRequest, Option); -impl Callback for FsCallback { } +use rt::uv::{Request, NativeHandle, Loop, FsCallback}; +use rt::uv::uvll; +use rt::uv::uvll::*; pub struct FsRequest(*uvll::uv_fs_t); - impl Request for FsRequest; impl FsRequest { diff --git a/src/libcore/rt/uv/idle.rs b/src/libcore/rt/uv/idle.rs new file mode 100644 index 0000000000000..fecb9391caa54 --- /dev/null +++ b/src/libcore/rt/uv/idle.rs @@ -0,0 +1,91 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc::c_int; +use option::Some; +use rt::uv::uvll; +use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback}; +use rt::uv::status_to_maybe_uv_error; + +pub struct IdleWatcher(*uvll::uv_idle_t); +impl Watcher for IdleWatcher { } + +pub impl IdleWatcher { + fn new(loop_: &mut Loop) -> IdleWatcher { + unsafe { + let handle = uvll::idle_new(); + assert!(handle.is_not_null()); + assert!(0 == uvll::idle_init(loop_.native_handle(), handle)); + let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + return watcher + } + } + + fn start(&mut self, cb: IdleCallback) { + { + let data = self.get_watcher_data(); + data.idle_cb = Some(cb); + } + + unsafe { + assert!(0 == uvll::idle_start(self.native_handle(), idle_cb)) + }; + + extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { + let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); + let data = idle_watcher.get_watcher_data(); + let cb: &IdleCallback = data.idle_cb.get_ref(); + let status = status_to_maybe_uv_error(handle, status); + (*cb)(idle_watcher, status); + } + } + + fn stop(&mut self) { + // NB: Not resetting the Rust idle_cb to None here because `stop` is likely + // called from *within* the idle callback, causing a use after free + + unsafe { + assert!(0 == uvll::idle_stop(self.native_handle())); + } + } + + fn close(self, cb: NullCallback) { + { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb) }; + + extern fn close_cb(handle: *uvll::uv_idle_t) { + unsafe { + let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); + { + let mut data = idle_watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); + } + idle_watcher.drop_watcher_data(); + uvll::idle_delete(handle); + } + } + } +} + +impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { + fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher { + IdleWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_idle_t { + match self { &IdleWatcher(ptr) => ptr } + } +} diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 6499f0a3efdcf..e719449139758 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -10,7 +10,7 @@ /*! -Bindings to libuv. +Bindings to libuv, along with the default implementation of `core::rt::rtio`. UV types consist of the event loop (Loop), Watchers, Requests and Callbacks. @@ -38,29 +38,44 @@ use container::Container; use option::*; use str::raw::from_c_str; use to_str::ToStr; +use ptr::Ptr; +use libc; use vec; use ptr; -use ptr::Ptr; +use cast; +use str; +use option::*; +use str::raw::from_c_str; +use to_str::ToStr; use libc::{c_void, c_int, size_t, malloc, free}; use cast::transmute; use ptr::null; -use super::uvll; use unstable::finally::Finally; +use rt::io::IoError; + #[cfg(test)] use unstable::run_in_bare_thread; -pub use self::file::{FsRequest, FsCallback}; +pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher}; -pub use self::net::{ReadCallback, AllocCallback, ConnectionCallback, ConnectCallback}; +pub use self::idle::IdleWatcher; + +/// The implementation of `rtio` for libuv +pub mod uvio; + +/// C bindings to libuv +pub mod uvll; pub mod file; pub mod net; +pub mod idle; -/// A trait for callbacks to implement. Provides a little extra type safety -/// for generic, unsafe interop functions like `set_watcher_callback`. -pub trait Callback { } - -pub trait Request { } +/// XXX: Loop(*handle) is buggy with destructors. Normal structs +/// with dtors may not be destructured, but tuple structs can, +/// but the results are not correct. +pub struct Loop { + handle: *uvll::uv_loop_t +} /// The trait implemented by uv 'watchers' (handles). Watchers are /// non-owning wrappers around the uv handles and are not completely @@ -68,12 +83,9 @@ pub trait Request { } /// handle. Watchers are generally created, then `start`ed, `stop`ed /// and `close`ed, but due to their complex life cycle may not be /// entirely memory safe if used in unanticipated patterns. -pub trait Watcher { - fn event_loop(&self) -> Loop; -} +pub trait Watcher { } -pub type NullCallback = ~fn(); -impl Callback for NullCallback { } +pub trait Request { } /// A type that wraps a native handle pub trait NativeHandle { @@ -81,13 +93,6 @@ pub trait NativeHandle { pub fn native_handle(&self) -> T; } -/// XXX: Loop(*handle) is buggy with destructors. Normal structs -/// with dtors may not be destructured, but tuple structs can, -/// but the results are not correct. -pub struct Loop { - handle: *uvll::uv_loop_t -} - pub impl Loop { fn new() -> Loop { let handle = unsafe { uvll::loop_new() }; @@ -113,64 +118,71 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop { } } -pub struct IdleWatcher(*uvll::uv_idle_t); +// XXX: The uv alloc callback also has a *uv_handle_t arg +pub type AllocCallback = ~fn(uint) -> Buf; +pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); +pub type NullCallback = ~fn(); +pub type IdleCallback = ~fn(IdleWatcher, Option); +pub type ConnectionCallback = ~fn(StreamWatcher, Option); +pub type FsCallback = ~fn(FsRequest, Option); + -impl Watcher for IdleWatcher { - fn event_loop(&self) -> Loop { - loop_from_watcher(self) - } +/// Callbacks used by StreamWatchers, set as custom data on the foreign handle +struct WatcherData { + read_cb: Option, + write_cb: Option, + connect_cb: Option, + close_cb: Option, + alloc_cb: Option, + idle_cb: Option } -pub type IdleCallback = ~fn(IdleWatcher, Option); -impl Callback for IdleCallback { } +pub trait WatcherInterop { + fn event_loop(&self) -> Loop; + fn install_watcher_data(&mut self); + fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData; + fn drop_watcher_data(&mut self); +} -pub impl IdleWatcher { - fn new(loop_: &mut Loop) -> IdleWatcher { +impl> WatcherInterop for W { + /// Get the uv event loop from a Watcher + pub fn event_loop(&self) -> Loop { unsafe { - let handle = uvll::idle_new(); - assert!(handle.is_not_null()); - assert!(0 == uvll::idle_init(loop_.native_handle(), handle)); - uvll::set_data_for_uv_handle(handle, null::<()>()); - NativeHandle::from_native_handle(handle) + let handle = self.native_handle(); + let loop_ = uvll::get_loop_for_uv_handle(handle); + NativeHandle::from_native_handle(loop_) } } - fn start(&mut self, cb: IdleCallback) { - - set_watcher_callback(self, cb); + pub fn install_watcher_data(&mut self) { unsafe { - assert!(0 == uvll::idle_start(self.native_handle(), idle_cb)) - }; - - extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); - let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher); - let status = status_to_maybe_uv_error(handle, status); - (*cb)(idle_watcher, status); + let data = ~WatcherData { + read_cb: None, + write_cb: None, + connect_cb: None, + close_cb: None, + alloc_cb: None, + idle_cb: None + }; + let data = transmute::<~WatcherData, *c_void>(data); + uvll::set_data_for_uv_handle(self.native_handle(), data); } } - fn stop(&mut self) { - unsafe { assert!(0 == uvll::idle_stop(self.native_handle())); } - } - - fn close(self) { - unsafe { uvll::close(self.native_handle(), close_cb) }; - - extern fn close_cb(handle: *uvll::uv_idle_t) { - let mut idle_watcher = NativeHandle::from_native_handle(handle); - drop_watcher_callback::(&mut idle_watcher); - unsafe { uvll::idle_delete(handle) }; + pub fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData { + unsafe { + let data = uvll::get_data_for_uv_handle(self.native_handle()); + let data = transmute::<&*c_void, &mut ~WatcherData>(&data); + return &mut **data; } } -} -impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { - fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher { - IdleWatcher(handle) - } - fn native_handle(&self) -> *uvll::uv_idle_t { - match self { &IdleWatcher(ptr) => ptr } + pub fn drop_watcher_data(&mut self) { + unsafe { + let data = uvll::get_data_for_uv_handle(self.native_handle()); + let _data = transmute::<*c_void, ~WatcherData>(data); + uvll::set_data_for_uv_handle(self.native_handle(), null::<()>()); + } } } @@ -213,148 +225,70 @@ fn error_smoke_test() { assert!(err.to_str() == ~"EOF: end of file"); } - -/// Given a uv handle, convert a callback status to a UvError -// XXX: Follow the pattern below by parameterizing over T: Watcher, not T -pub fn status_to_maybe_uv_error(handle: *T, status: c_int) -> Option { - if status != -1 { - None - } else { - unsafe { - rtdebug!("handle: %x", handle as uint); - let loop_ = uvll::get_loop_for_uv_handle(handle); - rtdebug!("loop: %x", loop_ as uint); - let err = uvll::last_error(loop_); - Some(UvError(err)) - } - } -} - -/// Get the uv event loop from a Watcher -pub fn loop_from_watcher>( - watcher: &W) -> Loop { - - let handle = watcher.native_handle(); - let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) }; - NativeHandle::from_native_handle(loop_) -} - -/// Set the custom data on a handle to a callback Note: This is only -/// suitable for watchers that make just one type of callback. For -/// others use WatcherData -pub fn set_watcher_callback, CB: Callback>( - watcher: &mut W, cb: CB) { - - drop_watcher_callback::(watcher); - // XXX: Boxing the callback so it fits into a - // pointer. Unfortunate extra allocation - let boxed_cb = ~cb; - let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) }; - unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) }; -} - -/// Delete a callback from a handle's custom data -pub fn drop_watcher_callback, CB: Callback>( - watcher: &mut W) { - +pub fn last_uv_error>(watcher: &W) -> UvError { unsafe { - let handle = watcher.native_handle(); - let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); - if handle_data.is_not_null() { - // Take ownership of the callback and drop it - let _cb = transmute::<*c_void, ~CB>(handle_data); - // Make sure the pointer is zeroed - uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>()); - } + let loop_ = watcher.event_loop(); + UvError(uvll::last_error(loop_.native_handle())) } } -/// Take a pointer to the callback installed as custom data -pub fn borrow_callback_from_watcher, - CB: Callback>(watcher: &W) -> &CB { - - unsafe { - let handle = watcher.native_handle(); - let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); - assert!(handle_data.is_not_null()); - let cb = transmute::<&*c_void, &~CB>(&handle_data); - return &**cb; - } -} +pub fn uv_error_to_io_error(uverr: UvError) -> IoError { -/// Take ownership of the callback installed as custom data -pub fn take_callback_from_watcher, CB: Callback>( - watcher: &mut W) -> CB { + // XXX: Could go in str::raw + unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str { + let s = s as *u8; + let mut curr = s, len = 0u; + while *curr != 0u8 { + len += 1u; + curr = ptr::offset(s, len); + } - unsafe { - let handle = watcher.native_handle(); - let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); - assert!(handle_data.is_not_null()); - uvll::set_data_for_uv_handle(handle, null::<()>()); - let cb: ~CB = transmute::<*c_void, ~CB>(handle_data); - let cb = match cb { ~cb => cb }; - return cb; + str::raw::buf_as_slice(s, len, |d| cast::transmute(d)) } -} -/// Callbacks used by StreamWatchers, set as custom data on the foreign handle -struct WatcherData { - read_cb: Option, - write_cb: Option, - connect_cb: Option, - close_cb: Option, - alloc_cb: Option, - buf: Option -} -pub fn install_watcher_data>(watcher: &mut W) { unsafe { - let data = ~WatcherData { - read_cb: None, - write_cb: None, - connect_cb: None, - close_cb: None, - alloc_cb: None, - buf: None + // Importing error constants + use rt::uv::uvll::*; + use rt::io::*; + + // uv error descriptions are static + let c_desc = uvll::strerror(&*uverr); + let desc = c_str_to_static_slice(c_desc); + + let kind = match uverr.code { + UNKNOWN => OtherIoError, + OK => OtherIoError, + EOF => EndOfFile, + EACCES => PermissionDenied, + ECONNREFUSED => ConnectionRefused, + e => { + abort!("unknown uv error code: %u", e as uint); + } }; - let data = transmute::<~WatcherData, *c_void>(data); - uvll::set_data_for_uv_handle(watcher.native_handle(), data); - } -} - -pub fn get_watcher_data<'r, H, W: Watcher + NativeHandle<*H>>( - watcher: &'r mut W) -> &'r mut WatcherData { - unsafe { - let data = uvll::get_data_for_uv_handle(watcher.native_handle()); - let data = transmute::<&*c_void, &mut ~WatcherData>(&data); - return &mut **data; - } -} - -pub fn drop_watcher_data>(watcher: &mut W) { - unsafe { - let data = uvll::get_data_for_uv_handle(watcher.native_handle()); - let _data = transmute::<*c_void, ~WatcherData>(data); - uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>()); + IoError { + kind: kind, + desc: desc, + detail: None + } } } -#[test] -fn test_slice_to_uv_buf() { - let slice = [0, .. 20]; - let buf = slice_to_uv_buf(slice); - - assert!(buf.len == 20); - - unsafe { - let base = transmute::<*u8, *mut u8>(buf.base); - (*base) = 1; - (*ptr::mut_offset(base, 1)) = 2; +/// Given a uv handle, convert a callback status to a UvError +// XXX: Follow the pattern below by parameterizing over T: Watcher, not T +pub fn status_to_maybe_uv_error(handle: *T, status: c_int) -> Option { + if status != -1 { + None + } else { + unsafe { + rtdebug!("handle: %x", handle as uint); + let loop_ = uvll::get_loop_for_uv_handle(handle); + rtdebug!("loop: %x", loop_ as uint); + let err = uvll::last_error(loop_); + Some(UvError(err)) + } } - - assert!(slice[0] == 1); - assert!(slice[1] == 2); } /// The uv buffer type @@ -394,6 +328,24 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { } } +#[test] +fn test_slice_to_uv_buf() { + let slice = [0, .. 20]; + let buf = slice_to_uv_buf(slice); + + assert!(buf.len == 20); + + unsafe { + let base = transmute::<*u8, *mut u8>(buf.base); + (*base) = 1; + (*ptr::mut_offset(base, 1)) = 2; + } + + assert!(slice[0] == 1); + assert!(slice[1] == 2); +} + + #[test] fn loop_smoke_test() { do run_in_bare_thread { @@ -409,7 +361,7 @@ fn idle_new_then_close() { do run_in_bare_thread { let mut loop_ = Loop::new(); let idle_watcher = { IdleWatcher::new(&mut loop_) }; - idle_watcher.close(); + idle_watcher.close(||()); } } @@ -425,7 +377,7 @@ fn idle_smoke_test() { assert!(status.is_none()); if unsafe { *count_ptr == 10 } { idle_watcher.stop(); - idle_watcher.close(); + idle_watcher.close(||()); } else { unsafe { *count_ptr = *count_ptr + 1; } } @@ -449,7 +401,7 @@ fn idle_start_stop_start() { assert!(status.is_none()); let mut idle_watcher = idle_watcher; idle_watcher.stop(); - idle_watcher.close(); + idle_watcher.close(||()); } } loop_.run(); diff --git a/src/libcore/rt/uv/net.rs b/src/libcore/rt/uv/net.rs index 3e6aa657c57dd..fd78b552119b5 100644 --- a/src/libcore/rt/uv/net.rs +++ b/src/libcore/rt/uv/net.rs @@ -10,21 +10,15 @@ use prelude::*; use libc::{size_t, ssize_t, c_int, c_void}; -use cast::transmute_mut_region; -use super::super::uvll; -use super::super::uvll::*; -use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback, - loop_from_watcher, status_to_maybe_uv_error, - install_watcher_data, get_watcher_data, drop_watcher_data, - vec_to_uv_buf, vec_from_uv_buf}; -use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6}; - -#[cfg(test)] use cell::Cell; -#[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use super::super::thread::Thread; -#[cfg(test)] use super::super::test::*; - -fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) { +use rt::uv::uvll; +use rt::uv::uvll::*; +use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback}; +use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, + status_to_maybe_uv_error}; +use rt::io::net::ip::{IpAddr, Ipv4, Ipv6}; +use rt::uv::last_uv_error; + +fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T { match addr { Ipv4(a, b, c, d, p) => { unsafe { @@ -34,7 +28,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) { c as uint, d as uint), p as int); do (|| { - f(addr); + f(addr) }).finally { free_ip4_addr(addr); } @@ -47,34 +41,23 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) { // uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t // and uv_file_t pub struct StreamWatcher(*uvll::uv_stream_t); - -impl Watcher for StreamWatcher { - fn event_loop(&self) -> Loop { - loop_from_watcher(self) - } -} - -pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); -impl Callback for ReadCallback { } - -// XXX: The uv alloc callback also has a *uv_handle_t arg -pub type AllocCallback = ~fn(uint) -> Buf; -impl Callback for AllocCallback { } +impl Watcher for StreamWatcher { } pub impl StreamWatcher { fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { - // XXX: Borrowchk problems - let data = get_watcher_data(unsafe { transmute_mut_region(self) }); - data.alloc_cb = Some(alloc); - data.read_cb = Some(cb); + { + let data = self.get_watcher_data(); + data.alloc_cb = Some(alloc); + data.read_cb = Some(cb); + } let handle = self.native_handle(); unsafe { uvll::read_start(handle, alloc_cb, read_cb); } extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf { let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); - let data = get_watcher_data(&mut stream_watcher); + let data = stream_watcher.get_watcher_data(); let alloc_cb = data.alloc_cb.get_ref(); return (*alloc_cb)(suggested_size as uint); } @@ -83,7 +66,7 @@ pub impl StreamWatcher { rtdebug!("buf addr: %x", buf.base as uint); rtdebug!("buf len: %d", buf.len as int); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); - let data = get_watcher_data(&mut stream_watcher); + let data = stream_watcher.get_watcher_data(); let cb = data.read_cb.get_ref(); let status = status_to_maybe_uv_error(stream, nread as c_int); (*cb)(stream_watcher, nread as int, buf, status); @@ -98,22 +81,19 @@ pub impl StreamWatcher { unsafe { uvll::read_stop(handle); } } - // XXX: Needs to take &[u8], not ~[u8] - fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) { - // XXX: Borrowck - let data = get_watcher_data(unsafe { transmute_mut_region(self) }); - assert!(data.write_cb.is_none()); - data.write_cb = Some(cb); + fn write(&mut self, buf: Buf, cb: ConnectionCallback) { + { + let data = self.get_watcher_data(); + assert!(data.write_cb.is_none()); + data.write_cb = Some(cb); + } let req = WriteRequest::new(); - let buf = vec_to_uv_buf(msg); - assert!(data.buf.is_none()); - data.buf = Some(buf); let bufs = [buf]; unsafe { assert!(0 == uvll::write(req.native_handle(), - self.native_handle(), - bufs, write_cb)); + self.native_handle(), + bufs, write_cb)); } extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { @@ -121,8 +101,7 @@ pub impl StreamWatcher { let mut stream_watcher = write_request.stream(); write_request.delete(); let cb = { - let data = get_watcher_data(&mut stream_watcher); - let _vec = vec_from_uv_buf(data.buf.swap_unwrap()); + let data = stream_watcher.get_watcher_data(); let cb = data.write_cb.swap_unwrap(); cb }; @@ -142,7 +121,7 @@ pub impl StreamWatcher { fn close(self, cb: NullCallback) { { let mut this = self; - let data = get_watcher_data(&mut this); + let data = this.get_watcher_data(); assert!(data.close_cb.is_none()); data.close_cb = Some(cb); } @@ -152,9 +131,10 @@ pub impl StreamWatcher { extern fn close_cb(handle: *uvll::uv_stream_t) { let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); { - get_watcher_data(&mut stream_watcher).close_cb.swap_unwrap()(); + let mut data = stream_watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); } - drop_watcher_data(&mut stream_watcher); + stream_watcher.drop_watcher_data(); unsafe { free_handle(handle as *c_void) } } } @@ -171,15 +151,7 @@ impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { } pub struct TcpWatcher(*uvll::uv_tcp_t); - -impl Watcher for TcpWatcher { - fn event_loop(&self) -> Loop { - loop_from_watcher(self) - } -} - -pub type ConnectionCallback = ~fn(StreamWatcher, Option); -impl Callback for ConnectionCallback { } +impl Watcher for TcpWatcher { } pub impl TcpWatcher { fn new(loop_: &mut Loop) -> TcpWatcher { @@ -187,21 +159,24 @@ pub impl TcpWatcher { let handle = malloc_handle(UV_TCP); assert!(handle.is_not_null()); assert!(0 == uvll::tcp_init(loop_.native_handle(), handle)); - let mut watcher = NativeHandle::from_native_handle(handle); - install_watcher_data(&mut watcher); + let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); return watcher; } } - fn bind(&mut self, address: IpAddr) { + fn bind(&mut self, address: IpAddr) -> Result<(), UvError> { match address { Ipv4(*) => { do ip4_as_uv_ip4(address) |addr| { let result = unsafe { uvll::tcp_bind(self.native_handle(), addr) }; - // XXX: bind is likely to fail. need real error handling - assert!(result == 0); + if result == 0 { + Ok(()) + } else { + Err(last_uv_error(self)) + } } } _ => fail!() @@ -210,8 +185,8 @@ pub impl TcpWatcher { fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) { unsafe { - assert!(get_watcher_data(self).connect_cb.is_none()); - get_watcher_data(self).connect_cb = Some(cb); + assert!(self.get_watcher_data().connect_cb.is_none()); + self.get_watcher_data().connect_cb = Some(cb); let connect_handle = ConnectRequest::new().native_handle(); match address { @@ -232,7 +207,7 @@ pub impl TcpWatcher { let mut stream_watcher = connect_request.stream(); connect_request.delete(); let cb: ConnectionCallback = { - let data = get_watcher_data(&mut stream_watcher); + let data = stream_watcher.get_watcher_data(); data.connect_cb.swap_unwrap() }; let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status); @@ -242,10 +217,11 @@ pub impl TcpWatcher { } fn listen(&mut self, cb: ConnectionCallback) { - // XXX: Borrowck - let data = get_watcher_data(unsafe { transmute_mut_region(self) }); - assert!(data.connect_cb.is_none()); - data.connect_cb = Some(cb); + { + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); + } unsafe { static BACKLOG: c_int = 128; // XXX should be configurable @@ -257,9 +233,10 @@ pub impl TcpWatcher { extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { rtdebug!("connection_cb"); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap(); - let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status); - cb(stream_watcher, status); + let data = stream_watcher.get_watcher_data(); + let cb = data.connect_cb.get_ref(); + let status = status_to_maybe_uv_error(handle, status); + (*cb)(stream_watcher, status); } } @@ -277,12 +254,8 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher { } } -pub type ConnectCallback = ~fn(ConnectRequest, Option); -impl Callback for ConnectCallback { } - // uv_connect_t is a subclass of uv_req_t struct ConnectRequest(*uvll::uv_connect_t); - impl Request for ConnectRequest { } impl ConnectRequest { @@ -355,93 +328,109 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest { } -#[test] -fn connect_close() { - do run_in_bare_thread() { - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - // Connect to a port where nobody is listening - let addr = next_test_ip4(); - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("tcp_watcher.connect!"); - assert!(status.is_some()); - assert!(status.get().name() == ~"ECONNREFUSED"); - stream_watcher.close(||()); +#[cfg(test)] +mod test { + use super::*; + use util::ignore; + use cell::Cell; + use vec; + use unstable::run_in_bare_thread; + use rt::thread::Thread; + use rt::test::*; + use rt::uv::{Loop, AllocCallback}; + use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf}; + + #[test] + fn connect_close() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + // Connect to a port where nobody is listening + let addr = next_test_ip4(); + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("tcp_watcher.connect!"); + assert!(status.is_some()); + assert!(status.get().name() == ~"ECONNREFUSED"); + stream_watcher.close(||()); + } + loop_.run(); + loop_.close(); } - loop_.run(); - loop_.close(); } -} -#[test] -fn listen() { - do run_in_bare_thread() { - static MAX: int = 10; - let mut loop_ = Loop::new(); - let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; - let addr = next_test_ip4(); - server_tcp_watcher.bind(addr); - let loop_ = loop_; - rtdebug!("listening"); - do server_tcp_watcher.listen |server_stream_watcher, status| { - rtdebug!("listened!"); - assert!(status.is_none()); - let mut server_stream_watcher = server_stream_watcher; - let mut loop_ = loop_; - let client_tcp_watcher = TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = client_tcp_watcher.as_stream(); - server_stream_watcher.accept(client_tcp_watcher); - let count_cell = Cell(0); - let server_stream_watcher = server_stream_watcher; - rtdebug!("starting read"); - let alloc: AllocCallback = |size| { - vec_to_uv_buf(vec::from_elem(size, 0)) - }; - do client_tcp_watcher.read_start(alloc) - |stream_watcher, nread, buf, status| { - - rtdebug!("i'm reading!"); - let buf = vec_from_uv_buf(buf); - let mut count = count_cell.take(); - if status.is_none() { - rtdebug!("got %d bytes", nread); - let buf = buf.unwrap(); - for buf.slice(0, nread as uint).each |byte| { - assert!(*byte == count as u8); - rtdebug!("%u", *byte as uint); - count += 1; - } - } else { - assert!(count == MAX); - do stream_watcher.close { - server_stream_watcher.close(||()); + #[test] + fn listen() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = next_test_ip4(); + server_tcp_watcher.bind(addr); + let loop_ = loop_; + rtdebug!("listening"); + do server_tcp_watcher.listen |server_stream_watcher, status| { + rtdebug!("listened!"); + assert!(status.is_none()); + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = loop_; + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + server_stream_watcher.accept(client_tcp_watcher); + let count_cell = Cell(0); + let server_stream_watcher = server_stream_watcher; + rtdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0)) + }; + do client_tcp_watcher.read_start(alloc) + |stream_watcher, nread, buf, status| { + + rtdebug!("i'm reading!"); + let buf = vec_from_uv_buf(buf); + let mut count = count_cell.take(); + if status.is_none() { + rtdebug!("got %d bytes", nread); + let buf = buf.unwrap(); + for buf.slice(0, nread as uint).each |byte| { + assert!(*byte == count as u8); + rtdebug!("%u", *byte as uint); + count += 1; + } + } else { + assert!(count == MAX); + do stream_watcher.close { + server_stream_watcher.close(||()); + } } + count_cell.put_back(count); } - count_cell.put_back(count); } - } - let _client_thread = do Thread::start { - rtdebug!("starting client thread"); - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("connecting"); - assert!(status.is_none()); - let mut stream_watcher = stream_watcher; - let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; - do stream_watcher.write(msg) |stream_watcher, status| { - rtdebug!("writing"); + let _client_thread = do Thread::start { + rtdebug!("starting client thread"); + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("connecting"); assert!(status.is_none()); - stream_watcher.close(||()); + let mut stream_watcher = stream_watcher; + let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; + let buf = slice_to_uv_buf(msg); + let msg_cell = Cell(msg); + do stream_watcher.write(buf) |stream_watcher, status| { + rtdebug!("writing"); + assert!(status.is_none()); + let msg_cell = Cell(msg_cell.take()); + stream_watcher.close(||ignore(msg_cell.take())); + } } - } + loop_.run(); + loop_.close(); + }; + + let mut loop_ = loop_; loop_.run(); loop_.close(); - }; - - let mut loop_ = loop_; - loop_.run(); - loop_.close(); + } } } diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uv/uvio.rs similarity index 59% rename from src/libcore/rt/uvio.rs rename to src/libcore/rt/uv/uvio.rs index 24bffd8d1cd24..cc9eb2ada4d17 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -10,20 +10,24 @@ use option::*; use result::*; - -use super::io::net::ip::IpAddr; -use super::uv::*; -use super::rtio::*; use ops::Drop; use old_iter::CopyableIter; use cell::{Cell, empty_cell}; use cast::transmute; -use super::sched::{Scheduler, local_sched}; +use clone::Clone; +use rt::io::IoError; +use rt::io::net::ip::IpAddr; +use rt::uv::*; +use rt::uv::idle::IdleWatcher; +use rt::rtio::*; +use rt::sched::{Scheduler, local_sched}; +use rt::io::{standard_error, OtherIoError}; +use rt::tube::Tube; #[cfg(test)] use container::Container; #[cfg(test)] use uint; #[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use super::test::*; +#[cfg(test)] use rt::test::*; pub struct UvEventLoop { uvio: UvIoFactory @@ -64,7 +68,7 @@ impl EventLoop for UvEventLoop { assert!(status.is_none()); let mut idle_watcher = idle_watcher; idle_watcher.stop(); - idle_watcher.close(); + idle_watcher.close(||()); f(); } } @@ -100,11 +104,11 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> { + fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> { // Create a cell in the task to hold the result. We will fill // the cell before resuming the task. let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); @@ -122,21 +126,26 @@ impl IoFactory for UvIoFactory { // Wait for a connection do tcp_watcher.connect(addr) |stream_watcher, status| { rtdebug!("connect: in connect callback"); - let maybe_stream = if status.is_none() { + if status.is_none() { rtdebug!("status is none"); - Some(~UvStream(stream_watcher)) + let res = Ok(~UvTcpStream { watcher: stream_watcher }); + + // Store the stream in the task's stack + unsafe { (*result_cell_ptr).put_back(res); } + + // Context switch + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); } else { rtdebug!("status is some"); - stream_watcher.close(||()); - None + let task_cell = Cell(task_cell.take()); + do stream_watcher.close { + let res = Err(uv_error_to_io_error(status.get())); + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } }; - - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(maybe_stream); } - - // Context switch - let scheduler = local_sched::take(); - scheduler.resume_task_immediately(task_cell.take()); } } @@ -144,103 +153,124 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> { + fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> { let mut watcher = TcpWatcher::new(self.uv_loop()); - watcher.bind(addr); - return Some(~UvTcpListener(watcher)); + match watcher.bind(addr) { + Ok(_) => Ok(~UvTcpListener::new(watcher)), + Err(uverr) => { + let scheduler = local_sched::take(); + do scheduler.deschedule_running_task_and_then |task| { + let task_cell = Cell(task); + do watcher.as_stream().close { + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } + } + Err(uv_error_to_io_error(uverr)) + } + } } } -pub struct UvTcpListener(TcpWatcher); +// FIXME #6090: Prefer newtype structs but Drop doesn't work +pub struct UvTcpListener { + watcher: TcpWatcher, + listening: bool, + incoming_streams: Tube> +} impl UvTcpListener { - fn watcher(&self) -> TcpWatcher { - match self { &UvTcpListener(w) => w } + fn new(watcher: TcpWatcher) -> UvTcpListener { + UvTcpListener { + watcher: watcher, + listening: false, + incoming_streams: Tube::new() + } } - fn close(&self) { - // XXX: Need to wait until close finishes before returning - self.watcher().as_stream().close(||()); - } + fn watcher(&self) -> TcpWatcher { self.watcher } } impl Drop for UvTcpListener { fn finalize(&self) { - // XXX: Again, this never gets called. Use .close() instead - //self.watcher().as_stream().close(||()); + let watcher = self.watcher(); + let scheduler = local_sched::take(); + do scheduler.deschedule_running_task_and_then |task| { + let task_cell = Cell(task); + do watcher.as_stream().close { + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } + } } } -impl TcpListener for UvTcpListener { +impl RtioTcpListener for UvTcpListener { - fn listen(&mut self) -> Option<~StreamObject> { + fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { rtdebug!("entering listen"); - let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; - let server_tcp_watcher = self.watcher(); - - let scheduler = local_sched::take(); - assert!(scheduler.in_task_context()); + if self.listening { + return self.incoming_streams.recv(); + } - do scheduler.deschedule_running_task_and_then |task| { - let task_cell = Cell(task); - let mut server_tcp_watcher = server_tcp_watcher; - do server_tcp_watcher.listen |server_stream_watcher, status| { - let maybe_stream = if status.is_none() { - let mut server_stream_watcher = server_stream_watcher; - let mut loop_ = loop_from_watcher(&server_stream_watcher); - let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream(); - // XXX: Needs to be surfaced in interface - server_stream_watcher.accept(client_tcp_watcher); - Some(~UvStream::new(client_tcp_watcher)) - } else { - None - }; + self.listening = true; - unsafe { (*result_cell_ptr).put_back(maybe_stream); } + let server_tcp_watcher = self.watcher(); + let incoming_streams_cell = Cell(self.incoming_streams.clone()); + + let incoming_streams_cell = Cell(incoming_streams_cell.take()); + let mut server_tcp_watcher = server_tcp_watcher; + do server_tcp_watcher.listen |server_stream_watcher, status| { + let maybe_stream = if status.is_none() { + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = server_stream_watcher.event_loop(); + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let client_tcp_watcher = client_tcp_watcher.as_stream(); + // XXX: Need's to be surfaced in interface + server_stream_watcher.accept(client_tcp_watcher); + Ok(~UvTcpStream { watcher: client_tcp_watcher }) + } else { + Err(standard_error(OtherIoError)) + }; - rtdebug!("resuming task from listen"); - // Context switch - let scheduler = local_sched::take(); - scheduler.resume_task_immediately(task_cell.take()); - } + let mut incoming_streams = incoming_streams_cell.take(); + incoming_streams.send(maybe_stream); + incoming_streams_cell.put_back(incoming_streams); } - assert!(!result_cell.is_empty()); - return result_cell.take(); + return self.incoming_streams.recv(); } } -pub struct UvStream(StreamWatcher); - -impl UvStream { - fn new(watcher: StreamWatcher) -> UvStream { - UvStream(watcher) - } - - fn watcher(&self) -> StreamWatcher { - match self { &UvStream(w) => w } - } +// FIXME #6090: Prefer newtype structs but Drop doesn't work +pub struct UvTcpStream { + watcher: StreamWatcher +} - // XXX: finalize isn't working for ~UvStream??? - fn close(&self) { - // XXX: Need to wait until this finishes before returning - self.watcher().close(||()); - } +impl UvTcpStream { + fn watcher(&self) -> StreamWatcher { self.watcher } } -impl Drop for UvStream { +impl Drop for UvTcpStream { fn finalize(&self) { - rtdebug!("closing stream"); - //self.watcher().close(||()); + rtdebug!("closing tcp stream"); + let watcher = self.watcher(); + let scheduler = local_sched::take(); + do scheduler.deschedule_running_task_and_then |task| { + let task_cell = Cell(task); + do watcher.close { + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } + } } } -impl Stream for UvStream { - fn read(&mut self, buf: &mut [u8]) -> Result { +impl RtioTcpStream for UvTcpStream { + fn read(&mut self, buf: &mut [u8]) -> Result { let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); @@ -271,7 +301,7 @@ impl Stream for UvStream { assert!(nread >= 0); Ok(nread as uint) } else { - Err(()) + Err(standard_error(OtherIoError)) }; unsafe { (*result_cell_ptr).put_back(result); } @@ -285,9 +315,9 @@ impl Stream for UvStream { return result_cell.take(); } - fn write(&mut self, buf: &[u8]) -> Result<(), ()> { + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); @@ -295,14 +325,12 @@ impl Stream for UvStream { do scheduler.deschedule_running_task_and_then |task| { let mut watcher = watcher; let task_cell = Cell(task); - let buf = unsafe { &*buf_ptr }; - // XXX: OMGCOPIES - let buf = buf.to_vec(); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; do watcher.write(buf) |_watcher, status| { let result = if status.is_none() { Ok(()) } else { - Err(()) + Err(standard_error(OtherIoError)) }; unsafe { (*result_cell_ptr).put_back(result); } @@ -320,10 +348,12 @@ impl Stream for UvStream { #[test] fn test_simple_io_no_connect() { do run_in_newsched_task { - let io = unsafe { local_sched::unsafe_borrow_io() }; - let addr = next_test_ip4(); - let maybe_chan = io.connect(addr); - assert!(maybe_chan.is_none()); + unsafe { + let io = local_sched::unsafe_borrow_io(); + let addr = next_test_ip4(); + let maybe_chan = (*io).tcp_connect(addr); + assert!(maybe_chan.is_err()); + } } } @@ -336,8 +366,8 @@ fn test_simple_tcp_server_and_client() { do spawntask_immediately { unsafe { let io = local_sched::unsafe_borrow_io(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); + let mut listener = (*io).tcp_bind(addr).unwrap(); + let mut stream = listener.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert!(nread == 8); @@ -345,17 +375,14 @@ fn test_simple_tcp_server_and_client() { rtdebug!("%u", buf[i] as uint); assert!(buf[i] == i as u8); } - stream.close(); - listener.close(); } } do spawntask_immediately { unsafe { let io = local_sched::unsafe_borrow_io(); - let mut stream = io.connect(addr).unwrap(); + let mut stream = (*io).tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); } } } @@ -368,8 +395,8 @@ fn test_read_and_block() { do spawntask_immediately { let io = unsafe { local_sched::unsafe_borrow_io() }; - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); + let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; + let mut stream = listener.accept().unwrap(); let mut buf = [0, .. 2048]; let expected = 32; @@ -399,19 +426,17 @@ fn test_read_and_block() { // Make sure we had multiple reads assert!(reads > 1); - - stream.close(); - listener.close(); } do spawntask_immediately { - let io = unsafe { local_sched::unsafe_borrow_io() }; - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); + unsafe { + let io = local_sched::unsafe_borrow_io(); + let mut stream = (*io).tcp_connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + } } } @@ -426,34 +451,33 @@ fn test_read_read_read() { do spawntask_immediately { unsafe { let io = local_sched::unsafe_borrow_io(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); + let mut listener = (*io).tcp_bind(addr).unwrap(); + let mut stream = listener.accept().unwrap(); let buf = [1, .. 2048]; let mut total_bytes_written = 0; while total_bytes_written < MAX { stream.write(buf); total_bytes_written += buf.len(); } - stream.close(); - listener.close(); } } do spawntask_immediately { - let io = unsafe { local_sched::unsafe_borrow_io() }; - let mut stream = io.connect(addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < MAX { - let nread = stream.read(buf).unwrap(); - rtdebug!("read %u bytes", nread as uint); - total_bytes_read += nread; - for uint::range(0, nread) |i| { - assert!(buf[i] == 1); + unsafe { + let io = local_sched::unsafe_borrow_io(); + let mut stream = (*io).tcp_connect(addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < MAX { + let nread = stream.read(buf).unwrap(); + rtdebug!("read %u bytes", nread as uint); + total_bytes_read += nread; + for uint::range(0, nread) |i| { + assert!(buf[i] == 1); + } } + rtdebug!("read %u bytes total", total_bytes_read as uint); } - rtdebug!("read %u bytes total", total_bytes_read as uint); - stream.close(); } } } diff --git a/src/libcore/rt/uvll.rs b/src/libcore/rt/uv/uvll.rs similarity index 98% rename from src/libcore/rt/uvll.rs rename to src/libcore/rt/uv/uvll.rs index 4bff3bff7d3ae..2a2812c671847 100644 --- a/src/libcore/rt/uvll.rs +++ b/src/libcore/rt/uv/uvll.rs @@ -33,6 +33,13 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t}; use libc::{malloc, free}; use prelude::*; +pub static UNKNOWN: c_int = -1; +pub static OK: c_int = 0; +pub static EOF: c_int = 1; +pub static EADDRINFO: c_int = 2; +pub static EACCES: c_int = 3; +pub static ECONNREFUSED: c_int = 12; + pub struct uv_err_t { code: c_int, sys_errno_: c_int diff --git a/src/libcore/sys.rs b/src/libcore/sys.rs index 4eca7ebbb371e..50a739ec67df7 100644 --- a/src/libcore/sys.rs +++ b/src/libcore/sys.rs @@ -202,10 +202,12 @@ impl FailWithCause for &'static str { // FIXME #4427: Temporary until rt::rt_fail_ goes away pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { - use rt::{context, OldTaskContext}; - use rt::local_services::unsafe_borrow_local_services; + use option::Option; + use rt::{context, OldTaskContext, TaskContext}; + use rt::local_services::{unsafe_borrow_local_services, Unwinder}; - match context() { + let context = context(); + match context { OldTaskContext => { unsafe { gc::cleanup_stack_for_failure(); @@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { } } _ => { - // XXX: Need to print the failure message - gc::cleanup_stack_for_failure(); unsafe { + // XXX: Bad re-allocations. fail! needs some refactoring + let msg = str::raw::from_c_str(msg); + let file = str::raw::from_c_str(file); + + let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file); + + // XXX: Logging doesn't work correctly in non-task context because it + // invokes the local heap + if context == TaskContext { + error!(outmsg); + } else { + rtdebug!("%s", outmsg); + } + + gc::cleanup_stack_for_failure(); + let local_services = unsafe_borrow_local_services(); - match local_services.unwinder { + let unwinder: &mut Option = &mut (*local_services).unwinder; + match *unwinder { Some(ref mut unwinder) => unwinder.begin_unwind(), None => abort!("failure without unwinder. aborting process") } diff --git a/src/libcore/task/local_data_priv.rs b/src/libcore/task/local_data_priv.rs index a30db039f30d7..27f58057e2ff5 100644 --- a/src/libcore/task/local_data_priv.rs +++ b/src/libcore/task/local_data_priv.rs @@ -36,7 +36,7 @@ impl Handle { } _ => { let local_services = unsafe_borrow_local_services(); - NewHandle(&mut local_services.storage) + NewHandle(&mut (*local_services).storage) } } } diff --git a/src/libcore/unstable/lang.rs b/src/libcore/unstable/lang.rs index 8153c2d43d998..e521fb59fbe5f 100644 --- a/src/libcore/unstable/lang.rs +++ b/src/libcore/unstable/lang.rs @@ -16,12 +16,12 @@ use libc::{c_char, c_uchar, c_void, size_t, uintptr_t, c_int, STDERR_FILENO}; use managed::raw::BoxRepr; use str; use sys; -use unstable::exchange_alloc; use cast::transmute; use rt::{context, OldTaskContext}; use rt::local_services::borrow_local_services; use option::{Option, Some, None}; use io; +use rt::global_heap; #[allow(non_camel_case_types)] pub type rust_task = c_void; @@ -153,7 +153,7 @@ unsafe fn fail_borrowed(box: *mut BoxRepr, file: *c_char, line: size_t) { #[lang="exchange_malloc"] #[inline(always)] pub unsafe fn exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char { - transmute(exchange_alloc::malloc(transmute(td), transmute(size))) + transmute(global_heap::malloc(transmute(td), transmute(size))) } /// Because this code is so perf. sensitive, use a static constant so that @@ -233,7 +233,7 @@ impl DebugPrints for io::fd_t { #[lang="exchange_free"] #[inline(always)] pub unsafe fn exchange_free(ptr: *c_char) { - exchange_alloc::free(transmute(ptr)) + global_heap::free(transmute(ptr)) } #[lang="malloc"] diff --git a/src/libcore/unstable/mod.rs b/src/libcore/unstable/mod.rs index bef7a7f87d3bd..18a6262f17de9 100644 --- a/src/libcore/unstable/mod.rs +++ b/src/libcore/unstable/mod.rs @@ -19,7 +19,6 @@ pub mod at_exit; pub mod global; pub mod finally; pub mod weak_task; -pub mod exchange_alloc; pub mod intrinsics; pub mod simd; pub mod extfmt; diff --git a/src/libcore/vec.rs b/src/libcore/vec.rs index 190b493a6f0e6..0fb697b686fb3 100644 --- a/src/libcore/vec.rs +++ b/src/libcore/vec.rs @@ -2071,6 +2071,8 @@ pub trait ImmutableVector<'self, T> { fn initn(&self, n: uint) -> &'self [T]; fn last(&self) -> &'self T; fn last_opt(&self) -> Option<&'self T>; + fn position(&self, f: &fn(t: &T) -> bool) -> Option; + fn rposition(&self, f: &fn(t: &T) -> bool) -> Option; #[cfg(stage0)] fn each_reverse(&self, blk: &fn(&T) -> bool); #[cfg(not(stage0))] @@ -2138,6 +2140,30 @@ impl<'self,T> ImmutableVector<'self, T> for &'self [T] { #[inline] fn last_opt(&self) -> Option<&'self T> { last_opt(*self) } + /** + * Find the first index matching some predicate + * + * Apply function `f` to each element of `v`. When function `f` returns + * true then an option containing the index is returned. If `f` matches no + * elements then none is returned. + */ + #[inline] + fn position(&self, f: &fn(t: &T) -> bool) -> Option { + position(*self, f) + } + + /** + * Find the last index matching some predicate + * + * Apply function `f` to each element of `v` in reverse order. When + * function `f` returns true then an option containing the index is + * returned. If `f` matches no elements then none is returned. + */ + #[inline] + fn rposition(&self, f: &fn(t: &T) -> bool) -> Option { + rposition(*self, f) + } + /// Iterates over a vector's elements in reverse. #[inline] #[cfg(stage0)] @@ -2230,43 +2256,17 @@ impl<'self,T> ImmutableVector<'self, T> for &'self [T] { } pub trait ImmutableEqVector { - fn position(&self, f: &fn(t: &T) -> bool) -> Option; fn position_elem(&self, t: &T) -> Option; - fn rposition(&self, f: &fn(t: &T) -> bool) -> Option; fn rposition_elem(&self, t: &T) -> Option; } impl<'self,T:Eq> ImmutableEqVector for &'self [T] { - /** - * Find the first index matching some predicate - * - * Apply function `f` to each element of `v`. When function `f` returns - * true then an option containing the index is returned. If `f` matches no - * elements then none is returned. - */ - #[inline] - fn position(&self, f: &fn(t: &T) -> bool) -> Option { - position(*self, f) - } - /// Find the first index containing a matching value #[inline] fn position_elem(&self, x: &T) -> Option { position_elem(*self, x) } - /** - * Find the last index matching some predicate - * - * Apply function `f` to each element of `v` in reverse order. When - * function `f` returns true then an option containing the index is - * returned. If `f` matches no elements then none is returned. - */ - #[inline] - fn rposition(&self, f: &fn(t: &T) -> bool) -> Option { - rposition(*self, f) - } - /// Find the last index containing a matching value #[inline] fn rposition_elem(&self, t: &T) -> Option { diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 903289281222b..39a6f5bfd1b7b 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -830,9 +830,9 @@ rust_get_rt_env() { } #ifndef _WIN32 -pthread_key_t sched_key; +pthread_key_t sched_key = -1; #else -DWORD sched_key; +DWORD sched_key = -1; #endif extern "C" void* diff --git a/src/rt/rust_stack.cpp b/src/rt/rust_stack.cpp index f07690a955ea2..a609ac573245d 100644 --- a/src/rt/rust_stack.cpp +++ b/src/rt/rust_stack.cpp @@ -92,3 +92,14 @@ destroy_exchange_stack(rust_exchange_alloc *exchange, stk_seg *stk) { deregister_valgrind_stack(stk); exchange->free(stk); } + + +extern "C" CDECL unsigned int +rust_valgrind_stack_register(void *start, void *end) { + return VALGRIND_STACK_REGISTER(start, end); +} + +extern "C" CDECL void +rust_valgrind_stack_deregister(unsigned int id) { + VALGRIND_STACK_DEREGISTER(id); +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 6be41251f1bd9..75a5a069605c7 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -234,3 +234,5 @@ rust_try rust_begin_unwind rust_take_task_borrow_list rust_set_task_borrow_list +rust_valgrind_stack_register +rust_valgrind_stack_deregister \ No newline at end of file