Skip to content

green: Don't fall back to epoll() if possible #12186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/libgreen/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ impl EventLoop for BasicLoop {
}

fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }

fn has_active_io(&self) -> bool { false }
}

struct BasicRemote {
Expand Down
157 changes: 97 additions & 60 deletions src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,23 @@ impl Scheduler {

// * Execution Functions - Core Loop Logic

// The model for this function is that you continue through it
// until you either use the scheduler while performing a schedule
// action, in which case you give it away and return early, or
// you reach the end and sleep. In the case that a scheduler
// action is performed the loop is evented such that this function
// is called again.
// This function is run from the idle callback on the uv loop, indicating
// that there are no I/O events pending. When this function returns, we will
// fall back to epoll() in the uv event loop, waiting for more things to
// happen. We may come right back off epoll() if the idle callback is still
// active, in which case we're truly just polling to see if I/O events are
// complete.
//
// The model for this function is to execute as much work as possible while
// still fairly considering I/O tasks. Falling back to epoll() frequently is
// often quite expensive, so we attempt to avoid it as much as possible. If
// we have any active I/O on the event loop, then we're forced to fall back
// to epoll() in order to provide fairness, but as long as we're doing work
// and there's no active I/O, we can continue to do work.
//
// If we try really hard to do some work, but no work is available to be
// done, then we fall back to epoll() to block this thread waiting for more
// work (instead of busy waiting).
fn run_sched_once(mut ~self, stask: ~GreenTask) {
// Make sure that we're not lying in that the `stask` argument is indeed
// the scheduler task for this scheduler.
Expand All @@ -269,26 +280,46 @@ impl Scheduler {

// First we check for scheduler messages, these are higher
// priority than regular tasks.
let (sched, stask) =
match self.interpret_message_queue(stask, DontTryTooHard) {
Some(pair) => pair,
None => return
};

// This helper will use a randomized work-stealing algorithm
// to find work.
let (sched, stask) = match sched.do_work(stask) {
Some(pair) => pair,
None => return
};
let (mut sched, mut stask, mut did_work) =
self.interpret_message_queue(stask, DontTryTooHard);

// Now, before sleeping we need to find out if there really
// were any messages. Give it your best!
let (mut sched, stask) =
match sched.interpret_message_queue(stask, GiveItYourBest) {
Some(pair) => pair,
None => return
// After processing a message, we consider doing some more work on the
// event loop. The "keep going" condition changes after the first
// iteration becase we don't want to spin here infinitely.
//
// Once we start doing work we can keep doing work so long as the
// iteration does something. Note that we don't want to starve the
// message queue here, so each iteration when we're done working we
// check the message queue regardless of whether we did work or not.
let mut keep_going = !did_work || !sched.event_loop.has_active_io();
while keep_going {
let (a, b, c) = match sched.do_work(stask) {
(sched, task, false) => {
sched.interpret_message_queue(task, GiveItYourBest)
}
(sched, task, true) => {
let (sched, task, _) =
sched.interpret_message_queue(task, GiveItYourBest);
(sched, task, true)
}
};
sched = a;
stask = b;
did_work = c;

// We only keep going if we managed to do something productive and
// also don't have any active I/O. If we didn't do anything, we
// should consider going to sleep, and if we have active I/O we need
// to poll for completion.
keep_going = did_work && !sched.event_loop.has_active_io();
}

// If we ever did some work, then we shouldn't put our scheduler
// entirely to sleep just yet. Leave the idle callback active and fall
// back to epoll() to see what's going on.
if did_work {
return stask.put_with_sched(sched);
}

// If we got here then there was no work to do.
// Generate a SchedHandle and push it to the sleeper list so
Expand Down Expand Up @@ -318,7 +349,7 @@ impl Scheduler {
// return None.
fn interpret_message_queue(mut ~self, stask: ~GreenTask,
effort: EffortLevel)
-> Option<(~Scheduler, ~GreenTask)>
-> (~Scheduler, ~GreenTask, bool)
{

let msg = if effort == DontTryTooHard {
Expand Down Expand Up @@ -349,25 +380,25 @@ impl Scheduler {
Some(PinnedTask(task)) => {
let mut task = task;
task.give_home(HomeSched(self.make_handle()));
self.resume_task_immediately(stask, task).put();
return None;
let (sched, task) = self.resume_task_immediately(stask, task);
(sched, task, true)
}
Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!");
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
return None;
let (sched, task) =
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
(sched, task, true)
}
Some(RunOnce(task)) => {
// bypass the process_task logic to force running this task once
// on this home scheduler. This is often used for I/O (homing).
self.resume_task_immediately(stask, task).put();
return None;
let (sched, task) = self.resume_task_immediately(stask, task);
(sched, task, true)
}
Some(Wake) => {
self.sleepy = false;
stask.put_with_sched(self);
return None;
(self, stask, true)
}
Some(Shutdown) => {
rtdebug!("shutting down");
Expand All @@ -389,31 +420,30 @@ impl Scheduler {
// event loop references we will shut down.
self.no_sleep = true;
self.sleepy = false;
stask.put_with_sched(self);
return None;
(self, stask, true)
}
Some(NewNeighbor(neighbor)) => {
self.work_queues.push(neighbor);
return Some((self, stask));
}
None => {
return Some((self, stask));
(self, stask, false)
}
None => (self, stask, false)
}
}

fn do_work(mut ~self, stask: ~GreenTask) -> Option<(~Scheduler, ~GreenTask)> {
fn do_work(mut ~self,
stask: ~GreenTask) -> (~Scheduler, ~GreenTask, bool) {
rtdebug!("scheduler calling do work");
match self.find_work() {
Some(task) => {
rtdebug!("found some work! running the task");
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
return None;
let (sched, task) =
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
(sched, task, true)
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some((self, stask));
(self, stask, false)
}
}
}
Expand Down Expand Up @@ -486,7 +516,8 @@ impl Scheduler {
// place.

fn process_task(mut ~self, cur: ~GreenTask,
mut next: ~GreenTask, schedule_fn: SchedulingFn) {
mut next: ~GreenTask,
schedule_fn: SchedulingFn) -> (~Scheduler, ~GreenTask) {
rtdebug!("processing a task");

match next.take_unwrap_home() {
Expand All @@ -495,23 +526,23 @@ impl Scheduler {
rtdebug!("sending task home");
next.give_home(HomeSched(home_handle));
Scheduler::send_task_home(next);
cur.put_with_sched(self);
(self, cur)
} else {
rtdebug!("running task here");
next.give_home(HomeSched(home_handle));
schedule_fn(self, cur, next);
schedule_fn(self, cur, next)
}
}
AnySched if self.run_anything => {
rtdebug!("running anysched task here");
next.give_home(AnySched);
schedule_fn(self, cur, next);
schedule_fn(self, cur, next)
}
AnySched => {
rtdebug!("sending task to friend");
next.give_home(AnySched);
self.send_to_friend(next);
cur.put_with_sched(self);
(self, cur)
}
}
}
Expand Down Expand Up @@ -664,18 +695,19 @@ impl Scheduler {
// * Context Swapping Helpers - Here be ugliness!

pub fn resume_task_immediately(~self, cur: ~GreenTask,
next: ~GreenTask) -> ~GreenTask {
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
assert!(cur.is_sched());
self.change_task_context(cur, next, |sched, stask| {
let mut cur = self.change_task_context(cur, next, |sched, stask| {
assert!(sched.sched_task.is_none());
sched.sched_task = Some(stask);
})
});
(cur.sched.take_unwrap(), cur)
}

fn resume_task_immediately_cl(sched: ~Scheduler,
cur: ~GreenTask,
next: ~GreenTask) {
sched.resume_task_immediately(cur, next).put()
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
sched.resume_task_immediately(cur, next)
}

/// Block a running task, context switch to the scheduler, then pass the
Expand Down Expand Up @@ -741,15 +773,17 @@ impl Scheduler {
cur.put();
}

fn switch_task(sched: ~Scheduler, cur: ~GreenTask, next: ~GreenTask) {
sched.change_task_context(cur, next, |sched, last_task| {
fn switch_task(sched: ~Scheduler, cur: ~GreenTask,
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
if last_task.is_sched() {
assert!(sched.sched_task.is_none());
sched.sched_task = Some(last_task);
} else {
sched.enqueue_task(last_task);
}
}).put()
});
(cur.sched.take_unwrap(), cur)
}

// * Task Context Helpers
Expand All @@ -769,7 +803,9 @@ impl Scheduler {
}

pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) {
self.process_task(cur, next, Scheduler::switch_task);
let (sched, task) =
self.process_task(cur, next, Scheduler::switch_task);
task.put_with_sched(sched);
}

pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) {
Expand Down Expand Up @@ -836,7 +872,8 @@ impl Scheduler {

// Supporting types

type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask);
type SchedulingFn = fn (~Scheduler, ~GreenTask, ~GreenTask)
-> (~Scheduler, ~GreenTask);

pub enum SchedMessage {
Wake,
Expand Down
2 changes: 1 addition & 1 deletion src/librustuv/addrinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl GetAddrInfoRequest {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };

wait_until_woken_after(&mut cx.slot, || {
wait_until_woken_after(&mut cx.slot, loop_, || {
req.set_data(&cx);
});

Expand Down
3 changes: 2 additions & 1 deletion src/librustuv/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
0 => {
req.fired = true;
let mut slot = None;
wait_until_woken_after(&mut slot, || {
let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) };
wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || {
unsafe { uvll::set_data_for_req(req.req, &slot) }
});
match req.get_result() {
Expand Down
28 changes: 25 additions & 3 deletions src/librustuv/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ via `close` and `delete` methods.
use std::cast;
use std::io;
use std::io::IoError;
use std::libc::c_int;
use std::libc::{c_int, c_void};
use std::ptr::null;
use std::ptr;
use std::rt::local::Local;
Expand Down Expand Up @@ -95,6 +95,10 @@ pub mod stream;
pub trait UvHandle<T> {
fn uv_handle(&self) -> *T;

fn uv_loop(&self) -> Loop {
Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) })
}

// FIXME(#8888) dummy self
fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
unsafe {
Expand Down Expand Up @@ -136,7 +140,7 @@ pub trait UvHandle<T> {
uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());

wait_until_woken_after(&mut slot, || {
wait_until_woken_after(&mut slot, &self.uv_loop(), || {
uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
})
}
Expand Down Expand Up @@ -195,16 +199,20 @@ impl Drop for ForbidUnwind {
}
}

fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
fn wait_until_woken_after(slot: *mut Option<BlockedTask>,
loop_: &Loop,
f: ||) {
let _f = ForbidUnwind::new("wait_until_woken_after");
unsafe {
assert!((*slot).is_none());
let task: ~Task = Local::take();
loop_.modify_blockers(1);
task.deschedule(1, |task| {
*slot = Some(task);
f();
Ok(())
});
loop_.modify_blockers(-1);
}
}

Expand Down Expand Up @@ -273,6 +281,7 @@ impl Loop {
pub fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
assert!(handle.is_not_null());
unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) }
Loop::wrap(handle)
}

Expand All @@ -285,6 +294,19 @@ impl Loop {
pub fn close(&mut self) {
unsafe { uvll::uv_loop_delete(self.handle) };
}

// The 'data' field of the uv_loop_t is used to count the number of tasks
// that are currently blocked waiting for I/O to complete.
fn modify_blockers(&self, amt: uint) {
unsafe {
let cur = uvll::get_data_for_uv_loop(self.handle) as uint;
uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void)
}
}

fn get_blockers(&self) -> uint {
unsafe { uvll::get_data_for_uv_loop(self.handle) as uint }
}
}

// FIXME: Need to define the error constants like EOF so they can be
Expand Down
Loading