Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(302)

Side by Side Diff: components/scheduler/base/task_queue_manager.cc

Issue 2118903002: scheduler: Move the Blink scheduler into Blink (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebased Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/scheduler/base/task_queue_manager.h"
6
7 #include <queue>
8 #include <set>
9
10 #include "base/bind.h"
11 #include "base/metrics/histogram_macros.h"
12 #include "base/trace_event/trace_event.h"
13 #include "components/scheduler/base/real_time_domain.h"
14 #include "components/scheduler/base/task_queue_impl.h"
15 #include "components/scheduler/base/task_queue_manager_delegate.h"
16 #include "components/scheduler/base/task_queue_selector.h"
17 #include "components/scheduler/base/task_time_tracker.h"
18 #include "components/scheduler/base/work_queue.h"
19 #include "components/scheduler/base/work_queue_sets.h"
20
21 namespace scheduler {
22
23 namespace {
24 const size_t kRecordRecordTaskDelayHistogramsEveryNTasks = 10;
25
26 void RecordDelayedTaskLateness(base::TimeDelta lateness) {
27 UMA_HISTOGRAM_TIMES("RendererScheduler.TaskQueueManager.DelayedTaskLateness",
28 lateness);
29 }
30
31 void RecordImmediateTaskQueueingDuration(tracked_objects::Duration duration) {
32 UMA_HISTOGRAM_TIMES(
33 "RendererScheduler.TaskQueueManager.ImmediateTaskQueueingDuration",
34 base::TimeDelta::FromMilliseconds(duration.InMilliseconds()));
35 }
36 }
37
38 TaskQueueManager::TaskQueueManager(
39 scoped_refptr<TaskQueueManagerDelegate> delegate,
40 const char* tracing_category,
41 const char* disabled_by_default_tracing_category,
42 const char* disabled_by_default_verbose_tracing_category)
43 : real_time_domain_(new RealTimeDomain(tracing_category)),
44 delegate_(delegate),
45 task_was_run_on_quiescence_monitored_queue_(false),
46 work_batch_size_(1),
47 task_count_(0),
48 task_time_tracker_(nullptr),
49 tracing_category_(tracing_category),
50 disabled_by_default_tracing_category_(
51 disabled_by_default_tracing_category),
52 disabled_by_default_verbose_tracing_category_(
53 disabled_by_default_verbose_tracing_category),
54 currently_executing_task_queue_(nullptr),
55 observer_(nullptr),
56 deletion_sentinel_(new DeletionSentinel()),
57 weak_factory_(this) {
58 DCHECK(delegate->RunsTasksOnCurrentThread());
59 TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category,
60 "TaskQueueManager", this);
61 selector_.SetTaskQueueSelectorObserver(this);
62
63 from_main_thread_immediate_do_work_closure_ =
64 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
65 base::TimeTicks(), true);
66 from_other_thread_immediate_do_work_closure_ =
67 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
68 base::TimeTicks(), false);
69
70 // TODO(alexclarke): Change this to be a parameter that's passed in.
71 RegisterTimeDomain(real_time_domain_.get());
72 }
73
74 TaskQueueManager::~TaskQueueManager() {
75 TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_,
76 "TaskQueueManager", this);
77
78 while (!queues_.empty())
79 (*queues_.begin())->UnregisterTaskQueue();
80
81 selector_.SetTaskQueueSelectorObserver(nullptr);
82 }
83
84 void TaskQueueManager::RegisterTimeDomain(TimeDomain* time_domain) {
85 time_domains_.insert(time_domain);
86 time_domain->OnRegisterWithTaskQueueManager(this);
87 }
88
89 void TaskQueueManager::UnregisterTimeDomain(TimeDomain* time_domain) {
90 time_domains_.erase(time_domain);
91 }
92
93 scoped_refptr<internal::TaskQueueImpl> TaskQueueManager::NewTaskQueue(
94 const TaskQueue::Spec& spec) {
95 TRACE_EVENT1(tracing_category_,
96 "TaskQueueManager::NewTaskQueue", "queue_name", spec.name);
97 DCHECK(main_thread_checker_.CalledOnValidThread());
98 TimeDomain* time_domain =
99 spec.time_domain ? spec.time_domain : real_time_domain_.get();
100 DCHECK(time_domains_.find(time_domain) != time_domains_.end());
101 scoped_refptr<internal::TaskQueueImpl> queue(
102 make_scoped_refptr(new internal::TaskQueueImpl(
103 this, time_domain, spec, disabled_by_default_tracing_category_,
104 disabled_by_default_verbose_tracing_category_)));
105 queues_.insert(queue);
106 selector_.AddQueue(queue.get());
107 return queue;
108 }
109
110 void TaskQueueManager::SetObserver(Observer* observer) {
111 DCHECK(main_thread_checker_.CalledOnValidThread());
112 observer_ = observer;
113 }
114
115 void TaskQueueManager::UnregisterTaskQueue(
116 scoped_refptr<internal::TaskQueueImpl> task_queue) {
117 TRACE_EVENT1(tracing_category_,
118 "TaskQueueManager::UnregisterTaskQueue", "queue_name",
119 task_queue->GetName());
120 DCHECK(main_thread_checker_.CalledOnValidThread());
121 if (observer_)
122 observer_->OnUnregisterTaskQueue(task_queue);
123
124 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being
125 // freed while any of our structures hold hold a raw pointer to it.
126 queues_to_delete_.insert(task_queue);
127 queues_.erase(task_queue);
128 selector_.RemoveQueue(task_queue.get());
129 }
130
131 void TaskQueueManager::UpdateWorkQueues(
132 bool should_trigger_wakeup,
133 const internal::TaskQueueImpl::Task* previous_task,
134 LazyNow lazy_now) {
135 TRACE_EVENT0(disabled_by_default_tracing_category_,
136 "TaskQueueManager::UpdateWorkQueues");
137
138 for (TimeDomain* time_domain : time_domains_) {
139 LazyNow lazy_now_in_domain = time_domain == real_time_domain_.get()
140 ? lazy_now
141 : time_domain->CreateLazyNow();
142 time_domain->UpdateWorkQueues(should_trigger_wakeup, previous_task,
143 lazy_now_in_domain);
144 }
145 }
146
147 void TaskQueueManager::MaybeScheduleImmediateWork(
148 const tracked_objects::Location& from_here) {
149 bool on_main_thread = delegate_->BelongsToCurrentThread();
150 // De-duplicate DoWork posts.
151 if (on_main_thread) {
152 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) {
153 return;
154 }
155 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_);
156 } else {
157 {
158 base::AutoLock lock(other_thread_lock_);
159 if (!other_thread_pending_wakeups_.insert(base::TimeTicks()).second)
160 return;
161 }
162 delegate_->PostTask(from_here,
163 from_other_thread_immediate_do_work_closure_);
164 }
165 }
166
167 void TaskQueueManager::MaybeScheduleDelayedWork(
168 const tracked_objects::Location& from_here,
169 base::TimeTicks now,
170 base::TimeDelta delay) {
171 DCHECK(main_thread_checker_.CalledOnValidThread());
172 DCHECK_GE(delay, base::TimeDelta());
173 base::TimeTicks run_time = now + delay;
174 // De-duplicate DoWork posts.
175 if (!main_thread_pending_wakeups_.insert(run_time).second)
176 return;
177 delegate_->PostDelayedTask(
178 from_here, base::Bind(&TaskQueueManager::DoWork,
179 weak_factory_.GetWeakPtr(), run_time, true),
180 delay);
181 }
182
183 void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
184 DCHECK(main_thread_checker_.CalledOnValidThread());
185 TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork",
186 "from_main_thread", from_main_thread);
187 if (from_main_thread) {
188 main_thread_pending_wakeups_.erase(run_time);
189 } else {
190 base::AutoLock lock(other_thread_lock_);
191 other_thread_pending_wakeups_.erase(run_time);
192 }
193
194 if (!delegate_->IsNested())
195 queues_to_delete_.clear();
196
197 LazyNow lazy_now(real_time_domain()->CreateLazyNow());
198 base::TimeTicks task_start_time;
199
200 if (!delegate_->IsNested() && task_time_tracker_)
201 task_start_time = lazy_now.Now();
202
203 // Pass false and nullptr to UpdateWorkQueues here to prevent waking up a
204 // pump-after-wakeup queue.
205 UpdateWorkQueues(false, nullptr, lazy_now);
206
207 internal::TaskQueueImpl::Task previous_task;
208
209 for (int i = 0; i < work_batch_size_; i++) {
210 internal::WorkQueue* work_queue;
211 if (!SelectWorkQueueToService(&work_queue)) {
212 break;
213 }
214
215 bool should_trigger_wakeup = work_queue->task_queue()->wakeup_policy() ==
216 TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES;
217
218 switch (ProcessTaskFromWorkQueue(work_queue, &previous_task)) {
219 case ProcessTaskResult::DEFERRED:
220 // If a task was deferred, try again with another task. Note that this
221 // means deferred tasks (i.e. non-nestable tasks) will never trigger
222 // queue wake-ups.
223 continue;
224 case ProcessTaskResult::EXECUTED:
225 break;
226 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED:
227 return; // The TaskQueueManager got deleted, we must bail out.
228 }
229
230 lazy_now = real_time_domain()->CreateLazyNow();
231 if (!delegate_->IsNested() && task_time_tracker_) {
232 // Only report top level task durations.
233 base::TimeTicks task_end_time = lazy_now.Now();
234 task_time_tracker_->ReportTaskTime(task_start_time, task_end_time);
235 task_start_time = task_end_time;
236 }
237
238 work_queue = nullptr; // The queue may have been unregistered.
239
240 UpdateWorkQueues(should_trigger_wakeup, &previous_task, lazy_now);
241
242 // Only run a single task per batch in nested run loops so that we can
243 // properly exit the nested loop when someone calls RunLoop::Quit().
244 if (delegate_->IsNested())
245 break;
246 }
247
248 // TODO(alexclarke): Consider refactoring the above loop to terminate only
249 // when there's no more work left to be done, rather than posting a
250 // continuation task.
251 if (!selector_.EnabledWorkQueuesEmpty() || TryAdvanceTimeDomains())
252 MaybeScheduleImmediateWork(FROM_HERE);
253 }
254
255 bool TaskQueueManager::TryAdvanceTimeDomains() {
256 bool can_advance = false;
257 for (TimeDomain* time_domain : time_domains_) {
258 can_advance |= time_domain->MaybeAdvanceTime();
259 }
260 return can_advance;
261 }
262
263 bool TaskQueueManager::SelectWorkQueueToService(
264 internal::WorkQueue** out_work_queue) {
265 bool should_run = selector_.SelectWorkQueueToService(out_work_queue);
266 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
267 disabled_by_default_tracing_category_, "TaskQueueManager", this,
268 AsValueWithSelectorResult(should_run, *out_work_queue));
269 return should_run;
270 }
271
272 void TaskQueueManager::DidQueueTask(
273 const internal::TaskQueueImpl::Task& pending_task) {
274 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task);
275 }
276
277 TaskQueueManager::ProcessTaskResult TaskQueueManager::ProcessTaskFromWorkQueue(
278 internal::WorkQueue* work_queue,
279 internal::TaskQueueImpl::Task* out_previous_task) {
280 DCHECK(main_thread_checker_.CalledOnValidThread());
281 scoped_refptr<DeletionSentinel> protect(deletion_sentinel_);
282 internal::TaskQueueImpl* queue = work_queue->task_queue();
283
284 if (queue->GetQuiescenceMonitored())
285 task_was_run_on_quiescence_monitored_queue_ = true;
286
287 internal::TaskQueueImpl::Task pending_task =
288 work_queue->TakeTaskFromWorkQueue();
289 if (!pending_task.nestable && delegate_->IsNested()) {
290 // Defer non-nestable work to the main task runner. NOTE these tasks can be
291 // arbitrarily delayed so the additional delay should not be a problem.
292 // TODO(skyostil): Figure out a way to not forget which task queue the
293 // task is associated with. See http://crbug.com/522843.
294 delegate_->PostNonNestableTask(pending_task.posted_from, pending_task.task);
295 return ProcessTaskResult::DEFERRED;
296 }
297
298 MaybeRecordTaskDelayHistograms(pending_task, queue);
299
300 TRACE_TASK_EXECUTION("TaskQueueManager::ProcessTaskFromWorkQueue",
301 pending_task);
302 if (queue->GetShouldNotifyObservers()) {
303 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
304 WillProcessTask(pending_task));
305 queue->NotifyWillProcessTask(pending_task);
306 }
307 TRACE_EVENT1(tracing_category_,
308 "TaskQueueManager::RunTask", "queue", queue->GetName());
309 // NOTE when TaskQueues get unregistered a reference ends up getting retained
310 // by |queues_to_delete_| which is cleared at the top of |DoWork|. This means
311 // we are OK to use raw pointers here.
312 internal::TaskQueueImpl* prev_executing_task_queue =
313 currently_executing_task_queue_;
314 currently_executing_task_queue_ = queue;
315 task_annotator_.RunTask("TaskQueueManager::PostTask", pending_task);
316
317 // Detect if the TaskQueueManager just got deleted. If this happens we must
318 // not access any member variables after this point.
319 if (protect->HasOneRef())
320 return ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED;
321
322 currently_executing_task_queue_ = prev_executing_task_queue;
323
324 if (queue->GetShouldNotifyObservers()) {
325 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
326 DidProcessTask(pending_task));
327 queue->NotifyDidProcessTask(pending_task);
328 }
329
330 pending_task.task.Reset();
331 *out_previous_task = std::move(pending_task);
332 return ProcessTaskResult::EXECUTED;
333 }
334
335 void TaskQueueManager::MaybeRecordTaskDelayHistograms(
336 const internal::TaskQueueImpl::Task& pending_task,
337 const internal::TaskQueueImpl* queue) {
338 if ((task_count_++ % kRecordRecordTaskDelayHistogramsEveryNTasks) != 0)
339 return;
340
341 // Record delayed task lateness and immediate task queueing durations, but
342 // only for auto-pumped queues. Manually pumped and after wakeup queues can
343 // have arbitarially large delayes, which would cloud any analysis.
344 if (queue->GetPumpPolicy() == TaskQueue::PumpPolicy::AUTO) {
345 if (!pending_task.delayed_run_time.is_null()) {
346 RecordDelayedTaskLateness(delegate_->NowTicks() -
347 pending_task.delayed_run_time);
348 } else if (!pending_task.time_posted.is_null()) {
349 RecordImmediateTaskQueueingDuration(tracked_objects::TrackedTime::Now() -
350 pending_task.time_posted);
351 }
352 }
353 }
354
355 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
356 return delegate_->RunsTasksOnCurrentThread();
357 }
358
359 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
360 DCHECK(main_thread_checker_.CalledOnValidThread());
361 DCHECK_GE(work_batch_size, 1);
362 work_batch_size_ = work_batch_size;
363 }
364
365 void TaskQueueManager::AddTaskObserver(
366 base::MessageLoop::TaskObserver* task_observer) {
367 DCHECK(main_thread_checker_.CalledOnValidThread());
368 task_observers_.AddObserver(task_observer);
369 }
370
371 void TaskQueueManager::RemoveTaskObserver(
372 base::MessageLoop::TaskObserver* task_observer) {
373 DCHECK(main_thread_checker_.CalledOnValidThread());
374 task_observers_.RemoveObserver(task_observer);
375 }
376
377 bool TaskQueueManager::GetAndClearSystemIsQuiescentBit() {
378 bool task_was_run = task_was_run_on_quiescence_monitored_queue_;
379 task_was_run_on_quiescence_monitored_queue_ = false;
380 return !task_was_run;
381 }
382
383 const scoped_refptr<TaskQueueManagerDelegate>& TaskQueueManager::delegate()
384 const {
385 return delegate_;
386 }
387
388 internal::EnqueueOrder TaskQueueManager::GetNextSequenceNumber() {
389 return enqueue_order_generator_.GenerateNext();
390 }
391
392 LazyNow TaskQueueManager::CreateLazyNow() const {
393 return LazyNow(delegate_.get());
394 }
395
396 std::unique_ptr<base::trace_event::ConvertableToTraceFormat>
397 TaskQueueManager::AsValueWithSelectorResult(
398 bool should_run,
399 internal::WorkQueue* selected_work_queue) const {
400 DCHECK(main_thread_checker_.CalledOnValidThread());
401 std::unique_ptr<base::trace_event::TracedValue> state(
402 new base::trace_event::TracedValue());
403 state->BeginArray("queues");
404 for (auto& queue : queues_)
405 queue->AsValueInto(state.get());
406 state->EndArray();
407 state->BeginDictionary("selector");
408 selector_.AsValueInto(state.get());
409 state->EndDictionary();
410 if (should_run) {
411 state->SetString("selected_queue",
412 selected_work_queue->task_queue()->GetName());
413 state->SetString("work_queue_name", selected_work_queue->name());
414 }
415
416 state->BeginArray("time_domains");
417 for (auto* time_domain : time_domains_)
418 time_domain->AsValueInto(state.get());
419 state->EndArray();
420 return std::move(state);
421 }
422
423 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) {
424 DCHECK(main_thread_checker_.CalledOnValidThread());
425 // Only schedule DoWork if there's something to do.
426 if (!queue->immediate_work_queue()->Empty() ||
427 !queue->delayed_work_queue()->Empty()) {
428 MaybeScheduleImmediateWork(FROM_HERE);
429 }
430 }
431
432 void TaskQueueManager::OnTriedToSelectBlockedWorkQueue(
433 internal::WorkQueue* work_queue) {
434 DCHECK(main_thread_checker_.CalledOnValidThread());
435 DCHECK(!work_queue->Empty());
436 if (observer_) {
437 observer_->OnTriedToExecuteBlockedTask(*work_queue->task_queue(),
438 *work_queue->GetFrontTask());
439 }
440 }
441
442 } // namespace scheduler
OLDNEW
« no previous file with comments | « components/scheduler/base/task_queue_manager.h ('k') | components/scheduler/base/task_queue_manager_delegate.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698