Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(25)

Side by Side Diff: content/renderer/scheduler/task_queue_manager.cc

Issue 1025323003: Introduce a SchedulerHelper in content/child/scheduler (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Sami's comments Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/renderer/scheduler/task_queue_manager.h"
6
7 #include <queue>
8 #include <set>
9
10 #include "base/bind.h"
11 #include "base/trace_event/trace_event.h"
12 #include "base/trace_event/trace_event_argument.h"
13 #include "cc/test/test_now_source.h"
14 #include "content/renderer/scheduler/nestable_single_thread_task_runner.h"
15 #include "content/renderer/scheduler/task_queue_selector.h"
16
17 namespace {
18 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
19 }
20
21 namespace content {
22 namespace internal {
23
24 // Now() is somewhat expensive so it makes sense not to call Now() unless we
25 // really need to.
26 class LazyNow {
27 public:
28 explicit LazyNow(base::TimeTicks now)
29 : task_queue_manager_(nullptr), now_(now) {
30 DCHECK(!now.is_null());
31 }
32
33 explicit LazyNow(TaskQueueManager* task_queue_manager)
34 : task_queue_manager_(task_queue_manager) {}
35
36 base::TimeTicks Now() {
37 if (now_.is_null())
38 now_ = task_queue_manager_->Now();
39 return now_;
40 }
41
42 private:
43 TaskQueueManager* task_queue_manager_; // NOT OWNED
44 base::TimeTicks now_;
45 };
46
47 class TaskQueue : public base::SingleThreadTaskRunner {
48 public:
49 TaskQueue(TaskQueueManager* task_queue_manager);
50
51 // base::SingleThreadTaskRunner implementation.
52 bool RunsTasksOnCurrentThread() const override;
53 bool PostDelayedTask(const tracked_objects::Location& from_here,
54 const base::Closure& task,
55 base::TimeDelta delay) override {
56 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
57 }
58
59 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
60 const base::Closure& task,
61 base::TimeDelta delay) override {
62 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE);
63 }
64
65 bool IsQueueEmpty() const;
66
67 void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy);
68 void PumpQueue();
69
70 bool NextPendingDelayedTaskRunTime(
71 base::TimeTicks* next_pending_delayed_task);
72
73 bool UpdateWorkQueue(LazyNow* lazy_now,
74 const base::PendingTask* previous_task);
75 base::PendingTask TakeTaskFromWorkQueue();
76
77 void WillDeleteTaskQueueManager();
78
79 base::TaskQueue& work_queue() { return work_queue_; }
80
81 void set_name(const char* name) { name_ = name; }
82
83 void AsValueInto(base::trace_event::TracedValue* state) const;
84
85 private:
86 enum class TaskType {
87 NORMAL,
88 NON_NESTABLE,
89 };
90
91 ~TaskQueue() override;
92
93 bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
94 const base::Closure& task,
95 base::TimeDelta delay,
96 TaskType task_type);
97
98 // Delayed task posted to the underlying run loop, which locks |lock_| and
99 // calls MoveReadyDelayedTasksToIncomingQueueLocked to process dealyed tasks
100 // that need to be run now.
101 void MoveReadyDelayedTasksToIncomingQueue();
102
103 // Enqueues any delayed tasks which should be run now on the incoming_queue_
104 // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled.
105 // Must be called with |lock_| locked.
106 void MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now);
107
108 // Posts MoveReadyDelayedTasksToIncomingQueue if there isn't already a task
109 // posted on the underlying runloop for the next task's scheduled run time.
110 void ScheduleDelayedWorkLocked(LazyNow* lazy_now);
111
112 void PumpQueueLocked();
113 bool TaskIsOlderThanQueuedTasks(const base::PendingTask* task);
114 bool ShouldAutoPumpQueueLocked(const base::PendingTask* previous_task);
115 void EnqueueTaskLocked(const base::PendingTask& pending_task);
116
117 void TraceQueueSize(bool is_locked) const;
118 static const char* PumpPolicyToString(
119 TaskQueueManager::PumpPolicy pump_policy);
120 static void QueueAsValueInto(const base::TaskQueue& queue,
121 base::trace_event::TracedValue* state);
122 static void QueueAsValueInto(const base::DelayedTaskQueue& queue,
123 base::trace_event::TracedValue* state);
124 static void TaskAsValueInto(const base::PendingTask& task,
125 base::trace_event::TracedValue* state);
126
127 // This lock protects all members except the work queue and the
128 // main_thread_checker_.
129 mutable base::Lock lock_;
130 base::PlatformThreadId thread_id_;
131 TaskQueueManager* task_queue_manager_;
132 base::TaskQueue incoming_queue_;
133 TaskQueueManager::PumpPolicy pump_policy_;
134 const char* name_;
135 base::DelayedTaskQueue delayed_task_queue_;
136 std::set<base::TimeTicks> in_flight_kick_delayed_tasks_;
137
138 base::ThreadChecker main_thread_checker_;
139 base::TaskQueue work_queue_;
140
141 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
142 };
143
144 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
145 : thread_id_(base::PlatformThread::CurrentId()),
146 task_queue_manager_(task_queue_manager),
147 pump_policy_(TaskQueueManager::PumpPolicy::AUTO),
148 name_(nullptr) {
149 }
150
151 TaskQueue::~TaskQueue() {
152 }
153
154 void TaskQueue::WillDeleteTaskQueueManager() {
155 base::AutoLock lock(lock_);
156 task_queue_manager_ = nullptr;
157 // TODO(scheduler-dev): Should we also clear the other queues here too?
158 delayed_task_queue_ = base::DelayedTaskQueue();
159 }
160
161 bool TaskQueue::RunsTasksOnCurrentThread() const {
162 base::AutoLock lock(lock_);
163 return base::PlatformThread::CurrentId() == thread_id_;
164 }
165
166 bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
167 const base::Closure& task,
168 base::TimeDelta delay,
169 TaskType task_type) {
170 base::AutoLock lock(lock_);
171 if (!task_queue_manager_)
172 return false;
173
174 base::PendingTask pending_task(from_here, task, base::TimeTicks(),
175 task_type != TaskType::NON_NESTABLE);
176 task_queue_manager_->DidQueueTask(&pending_task);
177
178 if (delay > base::TimeDelta()) {
179 base::TimeTicks now = task_queue_manager_->Now();
180 pending_task.delayed_run_time = now + delay;
181 delayed_task_queue_.push(pending_task);
182 TraceQueueSize(true);
183 // If we changed the topmost task, then it is time to reschedule.
184 if (delayed_task_queue_.top().task.Equals(pending_task.task)) {
185 LazyNow lazy_now(now);
186 ScheduleDelayedWorkLocked(&lazy_now);
187 }
188 return true;
189 }
190 EnqueueTaskLocked(pending_task);
191 return true;
192 }
193
194 void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() {
195 DCHECK(main_thread_checker_.CalledOnValidThread());
196 base::AutoLock lock(lock_);
197 if (!task_queue_manager_)
198 return;
199
200 LazyNow lazy_now(task_queue_manager_);
201 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
202 }
203
204 void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) {
205 lock_.AssertAcquired();
206 // Enqueue all delayed tasks that should be running now.
207 while (!delayed_task_queue_.empty() &&
208 delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
209 in_flight_kick_delayed_tasks_.erase(
210 delayed_task_queue_.top().delayed_run_time);
211 EnqueueTaskLocked(delayed_task_queue_.top());
212 delayed_task_queue_.pop();
213 }
214 TraceQueueSize(true);
215 ScheduleDelayedWorkLocked(lazy_now);
216 }
217
218 void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
219 lock_.AssertAcquired();
220 // Any remaining tasks are in the future, so queue a task to kick them.
221 if (!delayed_task_queue_.empty()) {
222 base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time;
223 DCHECK_GT(next_run_time, lazy_now->Now());
224 // Make sure we don't have more than one
225 // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
226 // run time (note it's fine to have multiple ones in flight for distinct
227 // run times).
228 if (in_flight_kick_delayed_tasks_.find(next_run_time) ==
229 in_flight_kick_delayed_tasks_.end()) {
230 in_flight_kick_delayed_tasks_.insert(next_run_time);
231 base::TimeDelta delay = next_run_time - lazy_now->Now();
232 task_queue_manager_->PostDelayedTask(
233 FROM_HERE,
234 Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay);
235 }
236 }
237 }
238
239 bool TaskQueue::IsQueueEmpty() const {
240 if (!work_queue_.empty())
241 return false;
242
243 {
244 base::AutoLock lock(lock_);
245 return incoming_queue_.empty();
246 }
247 }
248
249 bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
250 lock_.AssertAcquired();
251 // A null task is passed when UpdateQueue is called before any task is run.
252 // In this case we don't want to pump an after_wakeup queue, so return true
253 // here.
254 if (!task)
255 return true;
256
257 // Return false if there are no task in the incoming queue.
258 if (incoming_queue_.empty())
259 return false;
260
261 base::PendingTask oldest_queued_task = incoming_queue_.front();
262 DCHECK(oldest_queued_task.delayed_run_time.is_null());
263 DCHECK(task->delayed_run_time.is_null());
264
265 // Note: the comparison is correct due to the fact that the PendingTask
266 // operator inverts its comparison operation in order to work well in a heap
267 // based priority queue.
268 return oldest_queued_task < *task;
269 }
270
271 bool TaskQueue::ShouldAutoPumpQueueLocked(
272 const base::PendingTask* previous_task) {
273 lock_.AssertAcquired();
274 if (pump_policy_ == TaskQueueManager::PumpPolicy::MANUAL)
275 return false;
276 if (pump_policy_ == TaskQueueManager::PumpPolicy::AFTER_WAKEUP &&
277 TaskIsOlderThanQueuedTasks(previous_task))
278 return false;
279 if (incoming_queue_.empty())
280 return false;
281 return true;
282 }
283
284 bool TaskQueue::NextPendingDelayedTaskRunTime(
285 base::TimeTicks* next_pending_delayed_task) {
286 base::AutoLock lock(lock_);
287 if (delayed_task_queue_.empty())
288 return false;
289 *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
290 return true;
291 }
292
293 bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now,
294 const base::PendingTask* previous_task) {
295 if (!work_queue_.empty())
296 return true;
297
298 {
299 base::AutoLock lock(lock_);
300 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
301 if (!ShouldAutoPumpQueueLocked(previous_task))
302 return false;
303 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
304 work_queue_.Swap(&incoming_queue_);
305 TraceQueueSize(true);
306 return true;
307 }
308 }
309
310 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
311 base::PendingTask pending_task = work_queue_.front();
312 work_queue_.pop();
313 TraceQueueSize(false);
314 return pending_task;
315 }
316
317 void TaskQueue::TraceQueueSize(bool is_locked) const {
318 bool is_tracing;
319 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
320 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), &is_tracing);
321 if (!is_tracing || !name_)
322 return;
323 if (!is_locked)
324 lock_.Acquire();
325 else
326 lock_.AssertAcquired();
327 TRACE_COUNTER1(
328 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
329 incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
330 if (!is_locked)
331 lock_.Release();
332 }
333
334 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
335 lock_.AssertAcquired();
336 if (!task_queue_manager_)
337 return;
338 if (pump_policy_ == TaskQueueManager::PumpPolicy::AUTO &&
339 incoming_queue_.empty())
340 task_queue_manager_->MaybePostDoWorkOnMainRunner();
341 incoming_queue_.push(pending_task);
342
343 if (!pending_task.delayed_run_time.is_null()) {
344 // Clear the delayed run time because we've already applied the delay
345 // before getting here.
346 incoming_queue_.back().delayed_run_time = base::TimeTicks();
347 }
348 TraceQueueSize(true);
349 }
350
351 void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
352 base::AutoLock lock(lock_);
353 if (pump_policy == TaskQueueManager::PumpPolicy::AUTO &&
354 pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) {
355 PumpQueueLocked();
356 }
357 pump_policy_ = pump_policy;
358 }
359
360 void TaskQueue::PumpQueueLocked() {
361 lock_.AssertAcquired();
362 if (task_queue_manager_) {
363 LazyNow lazy_now(task_queue_manager_);
364 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
365 }
366 while (!incoming_queue_.empty()) {
367 work_queue_.push(incoming_queue_.front());
368 incoming_queue_.pop();
369 }
370 if (!work_queue_.empty())
371 task_queue_manager_->MaybePostDoWorkOnMainRunner();
372 }
373
374 void TaskQueue::PumpQueue() {
375 base::AutoLock lock(lock_);
376 PumpQueueLocked();
377 }
378
379 void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
380 base::AutoLock lock(lock_);
381 state->BeginDictionary();
382 if (name_)
383 state->SetString("name", name_);
384 state->SetString("pump_policy", PumpPolicyToString(pump_policy_));
385 state->BeginArray("incoming_queue");
386 QueueAsValueInto(incoming_queue_, state);
387 state->EndArray();
388 state->BeginArray("work_queue");
389 QueueAsValueInto(work_queue_, state);
390 state->EndArray();
391 state->BeginArray("delayed_task_queue");
392 QueueAsValueInto(delayed_task_queue_, state);
393 state->EndArray();
394 state->EndDictionary();
395 }
396
397 // static
398 const char* TaskQueue::PumpPolicyToString(
399 TaskQueueManager::PumpPolicy pump_policy) {
400 switch (pump_policy) {
401 case TaskQueueManager::PumpPolicy::AUTO:
402 return "auto";
403 case TaskQueueManager::PumpPolicy::AFTER_WAKEUP:
404 return "after_wakeup";
405 case TaskQueueManager::PumpPolicy::MANUAL:
406 return "manual";
407 default:
408 NOTREACHED();
409 return nullptr;
410 }
411 }
412
413 // static
414 void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
415 base::trace_event::TracedValue* state) {
416 base::TaskQueue queue_copy(queue);
417 while (!queue_copy.empty()) {
418 TaskAsValueInto(queue_copy.front(), state);
419 queue_copy.pop();
420 }
421 }
422
423 // static
424 void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue,
425 base::trace_event::TracedValue* state) {
426 base::DelayedTaskQueue queue_copy(queue);
427 while (!queue_copy.empty()) {
428 TaskAsValueInto(queue_copy.top(), state);
429 queue_copy.pop();
430 }
431 }
432
433 // static
434 void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
435 base::trace_event::TracedValue* state) {
436 state->BeginDictionary();
437 state->SetString("posted_from", task.posted_from.ToString());
438 state->SetInteger("sequence_num", task.sequence_num);
439 state->SetBoolean("nestable", task.nestable);
440 state->SetBoolean("is_high_res", task.is_high_res);
441 state->SetDouble(
442 "delayed_run_time",
443 (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
444 state->EndDictionary();
445 }
446
447 } // namespace internal
448
449 TaskQueueManager::TaskQueueManager(
450 size_t task_queue_count,
451 scoped_refptr<NestableSingleThreadTaskRunner> main_task_runner,
452 TaskQueueSelector* selector)
453 : main_task_runner_(main_task_runner),
454 selector_(selector),
455 pending_dowork_count_(0),
456 work_batch_size_(1),
457 time_source_(nullptr),
458 weak_factory_(this) {
459 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
460 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
461 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
462 this);
463
464 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
465 for (size_t i = 0; i < task_queue_count; i++) {
466 scoped_refptr<internal::TaskQueue> queue(
467 make_scoped_refptr(new internal::TaskQueue(this)));
468 queues_.push_back(queue);
469 }
470
471 std::vector<const base::TaskQueue*> work_queues;
472 for (const auto& queue: queues_)
473 work_queues.push_back(&queue->work_queue());
474 selector_->RegisterWorkQueues(work_queues);
475 }
476
477 TaskQueueManager::~TaskQueueManager() {
478 TRACE_EVENT_OBJECT_DELETED_WITH_ID(
479 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
480 this);
481 for (auto& queue : queues_)
482 queue->WillDeleteTaskQueueManager();
483 }
484
485 internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const {
486 DCHECK_LT(queue_index, queues_.size());
487 return queues_[queue_index].get();
488 }
489
490 scoped_refptr<base::SingleThreadTaskRunner>
491 TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const {
492 return Queue(queue_index);
493 }
494
495 bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const {
496 internal::TaskQueue* queue = Queue(queue_index);
497 return queue->IsQueueEmpty();
498 }
499
500 base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() {
501 DCHECK(main_thread_checker_.CalledOnValidThread());
502 bool found_pending_task = false;
503 base::TimeTicks next_pending_delayed_task(
504 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
505 for (auto& queue : queues_) {
506 base::TimeTicks queues_next_pending_delayed_task;
507 if (queue->NextPendingDelayedTaskRunTime(
508 &queues_next_pending_delayed_task)) {
509 found_pending_task = true;
510 next_pending_delayed_task =
511 std::min(next_pending_delayed_task, queues_next_pending_delayed_task);
512 }
513 }
514
515 if (!found_pending_task)
516 return base::TimeTicks();
517
518 DCHECK_NE(next_pending_delayed_task,
519 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
520 return next_pending_delayed_task;
521 }
522
523 void TaskQueueManager::SetPumpPolicy(size_t queue_index,
524 PumpPolicy pump_policy) {
525 DCHECK(main_thread_checker_.CalledOnValidThread());
526 internal::TaskQueue* queue = Queue(queue_index);
527 queue->SetPumpPolicy(pump_policy);
528 }
529
530 void TaskQueueManager::PumpQueue(size_t queue_index) {
531 DCHECK(main_thread_checker_.CalledOnValidThread());
532 internal::TaskQueue* queue = Queue(queue_index);
533 queue->PumpQueue();
534 }
535
536 bool TaskQueueManager::UpdateWorkQueues(
537 const base::PendingTask* previous_task) {
538 // TODO(skyostil): This is not efficient when the number of queues grows very
539 // large due to the number of locks taken. Consider optimizing when we get
540 // there.
541 DCHECK(main_thread_checker_.CalledOnValidThread());
542 internal::LazyNow lazy_now(this);
543 bool has_work = false;
544 for (auto& queue : queues_) {
545 has_work |= queue->UpdateWorkQueue(&lazy_now, previous_task);
546 if (!queue->work_queue().empty()) {
547 // Currently we should not be getting tasks with delayed run times in any
548 // of the work queues.
549 DCHECK(queue->work_queue().front().delayed_run_time.is_null());
550 }
551 }
552 return has_work;
553 }
554
555 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
556 bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
557 if (on_main_thread) {
558 // We only want one pending DoWork posted from the main thread, or we risk
559 // an explosion of pending DoWorks which could starve out everything else.
560 if (pending_dowork_count_ > 0) {
561 return;
562 }
563 pending_dowork_count_++;
564 }
565
566 main_task_runner_->PostTask(
567 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_,
568 on_main_thread));
569 }
570
571 void TaskQueueManager::DoWork(bool posted_from_main_thread) {
572 if (posted_from_main_thread) {
573 pending_dowork_count_--;
574 DCHECK_GE(pending_dowork_count_, 0);
575 }
576 DCHECK(main_thread_checker_.CalledOnValidThread());
577
578 // Pass nullptr to UpdateWorkQueues here to prevent waking up a
579 // pump-after-wakeup queue.
580 if (!UpdateWorkQueues(nullptr))
581 return;
582
583 base::PendingTask previous_task((tracked_objects::Location()),
584 (base::Closure()));
585 for (int i = 0; i < work_batch_size_; i++) {
586 size_t queue_index;
587 if (!SelectWorkQueueToService(&queue_index))
588 return;
589 // Note that this function won't post another call to DoWork if one is
590 // already pending, so it is safe to call it in a loop.
591 MaybePostDoWorkOnMainRunner();
592 ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task);
593
594 if (!UpdateWorkQueues(&previous_task))
595 return;
596 }
597 }
598
599 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
600 bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
601 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
602 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
603 AsValueWithSelectorResult(should_run, *out_queue_index));
604 return should_run;
605 }
606
607 void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) {
608 pending_task->sequence_num = task_sequence_num_.GetNext();
609 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task);
610 }
611
612 void TaskQueueManager::ProcessTaskFromWorkQueue(
613 size_t queue_index,
614 bool has_previous_task,
615 base::PendingTask* previous_task) {
616 DCHECK(main_thread_checker_.CalledOnValidThread());
617 internal::TaskQueue* queue = Queue(queue_index);
618 base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
619 if (!pending_task.nestable && main_task_runner_->IsNested()) {
620 // Defer non-nestable work to the main task runner. NOTE these tasks can be
621 // arbitrarily delayed so the additional delay should not be a problem.
622 main_task_runner_->PostNonNestableTask(pending_task.posted_from,
623 pending_task.task);
624 } else {
625 // Suppress "will" task observer notifications for the first and "did"
626 // notifications for the last task in the batch to avoid duplicate
627 // notifications.
628 if (has_previous_task) {
629 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
630 DidProcessTask(*previous_task));
631 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
632 WillProcessTask(pending_task));
633 }
634 task_annotator_.RunTask("TaskQueueManager::PostTask",
635 "TaskQueueManager::RunTask", pending_task);
636 pending_task.task.Reset();
637 *previous_task = pending_task;
638 }
639 }
640
641 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
642 return main_task_runner_->RunsTasksOnCurrentThread();
643 }
644
645 bool TaskQueueManager::PostDelayedTask(
646 const tracked_objects::Location& from_here,
647 const base::Closure& task,
648 base::TimeDelta delay) {
649 DCHECK(delay > base::TimeDelta());
650 return main_task_runner_->PostDelayedTask(from_here, task, delay);
651 }
652
653 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
654 DCHECK(main_thread_checker_.CalledOnValidThread());
655 internal::TaskQueue* queue = Queue(queue_index);
656 queue->set_name(name);
657 }
658
659 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
660 DCHECK(main_thread_checker_.CalledOnValidThread());
661 DCHECK_GE(work_batch_size, 1);
662 work_batch_size_ = work_batch_size;
663 }
664
665 void TaskQueueManager::AddTaskObserver(
666 base::MessageLoop::TaskObserver* task_observer) {
667 DCHECK(main_thread_checker_.CalledOnValidThread());
668 base::MessageLoop::current()->AddTaskObserver(task_observer);
669 task_observers_.AddObserver(task_observer);
670 }
671
672 void TaskQueueManager::RemoveTaskObserver(
673 base::MessageLoop::TaskObserver* task_observer) {
674 DCHECK(main_thread_checker_.CalledOnValidThread());
675 base::MessageLoop::current()->RemoveTaskObserver(task_observer);
676 task_observers_.RemoveObserver(task_observer);
677 }
678
679 void TaskQueueManager::SetTimeSourceForTesting(
680 scoped_refptr<cc::TestNowSource> time_source) {
681 DCHECK(main_thread_checker_.CalledOnValidThread());
682 time_source_ = time_source;
683 }
684
685 base::TimeTicks TaskQueueManager::Now() const {
686 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
687 }
688
689 scoped_refptr<base::trace_event::ConvertableToTraceFormat>
690 TaskQueueManager::AsValueWithSelectorResult(bool should_run,
691 size_t selected_queue) const {
692 DCHECK(main_thread_checker_.CalledOnValidThread());
693 scoped_refptr<base::trace_event::TracedValue> state =
694 new base::trace_event::TracedValue();
695 state->BeginArray("queues");
696 for (auto& queue : queues_)
697 queue->AsValueInto(state.get());
698 state->EndArray();
699 state->BeginDictionary("selector");
700 selector_->AsValueInto(state.get());
701 state->EndDictionary();
702 if (should_run)
703 state->SetInteger("selected_queue", selected_queue);
704 return state;
705 }
706
707 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698