OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "components/scheduler/base/task_queue_manager.h" | |
6 | |
7 #include <queue> | |
8 #include <set> | |
9 | |
10 #include "base/bind.h" | |
11 #include "base/metrics/histogram_macros.h" | |
12 #include "base/trace_event/trace_event.h" | |
13 #include "components/scheduler/base/real_time_domain.h" | |
14 #include "components/scheduler/base/task_queue_impl.h" | |
15 #include "components/scheduler/base/task_queue_manager_delegate.h" | |
16 #include "components/scheduler/base/task_queue_selector.h" | |
17 #include "components/scheduler/base/task_time_tracker.h" | |
18 #include "components/scheduler/base/work_queue.h" | |
19 #include "components/scheduler/base/work_queue_sets.h" | |
20 | |
21 namespace scheduler { | |
22 | |
namespace {
// Sampling interval: delay histograms are recorded for one in every N tasks
// processed (see MaybeRecordTaskDelayHistograms).
const size_t kRecordRecordTaskDelayHistogramsEveryNTasks = 10;

// Reports how late a delayed task ran relative to its scheduled run time.
void RecordDelayedTaskLateness(base::TimeDelta lateness) {
  UMA_HISTOGRAM_TIMES("RendererScheduler.TaskQueueManager.DelayedTaskLateness",
                      lateness);
}

// Reports how long an immediate (non-delayed) task waited in its queue
// before running.
void RecordImmediateTaskQueueingDuration(tracked_objects::Duration duration) {
  UMA_HISTOGRAM_TIMES(
      "RendererScheduler.TaskQueueManager.ImmediateTaskQueueingDuration",
      base::TimeDelta::FromMilliseconds(duration.InMilliseconds()));
}
}  // namespace
37 | |
TaskQueueManager::TaskQueueManager(
    scoped_refptr<TaskQueueManagerDelegate> delegate,
    const char* tracing_category,
    const char* disabled_by_default_tracing_category,
    const char* disabled_by_default_verbose_tracing_category)
    : real_time_domain_(new RealTimeDomain(tracing_category)),
      delegate_(delegate),
      task_was_run_on_quiescence_monitored_queue_(false),
      work_batch_size_(1),
      task_count_(0),
      task_time_tracker_(nullptr),
      tracing_category_(tracing_category),
      disabled_by_default_tracing_category_(
          disabled_by_default_tracing_category),
      disabled_by_default_verbose_tracing_category_(
          disabled_by_default_verbose_tracing_category),
      currently_executing_task_queue_(nullptr),
      observer_(nullptr),
      deletion_sentinel_(new DeletionSentinel()),
      weak_factory_(this) {
  // The manager must be constructed on the thread its tasks will run on.
  DCHECK(delegate->RunsTasksOnCurrentThread());
  TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category,
                                     "TaskQueueManager", this);
  selector_.SetTaskQueueSelectorObserver(this);

  // Pre-bind the two immediate DoWork closures so that
  // MaybeScheduleImmediateWork doesn't allocate a fresh callback per post.
  // The trailing bool tells DoWork which pending-wakeup set to erase from.
  from_main_thread_immediate_do_work_closure_ =
      base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
                 base::TimeTicks(), true);
  from_other_thread_immediate_do_work_closure_ =
      base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
                 base::TimeTicks(), false);

  // TODO(alexclarke): Change this to be a parameter that's passed in.
  RegisterTimeDomain(real_time_domain_.get());
}
73 | |
TaskQueueManager::~TaskQueueManager() {
  TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_,
                                     "TaskQueueManager", this);

  // UnregisterTaskQueue() erases the queue from |queues_|, so repeatedly
  // unregistering the first element drains the set.
  while (!queues_.empty())
    (*queues_.begin())->UnregisterTaskQueue();

  selector_.SetTaskQueueSelectorObserver(nullptr);
}
83 | |
// Adds |time_domain| to the set consulted by UpdateWorkQueues and
// TryAdvanceTimeDomains, then notifies the domain of its new manager.
// The caller retains ownership of |time_domain|.
void TaskQueueManager::RegisterTimeDomain(TimeDomain* time_domain) {
  time_domains_.insert(time_domain);
  time_domain->OnRegisterWithTaskQueueManager(this);
}
88 | |
// Removes |time_domain| from the registered set. NOTE(review): unlike
// registration there is no corresponding notification to the domain here.
void TaskQueueManager::UnregisterTimeDomain(TimeDomain* time_domain) {
  time_domains_.erase(time_domain);
}
92 | |
// Creates a new task queue described by |spec|, registers it with this
// manager and the selector, and returns a reference to it. If the spec does
// not name a time domain, the real-time domain is used; any named domain
// must already have been registered via RegisterTimeDomain.
scoped_refptr<internal::TaskQueueImpl> TaskQueueManager::NewTaskQueue(
    const TaskQueue::Spec& spec) {
  TRACE_EVENT1(tracing_category_,
               "TaskQueueManager::NewTaskQueue", "queue_name", spec.name);
  DCHECK(main_thread_checker_.CalledOnValidThread());
  TimeDomain* time_domain =
      spec.time_domain ? spec.time_domain : real_time_domain_.get();
  DCHECK(time_domains_.find(time_domain) != time_domains_.end());
  scoped_refptr<internal::TaskQueueImpl> queue(
      make_scoped_refptr(new internal::TaskQueueImpl(
          this, time_domain, spec, disabled_by_default_tracing_category_,
          disabled_by_default_verbose_tracing_category_)));
  queues_.insert(queue);
  selector_.AddQueue(queue.get());
  return queue;
}
109 | |
// Sets (or clears, with nullptr) the observer notified about unregistered
// queues and blocked-task selection attempts. Main thread only.
void TaskQueueManager::SetObserver(Observer* observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  observer_ = observer;
}
114 | |
// Detaches |task_queue| from the manager and selector. The queue object
// itself is kept alive until the next top-level DoWork (see below), so any
// raw pointers held by in-flight bookkeeping remain valid.
void TaskQueueManager::UnregisterTaskQueue(
    scoped_refptr<internal::TaskQueueImpl> task_queue) {
  TRACE_EVENT1(tracing_category_,
               "TaskQueueManager::UnregisterTaskQueue", "queue_name",
               task_queue->GetName());
  DCHECK(main_thread_checker_.CalledOnValidThread());
  if (observer_)
    observer_->OnUnregisterTaskQueue(task_queue);

  // Add |task_queue| to |queues_to_delete_| so we can prevent it from being
  // freed while any of our structures hold a raw pointer to it.
  queues_to_delete_.insert(task_queue);
  queues_.erase(task_queue);
  selector_.RemoveQueue(task_queue.get());
}
130 | |
131 void TaskQueueManager::UpdateWorkQueues( | |
132 bool should_trigger_wakeup, | |
133 const internal::TaskQueueImpl::Task* previous_task, | |
134 LazyNow lazy_now) { | |
135 TRACE_EVENT0(disabled_by_default_tracing_category_, | |
136 "TaskQueueManager::UpdateWorkQueues"); | |
137 | |
138 for (TimeDomain* time_domain : time_domains_) { | |
139 LazyNow lazy_now_in_domain = time_domain == real_time_domain_.get() | |
140 ? lazy_now | |
141 : time_domain->CreateLazyNow(); | |
142 time_domain->UpdateWorkQueues(should_trigger_wakeup, previous_task, | |
143 lazy_now_in_domain); | |
144 } | |
145 } | |
146 | |
// Posts a DoWork task unless one is already pending for immediate execution.
// Immediate work is keyed by a default-constructed (null) TimeTicks in the
// pending-wakeup sets; a failed insert means a DoWork is already in flight.
// Safe to call from any thread.
void TaskQueueManager::MaybeScheduleImmediateWork(
    const tracked_objects::Location& from_here) {
  bool on_main_thread = delegate_->BelongsToCurrentThread();
  // De-duplicate DoWork posts.
  if (on_main_thread) {
    // |main_thread_pending_wakeups_| is only touched on the main thread, so
    // no lock is needed here.
    if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) {
      return;
    }
    delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_);
  } else {
    {
      // Scope the lock so it is released before posting the task.
      base::AutoLock lock(other_thread_lock_);
      if (!other_thread_pending_wakeups_.insert(base::TimeTicks()).second)
        return;
    }
    delegate_->PostTask(from_here,
                        from_other_thread_immediate_do_work_closure_);
  }
}
166 | |
// Posts a delayed DoWork to run at |now + delay| unless a DoWork is already
// scheduled for exactly that run time. Main thread only (so the main-thread
// wakeup set needs no lock).
void TaskQueueManager::MaybeScheduleDelayedWork(
    const tracked_objects::Location& from_here,
    base::TimeTicks now,
    base::TimeDelta delay) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  DCHECK_GE(delay, base::TimeDelta());
  base::TimeTicks run_time = now + delay;
  // De-duplicate DoWork posts.
  if (!main_thread_pending_wakeups_.insert(run_time).second)
    return;
  // |run_time| is bound into the closure so DoWork can erase this entry from
  // the pending-wakeup set when it runs.
  delegate_->PostDelayedTask(
      from_here, base::Bind(&TaskQueueManager::DoWork,
                            weak_factory_.GetWeakPtr(), run_time, true),
      delay);
}
182 | |
// Runs up to |work_batch_size_| tasks selected by the selector. |run_time|
// identifies which pending-wakeup entry this invocation corresponds to, and
// |from_main_thread| says which de-duplication set it was recorded in.
void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork",
               "from_main_thread", from_main_thread);
  // Clear this wakeup from its de-duplication set so a later
  // MaybeSchedule*Work() can post a fresh DoWork.
  if (from_main_thread) {
    main_thread_pending_wakeups_.erase(run_time);
  } else {
    base::AutoLock lock(other_thread_lock_);
    other_thread_pending_wakeups_.erase(run_time);
  }

  // Queues unregistered while nested must stay alive until the outermost
  // DoWork, since raw pointers to them may still be on the stack.
  if (!delegate_->IsNested())
    queues_to_delete_.clear();

  LazyNow lazy_now(real_time_domain()->CreateLazyNow());
  base::TimeTicks task_start_time;

  // Task timing is only reported for top-level (non-nested) work.
  if (!delegate_->IsNested() && task_time_tracker_)
    task_start_time = lazy_now.Now();

  // Pass false and nullptr to UpdateWorkQueues here to prevent waking up a
  // pump-after-wakeup queue.
  UpdateWorkQueues(false, nullptr, lazy_now);

  internal::TaskQueueImpl::Task previous_task;

  for (int i = 0; i < work_batch_size_; i++) {
    internal::WorkQueue* work_queue;
    if (!SelectWorkQueueToService(&work_queue)) {
      break;
    }

    // Capture the wakeup policy before running the task, since the queue may
    // be unregistered during execution.
    bool should_trigger_wakeup = work_queue->task_queue()->wakeup_policy() ==
                                 TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES;

    switch (ProcessTaskFromWorkQueue(work_queue, &previous_task)) {
      case ProcessTaskResult::DEFERRED:
        // If a task was deferred, try again with another task. Note that this
        // means deferred tasks (i.e. non-nestable tasks) will never trigger
        // queue wake-ups.
        continue;
      case ProcessTaskResult::EXECUTED:
        break;
      case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED:
        return;  // The TaskQueueManager got deleted, we must bail out.
    }

    lazy_now = real_time_domain()->CreateLazyNow();
    if (!delegate_->IsNested() && task_time_tracker_) {
      // Only report top level task durations.
      base::TimeTicks task_end_time = lazy_now.Now();
      task_time_tracker_->ReportTaskTime(task_start_time, task_end_time);
      task_start_time = task_end_time;
    }

    work_queue = nullptr;  // The queue may have been unregistered.

    UpdateWorkQueues(should_trigger_wakeup, &previous_task, lazy_now);

    // Only run a single task per batch in nested run loops so that we can
    // properly exit the nested loop when someone calls RunLoop::Quit().
    if (delegate_->IsNested())
      break;
  }

  // TODO(alexclarke): Consider refactoring the above loop to terminate only
  // when there's no more work left to be done, rather than posting a
  // continuation task.
  if (!selector_.EnabledWorkQueuesEmpty() || TryAdvanceTimeDomains())
    MaybeScheduleImmediateWork(FROM_HERE);
}
254 | |
255 bool TaskQueueManager::TryAdvanceTimeDomains() { | |
256 bool can_advance = false; | |
257 for (TimeDomain* time_domain : time_domains_) { | |
258 can_advance |= time_domain->MaybeAdvanceTime(); | |
259 } | |
260 return can_advance; | |
261 } | |
262 | |
// Asks the selector for the next work queue to service, emitting a tracing
// snapshot of the manager's state alongside the decision. Returns true if a
// queue was selected (written to |out_work_queue|).
bool TaskQueueManager::SelectWorkQueueToService(
    internal::WorkQueue** out_work_queue) {
  bool should_run = selector_.SelectWorkQueueToService(out_work_queue);
  TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
      disabled_by_default_tracing_category_, "TaskQueueManager", this,
      AsValueWithSelectorResult(should_run, *out_work_queue));
  return should_run;
}
271 | |
// Records queue-time bookkeeping for |pending_task| so the annotator can
// later connect its execution back to the posting site in traces.
void TaskQueueManager::DidQueueTask(
    const internal::TaskQueueImpl::Task& pending_task) {
  task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task);
}
276 | |
// Pops one task from |work_queue| and runs it (or defers it if it is
// non-nestable and we are in a nested run loop). On success, the executed
// task is moved into |*out_previous_task| so the caller can pass it to
// UpdateWorkQueues. May return TASK_QUEUE_MANAGER_DELETED if the task
// destroyed this manager, in which case no members may be touched.
TaskQueueManager::ProcessTaskResult TaskQueueManager::ProcessTaskFromWorkQueue(
    internal::WorkQueue* work_queue,
    internal::TaskQueueImpl::Task* out_previous_task) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  // Holding a second ref to the sentinel lets us detect (via HasOneRef below)
  // whether the task deleted the TaskQueueManager out from under us.
  scoped_refptr<DeletionSentinel> protect(deletion_sentinel_);
  internal::TaskQueueImpl* queue = work_queue->task_queue();

  if (queue->GetQuiescenceMonitored())
    task_was_run_on_quiescence_monitored_queue_ = true;

  internal::TaskQueueImpl::Task pending_task =
      work_queue->TakeTaskFromWorkQueue();
  if (!pending_task.nestable && delegate_->IsNested()) {
    // Defer non-nestable work to the main task runner. NOTE these tasks can be
    // arbitrarily delayed so the additional delay should not be a problem.
    // TODO(skyostil): Figure out a way to not forget which task queue the
    // task is associated with. See http://crbug.com/522843.
    delegate_->PostNonNestableTask(pending_task.posted_from, pending_task.task);
    return ProcessTaskResult::DEFERRED;
  }

  MaybeRecordTaskDelayHistograms(pending_task, queue);

  TRACE_TASK_EXECUTION("TaskQueueManager::ProcessTaskFromWorkQueue",
                       pending_task);
  if (queue->GetShouldNotifyObservers()) {
    FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                      WillProcessTask(pending_task));
    queue->NotifyWillProcessTask(pending_task);
  }
  TRACE_EVENT1(tracing_category_,
               "TaskQueueManager::RunTask", "queue", queue->GetName());
  // NOTE when TaskQueues get unregistered a reference ends up getting retained
  // by |queues_to_delete_| which is cleared at the top of |DoWork|. This means
  // we are OK to use raw pointers here.
  internal::TaskQueueImpl* prev_executing_task_queue =
      currently_executing_task_queue_;
  currently_executing_task_queue_ = queue;
  task_annotator_.RunTask("TaskQueueManager::PostTask", pending_task);

  // Detect if the TaskQueueManager just got deleted. If this happens we must
  // not access any member variables after this point.
  if (protect->HasOneRef())
    return ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED;

  // Restore the previous queue to support nested task execution.
  currently_executing_task_queue_ = prev_executing_task_queue;

  if (queue->GetShouldNotifyObservers()) {
    FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                      DidProcessTask(pending_task));
    queue->NotifyDidProcessTask(pending_task);
  }

  // Drop the (already-run) closure before handing the task back to the
  // caller, so any state bound into it is released promptly.
  pending_task.task.Reset();
  *out_previous_task = std::move(pending_task);
  return ProcessTaskResult::EXECUTED;
}
334 | |
// Samples one task in every kRecordRecordTaskDelayHistogramsEveryNTasks and
// records either its lateness (delayed tasks) or its queueing duration
// (immediate tasks) to UMA.
void TaskQueueManager::MaybeRecordTaskDelayHistograms(
    const internal::TaskQueueImpl::Task& pending_task,
    const internal::TaskQueueImpl* queue) {
  if ((task_count_++ % kRecordRecordTaskDelayHistogramsEveryNTasks) != 0)
    return;

  // Record delayed task lateness and immediate task queueing durations, but
  // only for auto-pumped queues. Manually pumped and after wakeup queues can
  // have arbitrarily large delays, which would cloud any analysis.
  if (queue->GetPumpPolicy() == TaskQueue::PumpPolicy::AUTO) {
    if (!pending_task.delayed_run_time.is_null()) {
      RecordDelayedTaskLateness(delegate_->NowTicks() -
                                pending_task.delayed_run_time);
    } else if (!pending_task.time_posted.is_null()) {
      RecordImmediateTaskQueueingDuration(tracked_objects::TrackedTime::Now() -
                                          pending_task.time_posted);
    }
  }
}
354 | |
// Forwards the thread-affinity query to the delegate.
bool TaskQueueManager::RunsTasksOnCurrentThread() const {
  return delegate_->RunsTasksOnCurrentThread();
}
358 | |
// Sets how many tasks a single DoWork may run back-to-back. Must be >= 1;
// nested run loops always run at most one task per batch regardless.
void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  DCHECK_GE(work_batch_size, 1);
  work_batch_size_ = work_batch_size;
}
364 | |
// Registers a TaskObserver to be notified around each processed task
// (for queues that have observer notification enabled). Main thread only.
void TaskQueueManager::AddTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  task_observers_.AddObserver(task_observer);
}
370 | |
// Unregisters a TaskObserver previously added with AddTaskObserver.
// Main thread only.
void TaskQueueManager::RemoveTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  task_observers_.RemoveObserver(task_observer);
}
376 | |
377 bool TaskQueueManager::GetAndClearSystemIsQuiescentBit() { | |
378 bool task_was_run = task_was_run_on_quiescence_monitored_queue_; | |
379 task_was_run_on_quiescence_monitored_queue_ = false; | |
380 return !task_was_run; | |
381 } | |
382 | |
// Accessor for the delegate that owns task posting and run-loop queries.
const scoped_refptr<TaskQueueManagerDelegate>& TaskQueueManager::delegate()
    const {
  return delegate_;
}
387 | |
// Hands out the next enqueue order, used to sequence posted tasks.
internal::EnqueueOrder TaskQueueManager::GetNextSequenceNumber() {
  return enqueue_order_generator_.GenerateNext();
}
391 | |
// Creates a LazyNow backed by the delegate's clock; the time is only
// sampled when the LazyNow is first queried.
LazyNow TaskQueueManager::CreateLazyNow() const {
  return LazyNow(delegate_.get());
}
395 | |
// Serializes the manager's state (queues, selector, time domains, and the
// selection just made) into a TracedValue for tracing snapshots.
// |selected_work_queue| is only dereferenced when |should_run| is true.
std::unique_ptr<base::trace_event::ConvertableToTraceFormat>
TaskQueueManager::AsValueWithSelectorResult(
    bool should_run,
    internal::WorkQueue* selected_work_queue) const {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  std::unique_ptr<base::trace_event::TracedValue> state(
      new base::trace_event::TracedValue());
  state->BeginArray("queues");
  for (auto& queue : queues_)
    queue->AsValueInto(state.get());
  state->EndArray();
  state->BeginDictionary("selector");
  selector_.AsValueInto(state.get());
  state->EndDictionary();
  if (should_run) {
    state->SetString("selected_queue",
                     selected_work_queue->task_queue()->GetName());
    state->SetString("work_queue_name", selected_work_queue->name());
  }

  state->BeginArray("time_domains");
  for (auto* time_domain : time_domains_)
    time_domain->AsValueInto(state.get());
  state->EndArray();
  // std::move is needed here to convert unique_ptr<TracedValue> to the
  // unique_ptr<ConvertableToTraceFormat> return type.
  return std::move(state);
}
422 | |
423 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) { | |
424 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
425 // Only schedule DoWork if there's something to do. | |
426 if (!queue->immediate_work_queue()->Empty() || | |
427 !queue->delayed_work_queue()->Empty()) { | |
428 MaybeScheduleImmediateWork(FROM_HERE); | |
429 } | |
430 } | |
431 | |
432 void TaskQueueManager::OnTriedToSelectBlockedWorkQueue( | |
433 internal::WorkQueue* work_queue) { | |
434 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
435 DCHECK(!work_queue->Empty()); | |
436 if (observer_) { | |
437 observer_->OnTriedToExecuteBlockedTask(*work_queue->task_queue(), | |
438 *work_queue->GetFrontTask()); | |
439 } | |
440 } | |
441 | |
442 } // namespace scheduler | |
OLD | NEW |