// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "content/renderer/scheduler/task_queue_manager.h"

#include <limits>
#include <queue>
#include <set>

#include "base/bind.h"
#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "cc/test/test_now_source.h"
#include "content/renderer/scheduler/nestable_single_thread_task_runner.h"
#include "content/renderer/scheduler/task_queue_selector.h"

namespace {
const int64_t kMaxTimeTicks = std::numeric_limits<int64_t>::max();
}  // namespace

namespace content {
namespace internal {

// Now() is somewhat expensive, so it makes sense not to call it unless we
// really need to.
class LazyNow {
 public:
  explicit LazyNow(base::TimeTicks now)
      : task_queue_manager_(nullptr), now_(now) {
    DCHECK(!now.is_null());
  }

  explicit LazyNow(TaskQueueManager* task_queue_manager)
      : task_queue_manager_(task_queue_manager) {}

  base::TimeTicks Now() {
    if (now_.is_null())
      now_ = task_queue_manager_->Now();
    return now_;
  }

 private:
  TaskQueueManager* task_queue_manager_;  // NOT OWNED
  base::TimeTicks now_;
};

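// A task runner whose tasks are buffered on a lock-protected incoming queue
// until they are moved to the main-thread work queue, which TaskQueueManager
// drains according to this queue's pump policy.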
class TaskQueue : public base::SingleThreadTaskRunner {
 public:
  explicit TaskQueue(TaskQueueManager* task_queue_manager);

  // base::SingleThreadTaskRunner implementation.
  bool RunsTasksOnCurrentThread() const override;
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const base::Closure& task,
                       base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
  }

  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const base::Closure& task,
                                  base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE);
  }

  bool IsQueueEmpty() const;

  void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy);
  void PumpQueue();

  bool NextPendingDelayedTaskRunTime(
      base::TimeTicks* next_pending_delayed_task);

  bool UpdateWorkQueue(LazyNow* lazy_now,
                       const base::PendingTask* previous_task);
  base::PendingTask TakeTaskFromWorkQueue();

  void WillDeleteTaskQueueManager();

  base::TaskQueue& work_queue() { return work_queue_; }

  void set_name(const char* name) { name_ = name; }

  void AsValueInto(base::trace_event::TracedValue* state) const;

 private:
  enum class TaskType {
    NORMAL,
    NON_NESTABLE,
  };

  ~TaskQueue() override;

  bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                           const base::Closure& task,
                           base::TimeDelta delay,
                           TaskType task_type);

  // Posted to the underlying run loop as a delayed task; locks |lock_| and
  // calls MoveReadyDelayedTasksToIncomingQueueLocked to process delayed tasks
  // that need to be run now.
  void MoveReadyDelayedTasksToIncomingQueue();

  // Enqueues any delayed tasks which should be run now on the incoming_queue_
  // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled.
  // Must be called with |lock_| locked.
  void MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now);

  // Posts MoveReadyDelayedTasksToIncomingQueue if there isn't already a task
  // posted on the underlying run loop for the next task's scheduled run time.
  void ScheduleDelayedWorkLocked(LazyNow* lazy_now);

  void PumpQueueLocked();
  bool TaskIsOlderThanQueuedTasks(const base::PendingTask* task);
  bool ShouldAutoPumpQueueLocked(const base::PendingTask* previous_task);
  void EnqueueTaskLocked(const base::PendingTask& pending_task);

  void TraceQueueSize(bool is_locked) const;
  static const char* PumpPolicyToString(
      TaskQueueManager::PumpPolicy pump_policy);
  static void QueueAsValueInto(const base::TaskQueue& queue,
                               base::trace_event::TracedValue* state);
  static void QueueAsValueInto(const base::DelayedTaskQueue& queue,
                               base::trace_event::TracedValue* state);
  static void TaskAsValueInto(const base::PendingTask& task,
                              base::trace_event::TracedValue* state);

  // This lock protects all members except the work queue and the
  // main_thread_checker_.
  mutable base::Lock lock_;
  base::PlatformThreadId thread_id_;
  TaskQueueManager* task_queue_manager_;  // NOT OWNED
  base::TaskQueue incoming_queue_;
  TaskQueueManager::PumpPolicy pump_policy_;
  const char* name_;
  base::DelayedTaskQueue delayed_task_queue_;
  std::set<base::TimeTicks> in_flight_kick_delayed_tasks_;

  base::ThreadChecker main_thread_checker_;
  base::TaskQueue work_queue_;

  DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};

TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
    : thread_id_(base::PlatformThread::CurrentId()),
      task_queue_manager_(task_queue_manager),
      pump_policy_(TaskQueueManager::PumpPolicy::AUTO),
      name_(nullptr) {
}

TaskQueue::~TaskQueue() {
}

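// Called by ~TaskQueueManager to sever the back-pointer to the manager; the
// queue may outlive the manager because it is reference counted.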
void TaskQueue::WillDeleteTaskQueueManager() {
  base::AutoLock lock(lock_);
  task_queue_manager_ = nullptr;
  // TODO(scheduler-dev): Should we also clear the other queues here?
  delayed_task_queue_ = base::DelayedTaskQueue();
}

bool TaskQueue::RunsTasksOnCurrentThread() const {
  base::AutoLock lock(lock_);
  return base::PlatformThread::CurrentId() == thread_id_;
}

bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                                    const base::Closure& task,
                                    base::TimeDelta delay,
                                    TaskType task_type) {
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return false;

  base::PendingTask pending_task(from_here, task, base::TimeTicks(),
                                 task_type != TaskType::NON_NESTABLE);
  task_queue_manager_->DidQueueTask(&pending_task);

  if (delay > base::TimeDelta()) {
    base::TimeTicks now = task_queue_manager_->Now();
    pending_task.delayed_run_time = now + delay;
    delayed_task_queue_.push(pending_task);
    TraceQueueSize(true);
    // If we changed the topmost task, then it is time to reschedule.
    if (delayed_task_queue_.top().task.Equals(pending_task.task)) {
      LazyNow lazy_now(now);
      ScheduleDelayedWorkLocked(&lazy_now);
    }
    return true;
  }
  EnqueueTaskLocked(pending_task);
  return true;
}

void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return;

  LazyNow lazy_now(task_queue_manager_);
  MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
}

void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) {
  lock_.AssertAcquired();
  // Enqueue all delayed tasks that should be running now.
  while (!delayed_task_queue_.empty() &&
         delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
    in_flight_kick_delayed_tasks_.erase(
        delayed_task_queue_.top().delayed_run_time);
    EnqueueTaskLocked(delayed_task_queue_.top());
    delayed_task_queue_.pop();
  }
  TraceQueueSize(true);
  ScheduleDelayedWorkLocked(lazy_now);
}

void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
  lock_.AssertAcquired();
  // Any remaining tasks are in the future, so queue a task to kick them.
  if (!delayed_task_queue_.empty()) {
    base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time;
    DCHECK_GT(next_run_time, lazy_now->Now());
    // Make sure we don't have more than one
    // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
    // run time (note it's fine to have multiple ones in flight for distinct
    // run times).
    if (in_flight_kick_delayed_tasks_.find(next_run_time) ==
        in_flight_kick_delayed_tasks_.end()) {
      in_flight_kick_delayed_tasks_.insert(next_run_time);
      base::TimeDelta delay = next_run_time - lazy_now->Now();
      task_queue_manager_->PostDelayedTask(
          FROM_HERE,
          base::Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this),
          delay);
    }
  }
}

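// Note: work_queue_ is only touched on the main thread, so it is read without
// |lock_|; only the incoming queue check needs the lock.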
bool TaskQueue::IsQueueEmpty() const {
  if (!work_queue_.empty())
    return false;

  {
    base::AutoLock lock(lock_);
    return incoming_queue_.empty();
  }
}

bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
  lock_.AssertAcquired();
  // A null task is passed when UpdateWorkQueue is called before any task is
  // run. In this case we don't want to pump an after_wakeup queue, so return
  // true here.
  if (!task)
    return true;

  // Return false if there are no tasks in the incoming queue.
  if (incoming_queue_.empty())
    return false;

  base::PendingTask oldest_queued_task = incoming_queue_.front();
  DCHECK(oldest_queued_task.delayed_run_time.is_null());
  DCHECK(task->delayed_run_time.is_null());

  // Note: the comparison is correct because PendingTask's comparison operator
  // is inverted so that it works in a heap-based priority queue.
  return oldest_queued_task < *task;
}

bool TaskQueue::ShouldAutoPumpQueueLocked(
    const base::PendingTask* previous_task) {
  lock_.AssertAcquired();
  if (pump_policy_ == TaskQueueManager::PumpPolicy::MANUAL)
    return false;
  if (pump_policy_ == TaskQueueManager::PumpPolicy::AFTER_WAKEUP &&
      TaskIsOlderThanQueuedTasks(previous_task))
    return false;
  if (incoming_queue_.empty())
    return false;
  return true;
}

bool TaskQueue::NextPendingDelayedTaskRunTime(
    base::TimeTicks* next_pending_delayed_task) {
  base::AutoLock lock(lock_);
  if (delayed_task_queue_.empty())
    return false;
  *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
  return true;
}

bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now,
                                const base::PendingTask* previous_task) {
  if (!work_queue_.empty())
    return true;

  {
    base::AutoLock lock(lock_);
    MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
    if (!ShouldAutoPumpQueueLocked(previous_task))
      return false;
    work_queue_.Swap(&incoming_queue_);
    TraceQueueSize(true);
    return true;
  }
}

base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
  base::PendingTask pending_task = work_queue_.front();
  work_queue_.pop();
  TraceQueueSize(false);
  return pending_task;
}

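// Reports the combined depth of all three queues as a trace counter. Takes
// |lock_| itself unless the caller indicates it already holds it.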
void TaskQueue::TraceQueueSize(bool is_locked) const {
  bool is_tracing;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), &is_tracing);
  if (!is_tracing || !name_)
    return;
  if (!is_locked)
    lock_.Acquire();
  else
    lock_.AssertAcquired();
  TRACE_COUNTER1(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
      incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
  if (!is_locked)
    lock_.Release();
}

void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
  lock_.AssertAcquired();
  if (!task_queue_manager_)
    return;
  if (pump_policy_ == TaskQueueManager::PumpPolicy::AUTO &&
      incoming_queue_.empty())
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
  incoming_queue_.push(pending_task);

  if (!pending_task.delayed_run_time.is_null()) {
    // Clear the delayed run time because we've already applied the delay
    // before getting here.
    incoming_queue_.back().delayed_run_time = base::TimeTicks();
  }
  TraceQueueSize(true);
}

void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
  base::AutoLock lock(lock_);
  if (pump_policy == TaskQueueManager::PumpPolicy::AUTO &&
      pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) {
    PumpQueueLocked();
  }
  pump_policy_ = pump_policy;
}

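// Moves any ready delayed tasks onto the incoming queue, then moves the whole
// incoming queue onto the work queue, scheduling a DoWork if the work queue is
// non-empty afterwards.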
void TaskQueue::PumpQueueLocked() {
  lock_.AssertAcquired();
  if (task_queue_manager_) {
    LazyNow lazy_now(task_queue_manager_);
    MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
  }
  while (!incoming_queue_.empty()) {
    work_queue_.push(incoming_queue_.front());
    incoming_queue_.pop();
  }
  if (!work_queue_.empty() && task_queue_manager_)
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
}

void TaskQueue::PumpQueue() {
  base::AutoLock lock(lock_);
  PumpQueueLocked();
}

void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
  base::AutoLock lock(lock_);
  state->BeginDictionary();
  if (name_)
    state->SetString("name", name_);
  state->SetString("pump_policy", PumpPolicyToString(pump_policy_));
  state->BeginArray("incoming_queue");
  QueueAsValueInto(incoming_queue_, state);
  state->EndArray();
  state->BeginArray("work_queue");
  QueueAsValueInto(work_queue_, state);
  state->EndArray();
  state->BeginArray("delayed_task_queue");
  QueueAsValueInto(delayed_task_queue_, state);
  state->EndArray();
  state->EndDictionary();
}

// static
const char* TaskQueue::PumpPolicyToString(
    TaskQueueManager::PumpPolicy pump_policy) {
  switch (pump_policy) {
    case TaskQueueManager::PumpPolicy::AUTO:
      return "auto";
    case TaskQueueManager::PumpPolicy::AFTER_WAKEUP:
      return "after_wakeup";
    case TaskQueueManager::PumpPolicy::MANUAL:
      return "manual";
    default:
      NOTREACHED();
      return nullptr;
  }
}

// static
void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
                                 base::trace_event::TracedValue* state) {
  base::TaskQueue queue_copy(queue);
  while (!queue_copy.empty()) {
    TaskAsValueInto(queue_copy.front(), state);
    queue_copy.pop();
  }
}

// static
void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue,
                                 base::trace_event::TracedValue* state) {
  base::DelayedTaskQueue queue_copy(queue);
  while (!queue_copy.empty()) {
    TaskAsValueInto(queue_copy.top(), state);
    queue_copy.pop();
  }
}

// static
void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
                                base::trace_event::TracedValue* state) {
  state->BeginDictionary();
  state->SetString("posted_from", task.posted_from.ToString());
  state->SetInteger("sequence_num", task.sequence_num);
  state->SetBoolean("nestable", task.nestable);
  state->SetBoolean("is_high_res", task.is_high_res);
  state->SetDouble(
      "delayed_run_time",
      (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
  state->EndDictionary();
}

}  // namespace internal

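// A minimal usage sketch (MySelector and MyTask are placeholders, not part of
// this file):
//
//   MySelector selector;  // Some TaskQueueSelector implementation.
//   TaskQueueManager manager(2, main_task_runner, &selector);
//   manager.TaskRunnerForQueue(0)->PostTask(FROM_HERE, base::Bind(&MyTask));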
TaskQueueManager::TaskQueueManager(
    size_t task_queue_count,
    scoped_refptr<NestableSingleThreadTaskRunner> main_task_runner,
    TaskQueueSelector* selector)
    : main_task_runner_(main_task_runner),
      selector_(selector),
      pending_dowork_count_(0),
      work_batch_size_(1),
      time_source_(nullptr),
      weak_factory_(this) {
  DCHECK(main_task_runner->RunsTasksOnCurrentThread());
  TRACE_EVENT_OBJECT_CREATED_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
      this);

  task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
  for (size_t i = 0; i < task_queue_count; i++) {
    scoped_refptr<internal::TaskQueue> queue(
        make_scoped_refptr(new internal::TaskQueue(this)));
    queues_.push_back(queue);
  }

  std::vector<const base::TaskQueue*> work_queues;
  for (const auto& queue : queues_)
    work_queues.push_back(&queue->work_queue());
  selector_->RegisterWorkQueues(work_queues);
}

TaskQueueManager::~TaskQueueManager() {
  TRACE_EVENT_OBJECT_DELETED_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
      this);
  for (auto& queue : queues_)
    queue->WillDeleteTaskQueueManager();
}

internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const {
  DCHECK_LT(queue_index, queues_.size());
  return queues_[queue_index].get();
}

scoped_refptr<base::SingleThreadTaskRunner>
TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const {
  return Queue(queue_index);
}

bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const {
  internal::TaskQueue* queue = Queue(queue_index);
  return queue->IsQueueEmpty();
}

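// Returns the earliest pending delayed run time across all queues, or a null
// TimeTicks if no queue has a pending delayed task.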
base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  bool found_pending_task = false;
  base::TimeTicks next_pending_delayed_task(
      base::TimeTicks::FromInternalValue(kMaxTimeTicks));
  for (auto& queue : queues_) {
    base::TimeTicks queues_next_pending_delayed_task;
    if (queue->NextPendingDelayedTaskRunTime(
            &queues_next_pending_delayed_task)) {
      found_pending_task = true;
      next_pending_delayed_task = std::min(next_pending_delayed_task,
                                           queues_next_pending_delayed_task);
    }
  }

  if (!found_pending_task)
    return base::TimeTicks();

  DCHECK_NE(next_pending_delayed_task,
            base::TimeTicks::FromInternalValue(kMaxTimeTicks));
  return next_pending_delayed_task;
}

void TaskQueueManager::SetPumpPolicy(size_t queue_index,
                                     PumpPolicy pump_policy) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->SetPumpPolicy(pump_policy);
}

void TaskQueueManager::PumpQueue(size_t queue_index) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->PumpQueue();
}

bool TaskQueueManager::UpdateWorkQueues(
    const base::PendingTask* previous_task) {
  // TODO(skyostil): This is not efficient when the number of queues grows very
  // large due to the number of locks taken. Consider optimizing when we get
  // there.
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::LazyNow lazy_now(this);
  bool has_work = false;
  for (auto& queue : queues_) {
    has_work |= queue->UpdateWorkQueue(&lazy_now, previous_task);
    if (!queue->work_queue().empty()) {
      // Currently we should not be getting tasks with delayed run times in any
      // of the work queues.
      DCHECK(queue->work_queue().front().delayed_run_time.is_null());
    }
  }
  return has_work;
}

void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
  bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
  if (on_main_thread) {
    // We only want one pending DoWork posted from the main thread, or we risk
    // an explosion of pending DoWorks which could starve out everything else.
    if (pending_dowork_count_ > 0) {
      return;
    }
    pending_dowork_count_++;
  }

  main_task_runner_->PostTask(
      FROM_HERE, base::Bind(&TaskQueueManager::DoWork,
                            task_queue_manager_weak_ptr_, on_main_thread));
}

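// Runs up to |work_batch_size_| tasks chosen by the selector, refreshing the
// work queues between tasks so newly runnable work is considered.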
void TaskQueueManager::DoWork(bool posted_from_main_thread) {
  if (posted_from_main_thread) {
    pending_dowork_count_--;
    DCHECK_GE(pending_dowork_count_, 0);
  }
  DCHECK(main_thread_checker_.CalledOnValidThread());

  // Pass nullptr to UpdateWorkQueues here to prevent waking up a
  // pump-after-wakeup queue.
  if (!UpdateWorkQueues(nullptr))
    return;

  base::PendingTask previous_task((tracked_objects::Location()),
                                  (base::Closure()));
  for (int i = 0; i < work_batch_size_; i++) {
    size_t queue_index;
    if (!SelectWorkQueueToService(&queue_index))
      return;
    // Note that this function won't post another call to DoWork if one is
    // already pending, so it is safe to call it in a loop.
    MaybePostDoWorkOnMainRunner();
    ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task);

    if (!UpdateWorkQueues(&previous_task))
      return;
  }
}

bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
  bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
  TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
      AsValueWithSelectorResult(should_run, *out_queue_index));
  return should_run;
}

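// Assigns the task a global sequence number and notifies the task annotator so
// the post site shows up in traces.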
void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) {
  pending_task->sequence_num = task_sequence_num_.GetNext();
  task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task);
}

void TaskQueueManager::ProcessTaskFromWorkQueue(
    size_t queue_index,
    bool has_previous_task,
    base::PendingTask* previous_task) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
  if (!pending_task.nestable && main_task_runner_->IsNested()) {
    // Defer non-nestable work to the main task runner. NOTE these tasks can be
    // arbitrarily delayed so the additional delay should not be a problem.
    main_task_runner_->PostNonNestableTask(pending_task.posted_from,
                                           pending_task.task);
  } else {
    // Suppress "will" task observer notifications for the first and "did"
    // notifications for the last task in the batch to avoid duplicate
    // notifications.
    if (has_previous_task) {
      FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                        DidProcessTask(*previous_task));
      FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                        WillProcessTask(pending_task));
    }
    task_annotator_.RunTask("TaskQueueManager::PostTask",
                            "TaskQueueManager::RunTask", pending_task);
    pending_task.task.Reset();
    *previous_task = pending_task;
  }
}

bool TaskQueueManager::RunsTasksOnCurrentThread() const {
  return main_task_runner_->RunsTasksOnCurrentThread();
}

bool TaskQueueManager::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  DCHECK_GT(delay, base::TimeDelta());
  return main_task_runner_->PostDelayedTask(from_here, task, delay);
}

void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->set_name(name);
}

void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  DCHECK_GE(work_batch_size, 1);
  work_batch_size_ = work_batch_size;
}

void TaskQueueManager::AddTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::MessageLoop::current()->AddTaskObserver(task_observer);
  task_observers_.AddObserver(task_observer);
}

void TaskQueueManager::RemoveTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::MessageLoop::current()->RemoveTaskObserver(task_observer);
  task_observers_.RemoveObserver(task_observer);
}

void TaskQueueManager::SetTimeSourceForTesting(
    scoped_refptr<cc::TestNowSource> time_source) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  time_source_ = time_source;
}

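// Returns the current time, preferring the injected test time source when one
// has been set.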
base::TimeTicks TaskQueueManager::Now() const {
  return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
}

scoped_refptr<base::trace_event::ConvertableToTraceFormat>
TaskQueueManager::AsValueWithSelectorResult(bool should_run,
                                            size_t selected_queue) const {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  scoped_refptr<base::trace_event::TracedValue> state =
      new base::trace_event::TracedValue();
  state->BeginArray("queues");
  for (auto& queue : queues_)
    queue->AsValueInto(state.get());
  state->EndArray();
  state->BeginDictionary("selector");
  selector_->AsValueInto(state.get());
  state->EndDictionary();
  if (should_run)
    state->SetInteger("selected_queue", selected_queue);
  return state;
}

}  // namespace content