Index: mojo/public/rust/src/bindings/run_loop.rs |
diff --git a/mojo/public/rust/src/bindings/run_loop.rs b/mojo/public/rust/src/bindings/run_loop.rs |
index 13109105a0639af1b64579ed32ab762415d83e8a..754694ed3d2d4d94def3b4a39c1bc255375ef53e 100644 |
--- a/mojo/public/rust/src/bindings/run_loop.rs |
+++ b/mojo/public/rust/src/bindings/run_loop.rs |
@@ -13,6 +13,11 @@ |
//! at which point it wakes up and executes the appropriate handler |
//! method. This handler method may then be used to further populate |
//! or de-populate the run-loop. |
+//! |
+//! As of yet, the run-loop is NOT thread-safe. Although it is useful |
+//! to be able to register tasks or handles from one thread onto |
+//! another thread's run-loop, this is as-of-yet unsupported, and |
+//! Rust should complain loudly when you try to do any threading here. |
use std::cell::RefCell; |
use std::cmp::{PartialEq, Eq, PartialOrd, Ord, Ordering}; |
@@ -40,7 +45,7 @@ const INITIAL_WAIT_SET_NUM_RESULTS: usize = 16; |
const MAXIMUM_WAIT_SET_NUM_RESULTS: usize = 256; |
/// Thread-local data structure for keeping track of handles to wait on. |
-thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static>> = RefCell::new(RunLoop::new())); |
+thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static, 'static>> = RefCell::new(RunLoop::new())); |
/// Token representing handle/callback to wait on for this thread only. This |
/// token only has meaning on the thread in which the handle was registered. |
@@ -133,6 +138,62 @@ impl<'h> HandlerInfo<'h> { |
} |
} |
+/// A wrapper struct for carrying the task as well as some information about |
+/// it. |
+struct TaskInfo<'t> { |
+ /// The task, boxed up. |
+ closure: Box<FnMut(&mut RunLoop) + 't>, |
+ |
+ /// An absolute deadline in terms of time ticks. |
+ /// |
+ /// This is the most recently updated deadline that |
+ /// we should be watching out for. All others for this |
+ /// token may be considered "stale". |
+ deadline: system::MojoTimeTicks, |
+} |
+ |
+impl<'t> TaskInfo<'t> { |
+ /// Executes the task within the info object, consuming it |
+ /// in the process. |
+ pub fn execute_task(mut self, run_loop: &mut RunLoop) { |
+ (*self.closure)(run_loop); |
+ } |
+ |
+ /// Getter for the current absolute deadline held inside. |
+ pub fn deadline(&self) -> system::MojoTimeTicks { |
+ self.deadline |
+ } |
+} |
+ |
+impl<'t> PartialEq for TaskInfo<'t> { |
+ /// Equality for TaskInfo in terms of its deadline |
+ fn eq(&self, other: &TaskInfo) -> bool { |
+ self.deadline == other.deadline |
+ } |
+} |
+ |
+impl<'t> Eq for TaskInfo<'t> {} |
+ |
+impl<'t> PartialOrd for TaskInfo<'t> { |
+ /// Partial comparison for TaskInfo in terms of its deadline |
+ /// |
+ /// Reverses the comparison because the Rust std library only |
+ /// offers a max-heap, and we need a min-heap. |
+ fn partial_cmp(&self, other: &TaskInfo) -> Option<Ordering> { |
+ other.deadline.partial_cmp(&self.deadline) |
+ } |
+} |
+ |
+impl<'t> Ord for TaskInfo<'t> { |
+ /// Implement comparisons for Task Info. |
+ /// |
+ /// Reverses the comparison because the Rust std library only |
+ /// offers a max-heap, and we need a min-heap. |
+ fn cmp(&self, other: &Self) -> Ordering { |
+ other.deadline.cmp(&self.deadline) |
+ } |
+} |
+ |
/// Wrapper struct intended to be used in a priority queue |
/// for efficiently retrieving the next closest deadline. |
#[derive(Clone)] |
@@ -224,7 +285,7 @@ fn relative_deadline(deadline: system::MojoTimeTicks, |
/// |
/// Ultimately, it should only be a singleton living in |
/// thread-local storage. |
-pub struct RunLoop<'h> { |
+pub struct RunLoop<'h, 't> { |
/// Running count of the next available token slot. |
token_count: u64, |
@@ -233,6 +294,10 @@ pub struct RunLoop<'h> { |
/// TODO(mknyszek): Try out a Slab allocator instead of a hashmap. |
handlers: HashMap<Token, HandlerInfo<'h>>, |
+ /// A min-heap of delayed tasks in order to pull the soonest task to |
+ /// execute efficiently. |
+ tasks: BinaryHeap<TaskInfo<'t>>, |
+ |
/// A min-heap containing deadlines in order to pull out the soonest |
/// deadline efficiently. |
/// |
@@ -252,12 +317,13 @@ pub struct RunLoop<'h> { |
running: bool, |
} |
-impl<'h> RunLoop<'h> { |
+impl<'h, 't> RunLoop<'h, 't> { |
/// Create a new RunLoop. |
- pub fn new() -> RunLoop<'h> { |
+ pub fn new() -> RunLoop<'h, 't> { |
RunLoop { |
token_count: 0, |
handlers: HashMap::new(), |
+ tasks: BinaryHeap::new(), |
deadlines: BinaryHeap::new(), |
handle_set: wait_set::WaitSet::new(wsflags!(Create::None)).unwrap(), |
should_quit: false, |
@@ -356,12 +422,34 @@ impl<'h> RunLoop<'h> { |
} |
} |
- /// Uses the binary heap to get the next closest deadline. |
+ /// Adds a task to be run by the runloop after some delay. |
+ /// |
+ /// Returns a token if the delay is valid, otherwise returns None. |
+ pub fn post_task<F>(&mut self, task: F, delay: system::MojoTimeTicks) -> Result<(), ()> |
+ where F: FnMut(&mut RunLoop) + 't |
+ { |
+ let now = core::get_time_ticks_now(); |
+ if delay > i64::MAX - now { |
+ return Err(()); |
+ } |
+ let deadline = now + delay; |
+ self.tasks.push(TaskInfo { |
+ closure: Box::new(task), |
+ deadline: deadline, |
+ }); |
+ Ok(()) |
+ } |
+ |
+ /// Uses the binary heaps to get the next closest deadline. |
/// |
/// Removes stale deadline entries as they are found, but |
/// does not otherwise modify the heap of deadlines. |
fn get_next_deadline(&mut self) -> system::MojoTimeTicks { |
debug_assert!(!self.handlers.is_empty()); |
+ let top_task_deadline = match self.tasks.peek() { |
+ Some(info) => info.deadline(), |
+ None => MOJO_INDEFINITE_ABSOLUTE, |
+ }; |
let mut top = match self.deadlines.peek() { |
Some(info) => info.clone(), |
None => return MOJO_INDEFINITE_ABSOLUTE, |
@@ -373,7 +461,12 @@ impl<'h> RunLoop<'h> { |
None => return MOJO_INDEFINITE_ABSOLUTE, |
} |
} |
- top.deadline() |
+ if top_task_deadline != MOJO_INDEFINITE_ABSOLUTE && |
+ top_task_deadline < top.deadline() { |
+ top_task_deadline |
+ } else { |
+ top.deadline() |
+ } |
} |
/// Gets a handler by token to be manipulated in a consistent environment. |
@@ -483,6 +576,30 @@ impl<'h> RunLoop<'h> { |
} |
} |
+ /// Iterates through all tasks whose deadline has passed and executes |
+ /// them, consuming their information object. |
+ /// |
+ /// These tasks all have access to the RunLoop so that they may reschedule |
+ /// themselves or manipulate the RunLoop in some other way. |
+ fn execute_ready_tasks(&mut self) { |
+ let now = core::get_time_ticks_now(); |
+ let mut deadline = match self.tasks.peek() { |
+ Some(info) => info.deadline(), |
+ None => return, |
+ }; |
+ while deadline < now { |
+ let top = self.tasks.pop().expect("Sudden change to heap?"); |
+ top.execute_task(self); |
+ if self.should_quit { |
+ return; |
+ } |
+ deadline = match self.tasks.peek() { |
+ Some(info) => info.deadline(), |
+ None => return, |
+ }; |
+ } |
+ } |
+ |
/// Blocks on handle_set.wait_on_set using the information contained |
/// within itself. |
/// |
@@ -492,6 +609,12 @@ impl<'h> RunLoop<'h> { |
/// signals satisfied, or reaches its deadline. |
fn wait(&mut self, results_buffer: &mut Vec<system::WaitSetResult>) { |
debug_assert!(!self.handlers.is_empty()); |
+ self.execute_ready_tasks(); |
+ // If after executing a task we quit or there are no handles, |
+ // we have no reason to continue. |
+ if self.handlers.is_empty() || self.should_quit { |
+ return; |
+ } |
let deadline = self.get_next_deadline(); |
let until_deadline = relative_deadline(deadline, core::get_time_ticks_now()); |
// Perform the wait |