Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(441)

Side by Side Diff: mojo/public/rust/src/bindings/run_loop.rs

Issue 2244463002: Rust: Support delayed tasks in RunLoop (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: loosen restriction on Fn Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | mojo/public/rust/tests/run_loop.rs » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 //! This module contains a thread-local run-loop. 5 //! This module contains a thread-local run-loop.
6 //! 6 //!
7 //! The run-loop may have handles and handlers pre-registers 7 //! The run-loop may have handles and handlers pre-registers
8 //! (and in fact, must) in order to keep running. The run-loop 8 //! (and in fact, must) in order to keep running. The run-loop
9 //! executes until it has no more handles or handlers on itself, 9 //! executes until it has no more handles or handlers on itself,
10 //! or until it is told to quit via stop(). 10 //! or until it is told to quit via stop().
11 //! 11 //!
12 //! The run-loop waits until some signals on some handle is satisfied, 12 //! The run-loop waits until some signals on some handle is satisfied,
13 //! at which point it wakes up and executes the appropriate handler 13 //! at which point it wakes up and executes the appropriate handler
14 //! method. This handler method may then be used to further populate 14 //! method. This handler method may then be used to further populate
15 //! or de-populate the run-loop. 15 //! or de-populate the run-loop.
16 //!
17 //! As of yet, the run-loop is NOT thread-safe. Although it is useful
18 //! to be able to register tasks or handles from one thread onto
19 //! another thread's run-loop, this is as-of-yet unsupported, and
20 //! Rust should complain loudly when you try to do any threading here.
16 21
17 use std::cell::RefCell; 22 use std::cell::RefCell;
18 use std::cmp::{PartialEq, Eq, PartialOrd, Ord, Ordering}; 23 use std::cmp::{PartialEq, Eq, PartialOrd, Ord, Ordering};
19 use std::collections::BinaryHeap; 24 use std::collections::BinaryHeap;
20 use std::collections::HashMap; 25 use std::collections::HashMap;
21 use std::i64; 26 use std::i64;
22 use std::u32; 27 use std::u32;
23 use std::vec::Vec; 28 use std::vec::Vec;
24 29
25 use system; 30 use system;
26 use system::{Handle, MOJO_INDEFINITE, MojoResult}; 31 use system::{Handle, MOJO_INDEFINITE, MojoResult};
27 use system::core; 32 use system::core;
28 use system::wait_set; 33 use system::wait_set;
29 34
30 /// Define the equivalent of MOJO_INDEFINITE for absolute deadlines 35 /// Define the equivalent of MOJO_INDEFINITE for absolute deadlines
31 const MOJO_INDEFINITE_ABSOLUTE: system::MojoTimeTicks = 0; 36 const MOJO_INDEFINITE_ABSOLUTE: system::MojoTimeTicks = 0;
32 37
33 // TODO(mknyszek): The numbers below are arbitrary and come from the C++ binding s, 38 // TODO(mknyszek): The numbers below are arbitrary and come from the C++ binding s,
34 // and should probably be changed at some point 39 // and should probably be changed at some point
35 40
36 /// Initial size of the result buffer. 41 /// Initial size of the result buffer.
37 const INITIAL_WAIT_SET_NUM_RESULTS: usize = 16; 42 const INITIAL_WAIT_SET_NUM_RESULTS: usize = 16;
38 43
39 /// Maximum size of the result buffer. 44 /// Maximum size of the result buffer.
40 const MAXIMUM_WAIT_SET_NUM_RESULTS: usize = 256; 45 const MAXIMUM_WAIT_SET_NUM_RESULTS: usize = 256;
41 46
42 /// Thread-local data structure for keeping track of handles to wait on. 47 /// Thread-local data structure for keeping track of handles to wait on.
43 thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static>> = RefCell::new(RunLo op::new())); 48 thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static, 'static>> = RefCell:: new(RunLoop::new()));
44 49
45 /// Token representing handle/callback to wait on for this thread only. This 50 /// Token representing handle/callback to wait on for this thread only. This
46 /// token only has meaning on the thread in which the handle was registered. 51 /// token only has meaning on the thread in which the handle was registered.
47 #[derive(Clone, Debug, PartialEq, Eq, Hash)] 52 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
48 pub struct Token(u64); 53 pub struct Token(u64);
49 54
50 impl Token { 55 impl Token {
51 /// Get the wait token's "cookie" form, suitable for use in a wait set. 56 /// Get the wait token's "cookie" form, suitable for use in a wait set.
52 fn as_cookie(&self) -> u64 { 57 fn as_cookie(&self) -> u64 {
53 self.0 58 self.0
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
126 pub fn deadline(&self) -> system::MojoTimeTicks { 131 pub fn deadline(&self) -> system::MojoTimeTicks {
127 self.deadline 132 self.deadline
128 } 133 }
129 134
130 /// Setter to update the current absolute deadline. 135 /// Setter to update the current absolute deadline.
131 pub fn set_deadline(&mut self, deadline: system::MojoTimeTicks) { 136 pub fn set_deadline(&mut self, deadline: system::MojoTimeTicks) {
132 self.deadline = deadline 137 self.deadline = deadline
133 } 138 }
134 } 139 }
135 140
141 /// A wrapper struct for carrying the task as well as some information about
142 /// it.
143 struct TaskInfo<'t> {
144 /// The task, boxed up.
145 closure: Box<FnMut(&mut RunLoop) + 't>,
146
147 /// An absolute deadline in terms of time ticks.
148 ///
149 /// This is the most recently updated deadline that
150 /// we should be watching out for. All others for this
151 /// token may be considered "stale".
152 deadline: system::MojoTimeTicks,
153 }
154
155 impl<'t> TaskInfo<'t> {
156 /// Executes the task within the info object, consuming it
157 /// in the process.
158 pub fn execute_task(mut self, run_loop: &mut RunLoop) {
159 (*self.closure)(run_loop);
160 }
161
162 /// Getter for the current absolute deadline held inside.
163 pub fn deadline(&self) -> system::MojoTimeTicks {
164 self.deadline
165 }
166 }
167
168 impl<'t> PartialEq for TaskInfo<'t> {
169 /// Equality for TaskInfo in terms of its deadline
170 fn eq(&self, other: &TaskInfo) -> bool {
171 self.deadline == other.deadline
172 }
173 }
174
175 impl<'t> Eq for TaskInfo<'t> {}
176
177 impl<'t> PartialOrd for TaskInfo<'t> {
178 /// Partial comparison for TaskInfo in terms of its deadline
179 ///
180 /// Reverses the comparison because the Rust std library only
181 /// offers a max-heap, and we need a min-heap.
182 fn partial_cmp(&self, other: &TaskInfo) -> Option<Ordering> {
183 other.deadline.partial_cmp(&self.deadline)
184 }
185 }
186
187 impl<'t> Ord for TaskInfo<'t> {
188 /// Implement comparisons for Task Info.
189 ///
190 /// Reverses the comparison because the Rust std library only
191 /// offers a max-heap, and we need a min-heap.
192 fn cmp(&self, other: &Self) -> Ordering {
193 other.deadline.cmp(&self.deadline)
194 }
195 }
196
136 /// Wrapper struct intended to be used in a priority queue 197 /// Wrapper struct intended to be used in a priority queue
137 /// for efficiently retrieving the next closest deadline. 198 /// for efficiently retrieving the next closest deadline.
138 #[derive(Clone)] 199 #[derive(Clone)]
139 struct DeadlineInfo { 200 struct DeadlineInfo {
140 /// The ID of the associated Handler struct in the RunLoop's 201 /// The ID of the associated Handler struct in the RunLoop's
141 /// hash map. 202 /// hash map.
142 token: Token, 203 token: Token,
143 204
144 /// An absolute deadline in terms of time ticks. 205 /// An absolute deadline in terms of time ticks.
145 deadline: system::MojoTimeTicks, 206 deadline: system::MojoTimeTicks,
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
217 } else { 278 } else {
218 (deadline - now) as system::MojoDeadline 279 (deadline - now) as system::MojoDeadline
219 } 280 }
220 } 281 }
221 282
222 /// This structure contains all information necessary to wait on handles 283 /// This structure contains all information necessary to wait on handles
223 /// asynchronously. 284 /// asynchronously.
224 /// 285 ///
225 /// Ultimately, it should only be a singleton living in 286 /// Ultimately, it should only be a singleton living in
226 /// thread-local storage. 287 /// thread-local storage.
227 pub struct RunLoop<'h> { 288 pub struct RunLoop<'h, 't> {
228 /// Running count of the next available token slot. 289 /// Running count of the next available token slot.
229 token_count: u64, 290 token_count: u64,
230 291
231 /// A map of handlers. 292 /// A map of handlers.
232 /// 293 ///
233 /// TODO(mknyszek): Try out a Slab allocator instead of a hashmap. 294 /// TODO(mknyszek): Try out a Slab allocator instead of a hashmap.
234 handlers: HashMap<Token, HandlerInfo<'h>>, 295 handlers: HashMap<Token, HandlerInfo<'h>>,
235 296
297 /// A min-heap of delayed tasks in order to pull the soonest task to
298 /// execute efficiently.
299 tasks: BinaryHeap<TaskInfo<'t>>,
300
236 /// A min-heap containing deadlines in order to pull out the soonest 301 /// A min-heap containing deadlines in order to pull out the soonest
237 /// deadline efficiently. 302 /// deadline efficiently.
238 /// 303 ///
239 /// Warning: may contain "stale" deadlines which are not kept in the 304 /// Warning: may contain "stale" deadlines which are not kept in the
240 /// map! 305 /// map!
241 deadlines: BinaryHeap<DeadlineInfo>, 306 deadlines: BinaryHeap<DeadlineInfo>,
242 307
243 /// The Mojo structure keeping track of handles and signals. 308 /// The Mojo structure keeping track of handles and signals.
244 /// 309 ///
245 /// This structure must be kept in sync with handlers. 310 /// This structure must be kept in sync with handlers.
246 handle_set: wait_set::WaitSet, 311 handle_set: wait_set::WaitSet,
247 312
248 /// A flag that tells the RunLoop whether it should quit. 313 /// A flag that tells the RunLoop whether it should quit.
249 should_quit: bool, 314 should_quit: bool,
250 315
251 /// A flag that indicates whether the RunLoop is running or now 316 /// A flag that indicates whether the RunLoop is running or now
252 running: bool, 317 running: bool,
253 } 318 }
254 319
255 impl<'h> RunLoop<'h> { 320 impl<'h, 't> RunLoop<'h, 't> {
256 /// Create a new RunLoop. 321 /// Create a new RunLoop.
257 pub fn new() -> RunLoop<'h> { 322 pub fn new() -> RunLoop<'h, 't> {
258 RunLoop { 323 RunLoop {
259 token_count: 0, 324 token_count: 0,
260 handlers: HashMap::new(), 325 handlers: HashMap::new(),
326 tasks: BinaryHeap::new(),
261 deadlines: BinaryHeap::new(), 327 deadlines: BinaryHeap::new(),
262 handle_set: wait_set::WaitSet::new(wsflags!(Create::None)).unwrap(), 328 handle_set: wait_set::WaitSet::new(wsflags!(Create::None)).unwrap(),
263 should_quit: false, 329 should_quit: false,
264 running: false, 330 running: false,
265 } 331 }
266 } 332 }
267 333
268 /// Generate a new Token for this RunLoop 334 /// Generate a new Token for this RunLoop
269 fn generate_token(&mut self) -> Token { 335 fn generate_token(&mut self) -> Token {
270 self.token_count = self.token_count.wrapping_add(1); 336 self.token_count = self.token_count.wrapping_add(1);
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
349 match self.handlers.remove(&token) { 415 match self.handlers.remove(&token) {
350 Some(_) => { 416 Some(_) => {
351 let _result = self.handle_set.remove(token.as_cookie()); 417 let _result = self.handle_set.remove(token.as_cookie());
352 debug_assert_eq!(_result, MojoResult::Okay); 418 debug_assert_eq!(_result, MojoResult::Okay);
353 true 419 true
354 } 420 }
355 None => false, 421 None => false,
356 } 422 }
357 } 423 }
358 424
359 /// Uses the binary heap to get the next closest deadline. 425 /// Adds a task to be run by the runloop after some delay.
426 ///
427 /// Returns a token if the delay is valid, otherwise returns None.
428 pub fn post_task<F>(&mut self, task: F, delay: system::MojoTimeTicks) -> Res ult<(), ()>
429 where F: FnMut(&mut RunLoop) + 't
430 {
431 let now = core::get_time_ticks_now();
432 if delay > i64::MAX - now {
433 return Err(());
434 }
435 let deadline = now + delay;
436 self.tasks.push(TaskInfo {
437 closure: Box::new(task),
438 deadline: deadline,
439 });
440 Ok(())
441 }
442
443 /// Uses the binary heaps to get the next closest deadline.
360 /// 444 ///
361 /// Removes stale deadline entries as they are found, but 445 /// Removes stale deadline entries as they are found, but
362 /// does not otherwise modify the heap of deadlines. 446 /// does not otherwise modify the heap of deadlines.
363 fn get_next_deadline(&mut self) -> system::MojoTimeTicks { 447 fn get_next_deadline(&mut self) -> system::MojoTimeTicks {
364 debug_assert!(!self.handlers.is_empty()); 448 debug_assert!(!self.handlers.is_empty());
449 let top_task_deadline = match self.tasks.peek() {
450 Some(info) => info.deadline(),
451 None => MOJO_INDEFINITE_ABSOLUTE,
452 };
365 let mut top = match self.deadlines.peek() { 453 let mut top = match self.deadlines.peek() {
366 Some(info) => info.clone(), 454 Some(info) => info.clone(),
367 None => return MOJO_INDEFINITE_ABSOLUTE, 455 None => return MOJO_INDEFINITE_ABSOLUTE,
368 }; 456 };
369 while !self.handlers.contains_key(top.token()) { 457 while !self.handlers.contains_key(top.token()) {
370 self.deadlines.pop(); 458 self.deadlines.pop();
371 top = match self.deadlines.peek() { 459 top = match self.deadlines.peek() {
372 Some(info) => info.clone(), 460 Some(info) => info.clone(),
373 None => return MOJO_INDEFINITE_ABSOLUTE, 461 None => return MOJO_INDEFINITE_ABSOLUTE,
374 } 462 }
375 } 463 }
376 top.deadline() 464 if top_task_deadline != MOJO_INDEFINITE_ABSOLUTE &&
465 top_task_deadline < top.deadline() {
466 top_task_deadline
467 } else {
468 top.deadline()
469 }
377 } 470 }
378 471
379 /// Gets a handler by token to be manipulated in a consistent environment. 472 /// Gets a handler by token to be manipulated in a consistent environment.
380 /// 473 ///
381 /// This method provides a method of accessing a handler in order to manipul ate 474 /// This method provides a method of accessing a handler in order to manipul ate
382 /// it in a manner that avoids a borrow cycle, that is, it take()s the handl er 475 /// it in a manner that avoids a borrow cycle, that is, it take()s the handl er
383 /// out of the HashMap, and returns it when manipulation has completed. 476 /// out of the HashMap, and returns it when manipulation has completed.
384 fn get_handler_with<F>(&mut self, token: &Token, invoker: F) 477 fn get_handler_with<F>(&mut self, token: &Token, invoker: F)
385 where F: FnOnce(&mut Self, 478 where F: FnOnce(&mut Self,
386 &mut Box<Handler + 'h>, 479 &mut Box<Handler + 'h>,
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
476 // Remove the deadline 569 // Remove the deadline
477 self.deadlines.pop(); 570 self.deadlines.pop();
478 // Break if the next deadline has not yet expired. 571 // Break if the next deadline has not yet expired.
479 top = match self.deadlines.peek() { 572 top = match self.deadlines.peek() {
480 Some(info) => info.clone(), 573 Some(info) => info.clone(),
481 None => break, 574 None => break,
482 }; 575 };
483 } 576 }
484 } 577 }
485 578
579 /// Iterates through all tasks whose deadline has passed and executes
580 /// them, consuming their information object.
581 ///
582 /// These tasks all have access to the RunLoop so that they may reschedule
583 /// themselves or manipulate the RunLoop in some other way.
584 fn execute_ready_tasks(&mut self) {
585 let now = core::get_time_ticks_now();
586 let mut deadline = match self.tasks.peek() {
587 Some(info) => info.deadline(),
588 None => return,
589 };
590 while deadline < now {
591 let top = self.tasks.pop().expect("Sudden change to heap?");
592 top.execute_task(self);
593 if self.should_quit {
594 return;
595 }
596 deadline = match self.tasks.peek() {
597 Some(info) => info.deadline(),
598 None => return,
599 };
600 }
601 }
602
486 /// Blocks on handle_set.wait_on_set using the information contained 603 /// Blocks on handle_set.wait_on_set using the information contained
487 /// within itself. 604 /// within itself.
488 /// 605 ///
489 /// This method blocks for only as long as the shortest deadline among all 606 /// This method blocks for only as long as the shortest deadline among all
490 /// handles this thread has registered. This method returns immediately as 607 /// handles this thread has registered. This method returns immediately as
491 /// soon as any one handle has its signals satisfied, fails to ever have its 608 /// soon as any one handle has its signals satisfied, fails to ever have its
492 /// signals satisfied, or reaches its deadline. 609 /// signals satisfied, or reaches its deadline.
493 fn wait(&mut self, results_buffer: &mut Vec<system::WaitSetResult>) { 610 fn wait(&mut self, results_buffer: &mut Vec<system::WaitSetResult>) {
494 debug_assert!(!self.handlers.is_empty()); 611 debug_assert!(!self.handlers.is_empty());
612 self.execute_ready_tasks();
613 // If after executing a task we quit or there are no handles,
614 // we have no reason to continue.
615 if self.handlers.is_empty() || self.should_quit {
616 return;
617 }
495 let deadline = self.get_next_deadline(); 618 let deadline = self.get_next_deadline();
496 let until_deadline = relative_deadline(deadline, core::get_time_ticks_no w()); 619 let until_deadline = relative_deadline(deadline, core::get_time_ticks_no w());
497 // Perform the wait 620 // Perform the wait
498 match self.handle_set.wait_on_set(until_deadline, results_buffer) { 621 match self.handle_set.wait_on_set(until_deadline, results_buffer) {
499 Ok(max_results) => { 622 Ok(max_results) => {
500 self.notify_of_results(results_buffer); 623 self.notify_of_results(results_buffer);
501 // Clear the buffer since we don't need the results anymore. 624 // Clear the buffer since we don't need the results anymore.
502 // Helps prevent a copy if we resize the buffer. 625 // Helps prevent a copy if we resize the buffer.
503 results_buffer.clear(); 626 results_buffer.clear();
504 // Increase the size of the buffer if there are more results 627 // Increase the size of the buffer if there are more results
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
540 663
541 /// Provides a scope to modify the current thread's runloop. 664 /// Provides a scope to modify the current thread's runloop.
542 pub fn with_current<F>(modifier: F) 665 pub fn with_current<F>(modifier: F)
543 where F: FnOnce(&mut RunLoop) 666 where F: FnOnce(&mut RunLoop)
544 { 667 {
545 TL_RUN_LOOP.with(|ref_runloop| { 668 TL_RUN_LOOP.with(|ref_runloop| {
546 let mut runloop = ref_runloop.borrow_mut(); 669 let mut runloop = ref_runloop.borrow_mut();
547 modifier(&mut *runloop); 670 modifier(&mut *runloop);
548 }); 671 });
549 } 672 }
OLDNEW
« no previous file with comments | « no previous file | mojo/public/rust/tests/run_loop.rs » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698