| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "components/scheduler/base/task_queue_manager.h" | |
| 6 | |
| 7 #include <queue> | |
| 8 #include <set> | |
| 9 | |
| 10 #include "base/bind.h" | |
| 11 #include "base/metrics/histogram_macros.h" | |
| 12 #include "base/trace_event/trace_event.h" | |
| 13 #include "components/scheduler/base/real_time_domain.h" | |
| 14 #include "components/scheduler/base/task_queue_impl.h" | |
| 15 #include "components/scheduler/base/task_queue_manager_delegate.h" | |
| 16 #include "components/scheduler/base/task_queue_selector.h" | |
| 17 #include "components/scheduler/base/task_time_tracker.h" | |
| 18 #include "components/scheduler/base/work_queue.h" | |
| 19 #include "components/scheduler/base/work_queue_sets.h" | |
| 20 | |
| 21 namespace scheduler { | |
| 22 | |
| 23 namespace { | |
| 24 const size_t kRecordRecordTaskDelayHistogramsEveryNTasks = 10; | |
| 25 | |
| 26 void RecordDelayedTaskLateness(base::TimeDelta lateness) { | |
| 27 UMA_HISTOGRAM_TIMES("RendererScheduler.TaskQueueManager.DelayedTaskLateness", | |
| 28 lateness); | |
| 29 } | |
| 30 | |
| 31 void RecordImmediateTaskQueueingDuration(tracked_objects::Duration duration) { | |
| 32 UMA_HISTOGRAM_TIMES( | |
| 33 "RendererScheduler.TaskQueueManager.ImmediateTaskQueueingDuration", | |
| 34 base::TimeDelta::FromMilliseconds(duration.InMilliseconds())); | |
| 35 } | |
| 36 } | |
| 37 | |
| 38 TaskQueueManager::TaskQueueManager( | |
| 39 scoped_refptr<TaskQueueManagerDelegate> delegate, | |
| 40 const char* tracing_category, | |
| 41 const char* disabled_by_default_tracing_category, | |
| 42 const char* disabled_by_default_verbose_tracing_category) | |
| 43 : real_time_domain_(new RealTimeDomain(tracing_category)), | |
| 44 delegate_(delegate), | |
| 45 task_was_run_on_quiescence_monitored_queue_(false), | |
| 46 work_batch_size_(1), | |
| 47 task_count_(0), | |
| 48 task_time_tracker_(nullptr), | |
| 49 tracing_category_(tracing_category), | |
| 50 disabled_by_default_tracing_category_( | |
| 51 disabled_by_default_tracing_category), | |
| 52 disabled_by_default_verbose_tracing_category_( | |
| 53 disabled_by_default_verbose_tracing_category), | |
| 54 currently_executing_task_queue_(nullptr), | |
| 55 observer_(nullptr), | |
| 56 deletion_sentinel_(new DeletionSentinel()), | |
| 57 weak_factory_(this) { | |
| 58 DCHECK(delegate->RunsTasksOnCurrentThread()); | |
| 59 TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category, | |
| 60 "TaskQueueManager", this); | |
| 61 selector_.SetTaskQueueSelectorObserver(this); | |
| 62 | |
| 63 from_main_thread_immediate_do_work_closure_ = | |
| 64 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), | |
| 65 base::TimeTicks(), true); | |
| 66 from_other_thread_immediate_do_work_closure_ = | |
| 67 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), | |
| 68 base::TimeTicks(), false); | |
| 69 | |
| 70 // TODO(alexclarke): Change this to be a parameter that's passed in. | |
| 71 RegisterTimeDomain(real_time_domain_.get()); | |
| 72 } | |
| 73 | |
| 74 TaskQueueManager::~TaskQueueManager() { | |
| 75 TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_, | |
| 76 "TaskQueueManager", this); | |
| 77 | |
| 78 while (!queues_.empty()) | |
| 79 (*queues_.begin())->UnregisterTaskQueue(); | |
| 80 | |
| 81 selector_.SetTaskQueueSelectorObserver(nullptr); | |
| 82 } | |
| 83 | |
| 84 void TaskQueueManager::RegisterTimeDomain(TimeDomain* time_domain) { | |
| 85 time_domains_.insert(time_domain); | |
| 86 time_domain->OnRegisterWithTaskQueueManager(this); | |
| 87 } | |
| 88 | |
| 89 void TaskQueueManager::UnregisterTimeDomain(TimeDomain* time_domain) { | |
| 90 time_domains_.erase(time_domain); | |
| 91 } | |
| 92 | |
| 93 scoped_refptr<internal::TaskQueueImpl> TaskQueueManager::NewTaskQueue( | |
| 94 const TaskQueue::Spec& spec) { | |
| 95 TRACE_EVENT1(tracing_category_, | |
| 96 "TaskQueueManager::NewTaskQueue", "queue_name", spec.name); | |
| 97 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 98 TimeDomain* time_domain = | |
| 99 spec.time_domain ? spec.time_domain : real_time_domain_.get(); | |
| 100 DCHECK(time_domains_.find(time_domain) != time_domains_.end()); | |
| 101 scoped_refptr<internal::TaskQueueImpl> queue( | |
| 102 make_scoped_refptr(new internal::TaskQueueImpl( | |
| 103 this, time_domain, spec, disabled_by_default_tracing_category_, | |
| 104 disabled_by_default_verbose_tracing_category_))); | |
| 105 queues_.insert(queue); | |
| 106 selector_.AddQueue(queue.get()); | |
| 107 return queue; | |
| 108 } | |
| 109 | |
| 110 void TaskQueueManager::SetObserver(Observer* observer) { | |
| 111 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 112 observer_ = observer; | |
| 113 } | |
| 114 | |
| 115 void TaskQueueManager::UnregisterTaskQueue( | |
| 116 scoped_refptr<internal::TaskQueueImpl> task_queue) { | |
| 117 TRACE_EVENT1(tracing_category_, | |
| 118 "TaskQueueManager::UnregisterTaskQueue", "queue_name", | |
| 119 task_queue->GetName()); | |
| 120 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 121 if (observer_) | |
| 122 observer_->OnUnregisterTaskQueue(task_queue); | |
| 123 | |
| 124 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being | |
| 125 // freed while any of our structures hold hold a raw pointer to it. | |
| 126 queues_to_delete_.insert(task_queue); | |
| 127 queues_.erase(task_queue); | |
| 128 selector_.RemoveQueue(task_queue.get()); | |
| 129 } | |
| 130 | |
| 131 void TaskQueueManager::UpdateWorkQueues( | |
| 132 bool should_trigger_wakeup, | |
| 133 const internal::TaskQueueImpl::Task* previous_task, | |
| 134 LazyNow lazy_now) { | |
| 135 TRACE_EVENT0(disabled_by_default_tracing_category_, | |
| 136 "TaskQueueManager::UpdateWorkQueues"); | |
| 137 | |
| 138 for (TimeDomain* time_domain : time_domains_) { | |
| 139 LazyNow lazy_now_in_domain = time_domain == real_time_domain_.get() | |
| 140 ? lazy_now | |
| 141 : time_domain->CreateLazyNow(); | |
| 142 time_domain->UpdateWorkQueues(should_trigger_wakeup, previous_task, | |
| 143 lazy_now_in_domain); | |
| 144 } | |
| 145 } | |
| 146 | |
| 147 void TaskQueueManager::MaybeScheduleImmediateWork( | |
| 148 const tracked_objects::Location& from_here) { | |
| 149 bool on_main_thread = delegate_->BelongsToCurrentThread(); | |
| 150 // De-duplicate DoWork posts. | |
| 151 if (on_main_thread) { | |
| 152 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) { | |
| 153 return; | |
| 154 } | |
| 155 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_); | |
| 156 } else { | |
| 157 { | |
| 158 base::AutoLock lock(other_thread_lock_); | |
| 159 if (!other_thread_pending_wakeups_.insert(base::TimeTicks()).second) | |
| 160 return; | |
| 161 } | |
| 162 delegate_->PostTask(from_here, | |
| 163 from_other_thread_immediate_do_work_closure_); | |
| 164 } | |
| 165 } | |
| 166 | |
| 167 void TaskQueueManager::MaybeScheduleDelayedWork( | |
| 168 const tracked_objects::Location& from_here, | |
| 169 base::TimeTicks now, | |
| 170 base::TimeDelta delay) { | |
| 171 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 172 DCHECK_GE(delay, base::TimeDelta()); | |
| 173 base::TimeTicks run_time = now + delay; | |
| 174 // De-duplicate DoWork posts. | |
| 175 if (!main_thread_pending_wakeups_.insert(run_time).second) | |
| 176 return; | |
| 177 delegate_->PostDelayedTask( | |
| 178 from_here, base::Bind(&TaskQueueManager::DoWork, | |
| 179 weak_factory_.GetWeakPtr(), run_time, true), | |
| 180 delay); | |
| 181 } | |
| 182 | |
| 183 void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) { | |
| 184 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 185 TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork", | |
| 186 "from_main_thread", from_main_thread); | |
| 187 if (from_main_thread) { | |
| 188 main_thread_pending_wakeups_.erase(run_time); | |
| 189 } else { | |
| 190 base::AutoLock lock(other_thread_lock_); | |
| 191 other_thread_pending_wakeups_.erase(run_time); | |
| 192 } | |
| 193 | |
| 194 if (!delegate_->IsNested()) | |
| 195 queues_to_delete_.clear(); | |
| 196 | |
| 197 LazyNow lazy_now(real_time_domain()->CreateLazyNow()); | |
| 198 base::TimeTicks task_start_time; | |
| 199 | |
| 200 if (!delegate_->IsNested() && task_time_tracker_) | |
| 201 task_start_time = lazy_now.Now(); | |
| 202 | |
| 203 // Pass false and nullptr to UpdateWorkQueues here to prevent waking up a | |
| 204 // pump-after-wakeup queue. | |
| 205 UpdateWorkQueues(false, nullptr, lazy_now); | |
| 206 | |
| 207 internal::TaskQueueImpl::Task previous_task; | |
| 208 | |
| 209 for (int i = 0; i < work_batch_size_; i++) { | |
| 210 internal::WorkQueue* work_queue; | |
| 211 if (!SelectWorkQueueToService(&work_queue)) { | |
| 212 break; | |
| 213 } | |
| 214 | |
| 215 bool should_trigger_wakeup = work_queue->task_queue()->wakeup_policy() == | |
| 216 TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES; | |
| 217 | |
| 218 switch (ProcessTaskFromWorkQueue(work_queue, &previous_task)) { | |
| 219 case ProcessTaskResult::DEFERRED: | |
| 220 // If a task was deferred, try again with another task. Note that this | |
| 221 // means deferred tasks (i.e. non-nestable tasks) will never trigger | |
| 222 // queue wake-ups. | |
| 223 continue; | |
| 224 case ProcessTaskResult::EXECUTED: | |
| 225 break; | |
| 226 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED: | |
| 227 return; // The TaskQueueManager got deleted, we must bail out. | |
| 228 } | |
| 229 | |
| 230 lazy_now = real_time_domain()->CreateLazyNow(); | |
| 231 if (!delegate_->IsNested() && task_time_tracker_) { | |
| 232 // Only report top level task durations. | |
| 233 base::TimeTicks task_end_time = lazy_now.Now(); | |
| 234 task_time_tracker_->ReportTaskTime(task_start_time, task_end_time); | |
| 235 task_start_time = task_end_time; | |
| 236 } | |
| 237 | |
| 238 work_queue = nullptr; // The queue may have been unregistered. | |
| 239 | |
| 240 UpdateWorkQueues(should_trigger_wakeup, &previous_task, lazy_now); | |
| 241 | |
| 242 // Only run a single task per batch in nested run loops so that we can | |
| 243 // properly exit the nested loop when someone calls RunLoop::Quit(). | |
| 244 if (delegate_->IsNested()) | |
| 245 break; | |
| 246 } | |
| 247 | |
| 248 // TODO(alexclarke): Consider refactoring the above loop to terminate only | |
| 249 // when there's no more work left to be done, rather than posting a | |
| 250 // continuation task. | |
| 251 if (!selector_.EnabledWorkQueuesEmpty() || TryAdvanceTimeDomains()) | |
| 252 MaybeScheduleImmediateWork(FROM_HERE); | |
| 253 } | |
| 254 | |
| 255 bool TaskQueueManager::TryAdvanceTimeDomains() { | |
| 256 bool can_advance = false; | |
| 257 for (TimeDomain* time_domain : time_domains_) { | |
| 258 can_advance |= time_domain->MaybeAdvanceTime(); | |
| 259 } | |
| 260 return can_advance; | |
| 261 } | |
| 262 | |
| 263 bool TaskQueueManager::SelectWorkQueueToService( | |
| 264 internal::WorkQueue** out_work_queue) { | |
| 265 bool should_run = selector_.SelectWorkQueueToService(out_work_queue); | |
| 266 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( | |
| 267 disabled_by_default_tracing_category_, "TaskQueueManager", this, | |
| 268 AsValueWithSelectorResult(should_run, *out_work_queue)); | |
| 269 return should_run; | |
| 270 } | |
| 271 | |
| 272 void TaskQueueManager::DidQueueTask( | |
| 273 const internal::TaskQueueImpl::Task& pending_task) { | |
| 274 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task); | |
| 275 } | |
| 276 | |
| 277 TaskQueueManager::ProcessTaskResult TaskQueueManager::ProcessTaskFromWorkQueue( | |
| 278 internal::WorkQueue* work_queue, | |
| 279 internal::TaskQueueImpl::Task* out_previous_task) { | |
| 280 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 281 scoped_refptr<DeletionSentinel> protect(deletion_sentinel_); | |
| 282 internal::TaskQueueImpl* queue = work_queue->task_queue(); | |
| 283 | |
| 284 if (queue->GetQuiescenceMonitored()) | |
| 285 task_was_run_on_quiescence_monitored_queue_ = true; | |
| 286 | |
| 287 internal::TaskQueueImpl::Task pending_task = | |
| 288 work_queue->TakeTaskFromWorkQueue(); | |
| 289 if (!pending_task.nestable && delegate_->IsNested()) { | |
| 290 // Defer non-nestable work to the main task runner. NOTE these tasks can be | |
| 291 // arbitrarily delayed so the additional delay should not be a problem. | |
| 292 // TODO(skyostil): Figure out a way to not forget which task queue the | |
| 293 // task is associated with. See http://crbug.com/522843. | |
| 294 delegate_->PostNonNestableTask(pending_task.posted_from, pending_task.task); | |
| 295 return ProcessTaskResult::DEFERRED; | |
| 296 } | |
| 297 | |
| 298 MaybeRecordTaskDelayHistograms(pending_task, queue); | |
| 299 | |
| 300 TRACE_TASK_EXECUTION("TaskQueueManager::ProcessTaskFromWorkQueue", | |
| 301 pending_task); | |
| 302 if (queue->GetShouldNotifyObservers()) { | |
| 303 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_, | |
| 304 WillProcessTask(pending_task)); | |
| 305 queue->NotifyWillProcessTask(pending_task); | |
| 306 } | |
| 307 TRACE_EVENT1(tracing_category_, | |
| 308 "TaskQueueManager::RunTask", "queue", queue->GetName()); | |
| 309 // NOTE when TaskQueues get unregistered a reference ends up getting retained | |
| 310 // by |queues_to_delete_| which is cleared at the top of |DoWork|. This means | |
| 311 // we are OK to use raw pointers here. | |
| 312 internal::TaskQueueImpl* prev_executing_task_queue = | |
| 313 currently_executing_task_queue_; | |
| 314 currently_executing_task_queue_ = queue; | |
| 315 task_annotator_.RunTask("TaskQueueManager::PostTask", pending_task); | |
| 316 | |
| 317 // Detect if the TaskQueueManager just got deleted. If this happens we must | |
| 318 // not access any member variables after this point. | |
| 319 if (protect->HasOneRef()) | |
| 320 return ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED; | |
| 321 | |
| 322 currently_executing_task_queue_ = prev_executing_task_queue; | |
| 323 | |
| 324 if (queue->GetShouldNotifyObservers()) { | |
| 325 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_, | |
| 326 DidProcessTask(pending_task)); | |
| 327 queue->NotifyDidProcessTask(pending_task); | |
| 328 } | |
| 329 | |
| 330 pending_task.task.Reset(); | |
| 331 *out_previous_task = std::move(pending_task); | |
| 332 return ProcessTaskResult::EXECUTED; | |
| 333 } | |
| 334 | |
| 335 void TaskQueueManager::MaybeRecordTaskDelayHistograms( | |
| 336 const internal::TaskQueueImpl::Task& pending_task, | |
| 337 const internal::TaskQueueImpl* queue) { | |
| 338 if ((task_count_++ % kRecordRecordTaskDelayHistogramsEveryNTasks) != 0) | |
| 339 return; | |
| 340 | |
| 341 // Record delayed task lateness and immediate task queueing durations, but | |
| 342 // only for auto-pumped queues. Manually pumped and after wakeup queues can | |
| 343 // have arbitarially large delayes, which would cloud any analysis. | |
| 344 if (queue->GetPumpPolicy() == TaskQueue::PumpPolicy::AUTO) { | |
| 345 if (!pending_task.delayed_run_time.is_null()) { | |
| 346 RecordDelayedTaskLateness(delegate_->NowTicks() - | |
| 347 pending_task.delayed_run_time); | |
| 348 } else if (!pending_task.time_posted.is_null()) { | |
| 349 RecordImmediateTaskQueueingDuration(tracked_objects::TrackedTime::Now() - | |
| 350 pending_task.time_posted); | |
| 351 } | |
| 352 } | |
| 353 } | |
| 354 | |
| 355 bool TaskQueueManager::RunsTasksOnCurrentThread() const { | |
| 356 return delegate_->RunsTasksOnCurrentThread(); | |
| 357 } | |
| 358 | |
| 359 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) { | |
| 360 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 361 DCHECK_GE(work_batch_size, 1); | |
| 362 work_batch_size_ = work_batch_size; | |
| 363 } | |
| 364 | |
| 365 void TaskQueueManager::AddTaskObserver( | |
| 366 base::MessageLoop::TaskObserver* task_observer) { | |
| 367 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 368 task_observers_.AddObserver(task_observer); | |
| 369 } | |
| 370 | |
| 371 void TaskQueueManager::RemoveTaskObserver( | |
| 372 base::MessageLoop::TaskObserver* task_observer) { | |
| 373 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 374 task_observers_.RemoveObserver(task_observer); | |
| 375 } | |
| 376 | |
| 377 bool TaskQueueManager::GetAndClearSystemIsQuiescentBit() { | |
| 378 bool task_was_run = task_was_run_on_quiescence_monitored_queue_; | |
| 379 task_was_run_on_quiescence_monitored_queue_ = false; | |
| 380 return !task_was_run; | |
| 381 } | |
| 382 | |
| 383 const scoped_refptr<TaskQueueManagerDelegate>& TaskQueueManager::delegate() | |
| 384 const { | |
| 385 return delegate_; | |
| 386 } | |
| 387 | |
| 388 internal::EnqueueOrder TaskQueueManager::GetNextSequenceNumber() { | |
| 389 return enqueue_order_generator_.GenerateNext(); | |
| 390 } | |
| 391 | |
| 392 LazyNow TaskQueueManager::CreateLazyNow() const { | |
| 393 return LazyNow(delegate_.get()); | |
| 394 } | |
| 395 | |
| 396 std::unique_ptr<base::trace_event::ConvertableToTraceFormat> | |
| 397 TaskQueueManager::AsValueWithSelectorResult( | |
| 398 bool should_run, | |
| 399 internal::WorkQueue* selected_work_queue) const { | |
| 400 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 401 std::unique_ptr<base::trace_event::TracedValue> state( | |
| 402 new base::trace_event::TracedValue()); | |
| 403 state->BeginArray("queues"); | |
| 404 for (auto& queue : queues_) | |
| 405 queue->AsValueInto(state.get()); | |
| 406 state->EndArray(); | |
| 407 state->BeginDictionary("selector"); | |
| 408 selector_.AsValueInto(state.get()); | |
| 409 state->EndDictionary(); | |
| 410 if (should_run) { | |
| 411 state->SetString("selected_queue", | |
| 412 selected_work_queue->task_queue()->GetName()); | |
| 413 state->SetString("work_queue_name", selected_work_queue->name()); | |
| 414 } | |
| 415 | |
| 416 state->BeginArray("time_domains"); | |
| 417 for (auto* time_domain : time_domains_) | |
| 418 time_domain->AsValueInto(state.get()); | |
| 419 state->EndArray(); | |
| 420 return std::move(state); | |
| 421 } | |
| 422 | |
| 423 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) { | |
| 424 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 425 // Only schedule DoWork if there's something to do. | |
| 426 if (!queue->immediate_work_queue()->Empty() || | |
| 427 !queue->delayed_work_queue()->Empty()) { | |
| 428 MaybeScheduleImmediateWork(FROM_HERE); | |
| 429 } | |
| 430 } | |
| 431 | |
| 432 void TaskQueueManager::OnTriedToSelectBlockedWorkQueue( | |
| 433 internal::WorkQueue* work_queue) { | |
| 434 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 435 DCHECK(!work_queue->Empty()); | |
| 436 if (observer_) { | |
| 437 observer_->OnTriedToExecuteBlockedTask(*work_queue->task_queue(), | |
| 438 *work_queue->GetFrontTask()); | |
| 439 } | |
| 440 } | |
| 441 | |
| 442 } // namespace scheduler | |
| OLD | NEW |