Skip to content

Scheduler multithreading #6839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
29d8300
core::rt: Move uv idle tests to idle mod
brson May 20, 2013
8072690
core::rt: Add bindings for async uv handles
brson May 21, 2013
6d8d73c
Add AtomicUint newtype
Aatch May 21, 2013
8f77a6f
core: Add AtomicInt and cleanup
brson May 22, 2013
a0cd55a
core::rt: Add RemoteCallback trait and uv implementation
brson May 21, 2013
41c2168
core::rt: Add SchedHandle type
brson May 22, 2013
8b7e392
core::rt: Scheduler takes a WorkQueue
brson May 23, 2013
7f107c4
core::rt: Remove UvEventLoop::new_scheduler function
brson May 23, 2013
3f8095e
core::rt: Add a very basic multi-threaded scheduling test
brson May 23, 2013
dec9db1
core::rt: Add SleeperList
brson May 29, 2013
ed8c359
core::rt: Add SleeperList to Scheduler
brson May 29, 2013
5043ea2
core::rt: Add run_in_mt_newsched_task test function
brson May 29, 2013
a373dad
core::rt: Outline the full multithreaded scheduling algo. Implement s…
brson May 29, 2013
f343e61
core::rt: Fix an infinite recursion bug
brson May 30, 2013
134bb0f
core::rt: Change the signature of context switching methods to avoid …
brson May 30, 2013
f4ed554
Merge remote-tracking branch 'brson/io' into incoming
brson May 30, 2013
ca2eebd
core::rt: Add some notes about optimizations
brson May 30, 2013
8eb358b
core::rt: Begin recording scheduler metrics
brson May 30, 2013
053b38e
core::rt: Fix two multithreading bugs and add a threadring test
brson May 30, 2013
ea633b4
core::rt: deny(unused_imports, unused_mut, unused_variable)
brson May 30, 2013
e2bedb1
core: Make atomic methods public
brson Jun 1, 2013
2e6d51f
std::rt: Use AtomicUint instead of intrinsics in comm
brson Jun 2, 2013
f7e242a
std::rt: Destroy the task start closure while in task context
brson Jun 4, 2013
1507df8
std::rt: Remove in incorrect assert
brson Jun 4, 2013
422f663
core::rt: Implement SharedChan
brson Jun 1, 2013
51d257f
core::rt: Add SharedPort
brson Jun 1, 2013
ece38b3
core::rt: Add `MegaPipe`, an unbounded, multiple producer/consumer, l…
brson Jun 1, 2013
80849e7
std: Fix stage0 build
brson Jun 2, 2013
f9a5005
rt: Add rust_get_num_cpus
brson Jun 6, 2013
8afec77
std::rt: Configure test threads with RUST_TEST_THREADS. Default is nc…
brson Jun 6, 2013
d6ccc6b
std::rt: Fix stream test to be parallel
brson Jun 6, 2013
d4de99a
std::rt: Fix a race in the UvRemoteCallback dtor
brson Jun 7, 2013
84d2695
std::rt: Work around a dynamic borrowck bug
brson Jun 11, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 322 additions & 19 deletions src/libstd/rt/comm.rs

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions src/libstd/rt/io/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,6 @@ mod test {
#[should_fail]
#[ignore(cfg(windows))]
fn push_bytes_fail_reset_len() {
use unstable::finally::Finally;

// push_bytes unsafely sets the vector length. This is testing that
// upon failure the length is reset correctly.
let mut reader = MockReader::new();
Expand All @@ -772,7 +770,8 @@ mod test {
reader.push_bytes(&mut *buf, 4);
}).finally {
// NB: Using rtassert here to trigger abort on failure since this is a should_fail test
rtassert!(*buf == ~[8, 9, 10]);
// FIXME: #7049 This fails because buf is still borrowed
//rtassert!(*buf == ~[8, 9, 10]);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/libstd/rt/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,30 @@ impl Local for IoFactoryObject {

#[cfg(test)]
mod test {
use rt::test::*;
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use super::*;

#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}

#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}

#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
Expand Down
3 changes: 3 additions & 0 deletions src/libstd/rt/message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! A concurrent queue that supports multiple producers and a
//! single consumer.

use container::Container;
use kinds::Owned;
use vec::OwnedVector;
Expand Down
88 changes: 88 additions & 0 deletions src/libstd/rt/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use to_str::ToStr;

pub struct SchedMetrics {
// The number of times executing `run_sched_once`.
turns: uint,
// The number of turns that received a message.
messages_received: uint,
// The number of turns that ran a task from the queue.
tasks_resumed_from_queue: uint,
// The number of turns that found no work to perform.
wasted_turns: uint,
// The number of times the scheduler went to sleep.
sleepy_times: uint,
// Context switches from the scheduler into a task.
context_switches_sched_to_task: uint,
// Context switches from a task into the scheduler.
context_switches_task_to_sched: uint,
// Context switches from a task to a task.
context_switches_task_to_task: uint,
// Message sends that unblock the receiver
rendezvous_sends: uint,
// Message sends that do not unblock the receiver
non_rendezvous_sends: uint,
// Message receives that do not block the receiver
rendezvous_recvs: uint,
// Message receives that block the receiver
non_rendezvous_recvs: uint
}

impl SchedMetrics {
pub fn new() -> SchedMetrics {
SchedMetrics {
turns: 0,
messages_received: 0,
tasks_resumed_from_queue: 0,
wasted_turns: 0,
sleepy_times: 0,
context_switches_sched_to_task: 0,
context_switches_task_to_sched: 0,
context_switches_task_to_task: 0,
rendezvous_sends: 0,
non_rendezvous_sends: 0,
rendezvous_recvs: 0,
non_rendezvous_recvs: 0
}
}
}

impl ToStr for SchedMetrics {
fn to_str(&self) -> ~str {
fmt!("turns: %u\n\
messages_received: %u\n\
tasks_resumed_from_queue: %u\n\
wasted_turns: %u\n\
sleepy_times: %u\n\
context_switches_sched_to_task: %u\n\
context_switches_task_to_sched: %u\n\
context_switches_task_to_task: %u\n\
rendezvous_sends: %u\n\
non_rendezvous_sends: %u\n\
rendezvous_recvs: %u\n\
non_rendezvous_recvs: %u\n\
",
self.turns,
self.messages_received,
self.tasks_resumed_from_queue,
self.wasted_turns,
self.sleepy_times,
self.context_switches_sched_to_task,
self.context_switches_task_to_sched,
self.context_switches_task_to_task,
self.rendezvous_sends,
self.non_rendezvous_sends,
self.rendezvous_recvs,
self.non_rendezvous_recvs
)
}
}
27 changes: 18 additions & 9 deletions src/libstd/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ Several modules in `core` are clients of `rt`:
*/

#[doc(hidden)];
#[deny(unused_imports)];
#[deny(unused_mut)];
#[deny(unused_variable)];

use ptr::Ptr;

Expand Down Expand Up @@ -88,6 +91,9 @@ mod work_queue;
/// A parallel queue.
mod message_queue;

/// A parallel data structure for tracking sleeping schedulers.
mod sleeper_list;

/// Stack segments and caching.
mod stack;

Expand Down Expand Up @@ -127,6 +133,8 @@ pub mod local_ptr;
/// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;

pub mod metrics;


/// Set up a default runtime configuration, given compiler-supplied arguments.
///
Expand All @@ -145,12 +153,17 @@ pub mod thread_local_storage;
pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {

use self::sched::{Scheduler, Coroutine};
use self::work_queue::WorkQueue;
use self::uv::uvio::UvEventLoop;
use self::sleeper_list::SleeperList;

init(crate_map);

let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_);
let work_queue = WorkQueue::new();
let sleepers = SleeperList::new();
let mut sched = ~Scheduler::new(loop_, work_queue, sleepers);
sched.no_sleep = true;
let main_task = ~Coroutine::new(&mut sched.stack_pool, main);

sched.enqueue_task(main_task);
Expand Down Expand Up @@ -218,23 +231,19 @@ pub fn context() -> RuntimeContext {
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{Scheduler, Coroutine};
use rt::uv::uvio::UvEventLoop;
use cell::Cell;
use rt::local::Local;
use rt::test::new_test_uv_sched;

assert_eq!(context(), OldTaskContext);
do run_in_bare_thread {
assert_eq!(context(), GlobalContext);
let mut sched = ~UvEventLoop::new_scheduler();
let mut sched = ~new_test_uv_sched();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |task| {
do sched.deschedule_running_task_and_then() |sched, task| {
assert_eq!(context(), SchedulerContext);
let task = Cell(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
sched.enqueue_task(task);
}
};
sched.enqueue_task(task);
Expand Down
11 changes: 11 additions & 0 deletions src/libstd/rt/rtio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use rt::uv::uvio;
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = uvio::UvEventLoop;
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
Expand All @@ -26,10 +27,20 @@ pub trait EventLoop {
fn run(&mut self);
fn callback(&mut self, ~fn());
fn callback_ms(&mut self, ms: u64, ~fn());
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
/// The asynchronous I/O services. Not all event loops may provide one
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
}

pub trait RemoteCallback {
/// Trigger the remote callback. Note that the number of times the callback
/// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
/// the callback will be called at least once, but multiple callbacks may be coalesced
/// and callbacks may be called more often requested. Destruction also triggers the
/// callback.
fn fire(&mut self);
}

pub trait IoFactory {
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
Expand Down
Loading