OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "components/scheduler/child/task_queue_impl.h" | |
6 | |
7 #include "components/scheduler/child/task_queue_manager.h" | |
8 | |
9 namespace scheduler { | |
10 namespace internal { | |
11 | |
12 TaskQueueImpl::TaskQueueImpl( | |
13 TaskQueueManager* task_queue_manager, | |
14 const Spec& spec, | |
15 const char* disabled_by_default_tracing_category, | |
16 const char* disabled_by_default_verbose_tracing_category) | |
17 : thread_id_(base::PlatformThread::CurrentId()), | |
18 task_queue_manager_(task_queue_manager), | |
19 pump_policy_(spec.pump_policy), | |
20 name_(spec.name), | |
21 disabled_by_default_tracing_category_( | |
22 disabled_by_default_tracing_category), | |
23 disabled_by_default_verbose_tracing_category_( | |
24 disabled_by_default_verbose_tracing_category), | |
25 wakeup_policy_(spec.wakeup_policy), | |
26 set_index_(0), | |
27 should_monitor_quiescence_(spec.should_monitor_quiescence), | |
28 should_notify_observers_(spec.should_notify_observers) {} | |
29 | |
30 TaskQueueImpl::~TaskQueueImpl() {} | |
31 | |
32 TaskQueueImpl::Task::Task() | |
33 : PendingTask(tracked_objects::Location(), | |
34 base::Closure(), | |
35 base::TimeTicks(), | |
36 true), | |
37 #ifndef NDEBUG | |
38 enqueue_order_set_(false), | |
39 #endif | |
40 enqueue_order_(0) { | |
41 sequence_num = 0; | |
42 } | |
43 | |
44 TaskQueueImpl::Task::Task(const tracked_objects::Location& posted_from, | |
45 const base::Closure& task, | |
46 int sequence_number, | |
47 bool nestable) | |
48 : PendingTask(posted_from, task, base::TimeTicks(), nestable), | |
49 #ifndef NDEBUG | |
50 enqueue_order_set_(false), | |
51 #endif | |
52 enqueue_order_(0) { | |
53 sequence_num = sequence_number; | |
54 } | |
55 | |
56 void TaskQueueImpl::UnregisterTaskQueue() { | |
57 if (!task_queue_manager_) | |
58 return; | |
59 task_queue_manager_->UnregisterTaskQueue(this); | |
60 | |
61 { | |
62 base::AutoLock lock(lock_); | |
63 task_queue_manager_ = nullptr; | |
64 delayed_task_queue_ = std::priority_queue<Task>(); | |
65 incoming_queue_ = std::queue<Task>(); | |
66 work_queue_ = std::queue<Task>(); | |
67 } | |
68 } | |
69 | |
70 bool TaskQueueImpl::RunsTasksOnCurrentThread() const { | |
71 base::AutoLock lock(lock_); | |
72 return base::PlatformThread::CurrentId() == thread_id_; | |
73 } | |
74 | |
75 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here, | |
76 const base::Closure& task, | |
77 base::TimeDelta delay) { | |
78 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL); | |
79 } | |
80 | |
81 bool TaskQueueImpl::PostNonNestableDelayedTask( | |
82 const tracked_objects::Location& from_here, | |
83 const base::Closure& task, | |
84 base::TimeDelta delay) { | |
85 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE); | |
86 } | |
87 | |
88 bool TaskQueueImpl::PostDelayedTaskAt( | |
89 const tracked_objects::Location& from_here, | |
90 const base::Closure& task, | |
91 base::TimeTicks desired_run_time) { | |
92 base::AutoLock lock(lock_); | |
93 if (!task_queue_manager_) | |
94 return false; | |
95 LazyNow lazy_now(task_queue_manager_); | |
96 return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time, | |
97 TaskType::NORMAL); | |
98 } | |
99 | |
100 bool TaskQueueImpl::PostDelayedTaskImpl( | |
101 const tracked_objects::Location& from_here, | |
102 const base::Closure& task, | |
103 base::TimeDelta delay, | |
104 TaskType task_type) { | |
105 base::AutoLock lock(lock_); | |
106 if (!task_queue_manager_) | |
107 return false; | |
108 LazyNow lazy_now(task_queue_manager_); | |
109 base::TimeTicks desired_run_time; | |
110 if (delay > base::TimeDelta()) | |
111 desired_run_time = lazy_now.Now() + delay; | |
112 return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time, | |
113 task_type); | |
114 } | |
115 | |
116 bool TaskQueueImpl::PostDelayedTaskLocked( | |
117 LazyNow* lazy_now, | |
118 const tracked_objects::Location& from_here, | |
119 const base::Closure& task, | |
120 base::TimeTicks desired_run_time, | |
121 TaskType task_type) { | |
122 lock_.AssertAcquired(); | |
123 DCHECK(task_queue_manager_); | |
124 Task pending_task(from_here, task, | |
125 task_queue_manager_->GetNextSequenceNumber(), | |
126 task_type != TaskType::NON_NESTABLE); | |
127 task_queue_manager_->DidQueueTask(pending_task); | |
128 | |
129 if (!desired_run_time.is_null()) { | |
130 pending_task.delayed_run_time = std::max(lazy_now->Now(), desired_run_time); | |
131 // TODO(alexclarke): consider emplace() when C++11 library features allowed. | |
132 delayed_task_queue_.push(pending_task); | |
133 TraceQueueSize(true); | |
134 // Schedule a later call to MoveReadyDelayedTasksToIncomingQueue. | |
135 task_queue_manager_->ScheduleDelayedWork(this, desired_run_time, lazy_now); | |
136 return true; | |
137 } | |
138 pending_task.set_enqueue_order(pending_task.sequence_num); | |
139 EnqueueTaskLocked(pending_task); | |
140 return true; | |
141 } | |
142 | |
143 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue(LazyNow* lazy_now) { | |
144 base::AutoLock lock(lock_); | |
145 if (!task_queue_manager_) | |
146 return; | |
147 | |
148 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now); | |
149 } | |
150 | |
151 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked( | |
152 LazyNow* lazy_now) { | |
153 lock_.AssertAcquired(); | |
154 // Enqueue all delayed tasks that should be running now. | |
155 while (!delayed_task_queue_.empty() && | |
156 delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) { | |
157 // TODO(alexclarke): consider std::move() when allowed. | |
158 EnqueueDelayedTaskLocked(delayed_task_queue_.top()); | |
159 delayed_task_queue_.pop(); | |
160 } | |
161 TraceQueueSize(true); | |
162 } | |
163 | |
164 bool TaskQueueImpl::IsQueueEnabled() const { | |
165 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
166 if (!task_queue_manager_) | |
167 return false; | |
168 | |
169 return task_queue_manager_->selector_.IsQueueEnabled(this); | |
170 } | |
171 | |
172 TaskQueue::QueueState TaskQueueImpl::GetQueueState() const { | |
173 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
174 if (!work_queue_.empty()) | |
175 return QueueState::HAS_WORK; | |
176 | |
177 { | |
178 base::AutoLock lock(lock_); | |
179 if (incoming_queue_.empty()) { | |
180 return QueueState::EMPTY; | |
181 } else { | |
182 return QueueState::NEEDS_PUMPING; | |
183 } | |
184 } | |
185 } | |
186 | |
187 bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const Task* task) { | |
188 lock_.AssertAcquired(); | |
189 // A null task is passed when UpdateQueue is called before any task is run. | |
190 // In this case we don't want to pump an after_wakeup queue, so return true | |
191 // here. | |
192 if (!task) | |
193 return true; | |
194 | |
195 // Return false if there are no task in the incoming queue. | |
196 if (incoming_queue_.empty()) | |
197 return false; | |
198 | |
199 const TaskQueueImpl::Task& oldest_queued_task = incoming_queue_.front(); | |
200 return task->enqueue_order() < oldest_queued_task.enqueue_order(); | |
201 } | |
202 | |
203 bool TaskQueueImpl::ShouldAutoPumpQueueLocked(bool should_trigger_wakeup, | |
204 const Task* previous_task) { | |
205 lock_.AssertAcquired(); | |
206 if (pump_policy_ == PumpPolicy::MANUAL) | |
207 return false; | |
208 if (pump_policy_ == PumpPolicy::AFTER_WAKEUP && | |
209 (!should_trigger_wakeup || TaskIsOlderThanQueuedTasks(previous_task))) | |
210 return false; | |
211 if (incoming_queue_.empty()) | |
212 return false; | |
213 return true; | |
214 } | |
215 | |
216 bool TaskQueueImpl::NextPendingDelayedTaskRunTime( | |
217 base::TimeTicks* next_pending_delayed_task) { | |
218 base::AutoLock lock(lock_); | |
219 if (delayed_task_queue_.empty()) | |
220 return false; | |
221 *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time; | |
222 return true; | |
223 } | |
224 | |
225 void TaskQueueImpl::UpdateWorkQueue(LazyNow* lazy_now, | |
226 bool should_trigger_wakeup, | |
227 const Task* previous_task) { | |
228 DCHECK(work_queue_.empty()); | |
229 base::AutoLock lock(lock_); | |
230 if (!ShouldAutoPumpQueueLocked(should_trigger_wakeup, previous_task)) | |
231 return; | |
232 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now); | |
233 std::swap(work_queue_, incoming_queue_); | |
234 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no | |
235 // longer needs to consider this queue for reloading. | |
236 task_queue_manager_->UnregisterAsUpdatableTaskQueue(this); | |
237 if (!work_queue_.empty()) { | |
238 DCHECK(task_queue_manager_); | |
239 task_queue_manager_->selector_.GetTaskQueueSets()->OnPushQueue(this); | |
240 TraceQueueSize(true); | |
241 } | |
242 } | |
243 | |
244 TaskQueueImpl::Task TaskQueueImpl::TakeTaskFromWorkQueue() { | |
245 // TODO(alexclarke): consider std::move() when allowed. | |
246 Task pending_task = work_queue_.front(); | |
247 work_queue_.pop(); | |
248 DCHECK(task_queue_manager_); | |
249 task_queue_manager_->selector_.GetTaskQueueSets()->OnPopQueue(this); | |
250 TraceQueueSize(false); | |
251 return pending_task; | |
252 } | |
253 | |
254 void TaskQueueImpl::TraceQueueSize(bool is_locked) const { | |
255 bool is_tracing; | |
256 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_, | |
257 &is_tracing); | |
258 if (!is_tracing) | |
259 return; | |
260 if (!is_locked) | |
261 lock_.Acquire(); | |
262 else | |
263 lock_.AssertAcquired(); | |
264 TRACE_COUNTER1( | |
265 disabled_by_default_tracing_category_, GetName(), | |
266 incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size()); | |
267 if (!is_locked) | |
268 lock_.Release(); | |
269 } | |
270 | |
271 void TaskQueueImpl::EnqueueTaskLocked(const Task& pending_task) { | |
272 lock_.AssertAcquired(); | |
273 if (!task_queue_manager_) | |
274 return; | |
275 if (incoming_queue_.empty()) | |
276 task_queue_manager_->RegisterAsUpdatableTaskQueue(this); | |
277 if (pump_policy_ == PumpPolicy::AUTO && incoming_queue_.empty()) { | |
278 task_queue_manager_->MaybePostDoWorkOnMainRunner(); | |
279 } | |
280 // TODO(alexclarke): consider std::move() when allowed. | |
281 incoming_queue_.push(pending_task); | |
282 TraceQueueSize(true); | |
283 } | |
284 | |
285 void TaskQueueImpl::EnqueueDelayedTaskLocked(const Task& pending_task) { | |
286 lock_.AssertAcquired(); | |
287 if (!task_queue_manager_) | |
288 return; | |
289 if (incoming_queue_.empty()) | |
290 task_queue_manager_->RegisterAsUpdatableTaskQueue(this); | |
291 // TODO(alexclarke): consider std::move() when allowed. | |
292 incoming_queue_.push(pending_task); | |
293 incoming_queue_.back().set_enqueue_order( | |
294 task_queue_manager_->GetNextSequenceNumber()); | |
295 TraceQueueSize(true); | |
296 } | |
297 | |
298 void TaskQueueImpl::SetPumpPolicy(PumpPolicy pump_policy) { | |
299 base::AutoLock lock(lock_); | |
300 if (pump_policy == PumpPolicy::AUTO && pump_policy_ != PumpPolicy::AUTO) { | |
301 PumpQueueLocked(); | |
302 } | |
303 pump_policy_ = pump_policy; | |
304 } | |
305 | |
306 void TaskQueueImpl::PumpQueueLocked() { | |
307 lock_.AssertAcquired(); | |
308 if (!task_queue_manager_) | |
309 return; | |
310 | |
311 LazyNow lazy_now(task_queue_manager_); | |
312 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now); | |
313 | |
314 bool was_empty = work_queue_.empty(); | |
315 while (!incoming_queue_.empty()) { | |
316 // TODO(alexclarke): consider std::move() when allowed. | |
317 work_queue_.push(incoming_queue_.front()); | |
318 incoming_queue_.pop(); | |
319 } | |
320 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no longer | |
321 // needs to consider this queue for reloading. | |
322 task_queue_manager_->UnregisterAsUpdatableTaskQueue(this); | |
323 if (!work_queue_.empty()) { | |
324 if (was_empty) | |
325 task_queue_manager_->selector_.GetTaskQueueSets()->OnPushQueue(this); | |
326 task_queue_manager_->MaybePostDoWorkOnMainRunner(); | |
327 } | |
328 } | |
329 | |
330 void TaskQueueImpl::PumpQueue() { | |
331 base::AutoLock lock(lock_); | |
332 PumpQueueLocked(); | |
333 } | |
334 | |
335 const char* TaskQueueImpl::GetName() const { | |
336 return name_; | |
337 } | |
338 | |
339 bool TaskQueueImpl::GetWorkQueueFrontTaskEnqueueOrder( | |
340 int* enqueue_order) const { | |
341 if (work_queue_.empty()) | |
342 return false; | |
343 *enqueue_order = work_queue_.front().enqueue_order(); | |
344 return true; | |
345 } | |
346 | |
347 void TaskQueueImpl::PushTaskOntoWorkQueueForTest(const Task& task) { | |
348 work_queue_.push(task); | |
349 } | |
350 | |
351 void TaskQueueImpl::PopTaskFromWorkQueueForTest() { | |
352 work_queue_.pop(); | |
353 } | |
354 | |
355 void TaskQueueImpl::SetQueuePriority(QueuePriority priority) { | |
356 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
357 if (!task_queue_manager_) | |
358 return; | |
359 | |
360 task_queue_manager_->selector_.SetQueuePriority(this, priority); | |
361 } | |
362 | |
363 // static | |
364 const char* TaskQueueImpl::PumpPolicyToString( | |
365 TaskQueue::PumpPolicy pump_policy) { | |
366 switch (pump_policy) { | |
367 case TaskQueue::PumpPolicy::AUTO: | |
368 return "auto"; | |
369 case TaskQueue::PumpPolicy::AFTER_WAKEUP: | |
370 return "after_wakeup"; | |
371 case TaskQueue::PumpPolicy::MANUAL: | |
372 return "manual"; | |
373 default: | |
374 NOTREACHED(); | |
375 return nullptr; | |
376 } | |
377 } | |
378 | |
379 // static | |
380 const char* TaskQueueImpl::WakeupPolicyToString( | |
381 TaskQueue::WakeupPolicy wakeup_policy) { | |
382 switch (wakeup_policy) { | |
383 case TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES: | |
384 return "can_wake_other_queues"; | |
385 case TaskQueue::WakeupPolicy::DONT_WAKE_OTHER_QUEUES: | |
386 return "dont_wake_other_queues"; | |
387 default: | |
388 NOTREACHED(); | |
389 return nullptr; | |
390 } | |
391 } | |
392 | |
393 // static | |
394 const char* TaskQueueImpl::PriorityToString(QueuePriority priority) { | |
395 switch (priority) { | |
396 case CONTROL_PRIORITY: | |
397 return "control"; | |
398 case HIGH_PRIORITY: | |
399 return "high"; | |
400 case NORMAL_PRIORITY: | |
401 return "normal"; | |
402 case BEST_EFFORT_PRIORITY: | |
403 return "best_effort"; | |
404 case DISABLED_PRIORITY: | |
405 return "disabled"; | |
406 default: | |
407 NOTREACHED(); | |
408 return nullptr; | |
409 } | |
410 } | |
411 | |
412 void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const { | |
413 base::AutoLock lock(lock_); | |
414 state->BeginDictionary(); | |
415 state->SetString("name", GetName()); | |
416 state->SetString("pump_policy", PumpPolicyToString(pump_policy_)); | |
417 state->SetString("wakeup_policy", WakeupPolicyToString(wakeup_policy_)); | |
418 bool verbose_tracing_enabled = false; | |
419 TRACE_EVENT_CATEGORY_GROUP_ENABLED( | |
420 disabled_by_default_verbose_tracing_category_, &verbose_tracing_enabled); | |
421 state->SetInteger("incoming_queue_size", incoming_queue_.size()); | |
422 state->SetInteger("work_queue_size", work_queue_.size()); | |
423 state->SetInteger("delayed_task_queue_size", delayed_task_queue_.size()); | |
424 if (verbose_tracing_enabled) { | |
425 state->BeginArray("incoming_queue"); | |
426 QueueAsValueInto(incoming_queue_, state); | |
427 state->EndArray(); | |
428 state->BeginArray("work_queue"); | |
429 QueueAsValueInto(work_queue_, state); | |
430 state->EndArray(); | |
431 state->BeginArray("delayed_task_queue"); | |
432 QueueAsValueInto(delayed_task_queue_, state); | |
433 state->EndArray(); | |
434 } | |
435 state->SetString("priority", | |
436 PriorityToString(static_cast<QueuePriority>(set_index_))); | |
437 state->EndDictionary(); | |
438 } | |
439 | |
440 void TaskQueueImpl::AddTaskObserver( | |
441 base::MessageLoop::TaskObserver* task_observer) { | |
442 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
443 task_observers_.AddObserver(task_observer); | |
444 } | |
445 | |
446 void TaskQueueImpl::RemoveTaskObserver( | |
447 base::MessageLoop::TaskObserver* task_observer) { | |
448 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
449 task_observers_.RemoveObserver(task_observer); | |
450 } | |
451 | |
452 void TaskQueueImpl::NotifyWillProcessTask( | |
453 const base::PendingTask& pending_task) { | |
454 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
455 DCHECK(should_notify_observers_); | |
456 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_, | |
457 WillProcessTask(pending_task)); | |
458 } | |
459 | |
460 void TaskQueueImpl::NotifyDidProcessTask( | |
461 const base::PendingTask& pending_task) { | |
462 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
463 DCHECK(should_notify_observers_); | |
464 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_, | |
465 DidProcessTask(pending_task)); | |
466 } | |
467 | |
468 // static | |
469 void TaskQueueImpl::QueueAsValueInto(const std::queue<Task>& queue, | |
470 base::trace_event::TracedValue* state) { | |
471 std::queue<Task> queue_copy(queue); | |
472 while (!queue_copy.empty()) { | |
473 TaskAsValueInto(queue_copy.front(), state); | |
474 queue_copy.pop(); | |
475 } | |
476 } | |
477 | |
478 // static | |
479 void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue, | |
480 base::trace_event::TracedValue* state) { | |
481 std::priority_queue<Task> queue_copy(queue); | |
482 while (!queue_copy.empty()) { | |
483 TaskAsValueInto(queue_copy.top(), state); | |
484 queue_copy.pop(); | |
485 } | |
486 } | |
487 | |
488 // static | |
489 void TaskQueueImpl::TaskAsValueInto(const Task& task, | |
490 base::trace_event::TracedValue* state) { | |
491 state->BeginDictionary(); | |
492 state->SetString("posted_from", task.posted_from.ToString()); | |
493 state->SetInteger("enqueue_order", task.enqueue_order()); | |
494 state->SetInteger("sequence_num", task.sequence_num); | |
495 state->SetBoolean("nestable", task.nestable); | |
496 state->SetBoolean("is_high_res", task.is_high_res); | |
497 state->SetDouble( | |
498 "delayed_run_time", | |
499 (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L); | |
500 state->EndDictionary(); | |
501 } | |
502 | |
503 } // namespace internal | |
504 } // namespace scheduler | |
OLD | NEW |