| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/renderer/scheduler/task_queue_manager.h" | 5 #include "content/renderer/scheduler/task_queue_manager.h" |
| 6 | 6 |
| 7 #include <queue> | |
| 8 | |
| 9 #include "base/bind.h" | 7 #include "base/bind.h" |
| 10 #include "base/debug/trace_event.h" | 8 #include "base/debug/trace_event.h" |
| 11 #include "base/debug/trace_event_argument.h" | 9 #include "base/debug/trace_event_argument.h" |
| 12 #include "cc/test/test_now_source.h" | |
| 13 #include "content/renderer/scheduler/task_queue_selector.h" | 10 #include "content/renderer/scheduler/task_queue_selector.h" |
| 14 | 11 |
| 15 namespace { | |
| 16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max(); | |
| 17 } | |
| 18 | |
| 19 namespace content { | 12 namespace content { |
| 20 namespace internal { | 13 namespace internal { |
| 21 | 14 |
| 22 class TaskQueue : public base::SingleThreadTaskRunner { | 15 class TaskQueue : public base::SingleThreadTaskRunner { |
| 23 public: | 16 public: |
| 24 TaskQueue(TaskQueueManager* task_queue_manager); | 17 TaskQueue(TaskQueueManager* task_queue_manager); |
| 25 | 18 |
| 26 // base::SingleThreadTaskRunner implementation. | 19 // base::SingleThreadTaskRunner implementation. |
| 27 bool RunsTasksOnCurrentThread() const override; | 20 bool RunsTasksOnCurrentThread() const override; |
| 28 bool PostDelayedTask(const tracked_objects::Location& from_here, | 21 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| (...skipping 11 matching lines...) Expand all Loading... |
| 40 // Adds a task at the end of the incoming task queue and schedules a call to | 33 // Adds a task at the end of the incoming task queue and schedules a call to |
| 41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic | 34 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic |
| 42 // pumping is enabled. Can be called on an arbitrary thread. | 35 // pumping is enabled. Can be called on an arbitrary thread. |
| 43 void EnqueueTask(const base::PendingTask& pending_task); | 36 void EnqueueTask(const base::PendingTask& pending_task); |
| 44 | 37 |
| 45 bool IsQueueEmpty() const; | 38 bool IsQueueEmpty() const; |
| 46 | 39 |
| 47 void SetAutoPump(bool auto_pump); | 40 void SetAutoPump(bool auto_pump); |
| 48 void PumpQueue(); | 41 void PumpQueue(); |
| 49 | 42 |
| 50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task); | 43 bool UpdateWorkQueue(); |
| 51 base::PendingTask TakeTaskFromWorkQueue(); | 44 base::PendingTask TakeTaskFromWorkQueue(); |
| 52 | 45 |
| 53 void WillDeleteTaskQueueManager(); | 46 void WillDeleteTaskQueueManager(); |
| 54 | 47 |
| 55 base::TaskQueue& work_queue() { return work_queue_; } | 48 base::TaskQueue& work_queue() { return work_queue_; } |
| 56 | 49 |
| 57 void set_name(const char* name) { name_ = name; } | 50 void set_name(const char* name) { name_ = name; } |
| 58 | 51 |
| 59 void AsValueInto(base::debug::TracedValue* state) const; | 52 void AsValueInto(base::debug::TracedValue* state) const; |
| 60 | 53 |
| (...skipping 13 matching lines...) Expand all Loading... |
| 74 base::debug::TracedValue* state); | 67 base::debug::TracedValue* state); |
| 75 static void TaskAsValueInto(const base::PendingTask& task, | 68 static void TaskAsValueInto(const base::PendingTask& task, |
| 76 base::debug::TracedValue* state); | 69 base::debug::TracedValue* state); |
| 77 | 70 |
| 78 // This lock protects all members except the work queue. | 71 // This lock protects all members except the work queue. |
| 79 mutable base::Lock lock_; | 72 mutable base::Lock lock_; |
| 80 TaskQueueManager* task_queue_manager_; | 73 TaskQueueManager* task_queue_manager_; |
| 81 base::TaskQueue incoming_queue_; | 74 base::TaskQueue incoming_queue_; |
| 82 bool auto_pump_; | 75 bool auto_pump_; |
| 83 const char* name_; | 76 const char* name_; |
| 84 std::priority_queue<base::TimeTicks, | |
| 85 std::vector<base::TimeTicks>, | |
| 86 std::greater<base::TimeTicks>> delayed_task_run_times_; | |
| 87 | 77 |
| 88 base::TaskQueue work_queue_; | 78 base::TaskQueue work_queue_; |
| 89 | 79 |
| 90 DISALLOW_COPY_AND_ASSIGN(TaskQueue); | 80 DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
| 91 }; | 81 }; |
| 92 | 82 |
| 93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) | 83 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) |
| 94 : task_queue_manager_(task_queue_manager), | 84 : task_queue_manager_(task_queue_manager), |
| 95 auto_pump_(true), | 85 auto_pump_(true), |
| 96 name_(nullptr) { | 86 name_(nullptr) { |
| (...skipping 19 matching lines...) Expand all Loading... |
| 116 base::TimeDelta delay, | 106 base::TimeDelta delay, |
| 117 bool nestable) { | 107 bool nestable) { |
| 118 base::AutoLock lock(lock_); | 108 base::AutoLock lock(lock_); |
| 119 if (!task_queue_manager_) | 109 if (!task_queue_manager_) |
| 120 return false; | 110 return false; |
| 121 | 111 |
| 122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); | 112 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); |
| 123 task_queue_manager_->DidQueueTask(&pending_task); | 113 task_queue_manager_->DidQueueTask(&pending_task); |
| 124 | 114 |
| 125 if (delay > base::TimeDelta()) { | 115 if (delay > base::TimeDelta()) { |
| 126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay; | |
| 127 delayed_task_run_times_.push(pending_task.delayed_run_time); | |
| 128 return task_queue_manager_->PostDelayedTask( | 116 return task_queue_manager_->PostDelayedTask( |
| 129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); | 117 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); |
| 130 } | 118 } |
| 131 EnqueueTaskLocked(pending_task); | 119 EnqueueTaskLocked(pending_task); |
| 132 return true; | 120 return true; |
| 133 } | 121 } |
| 134 | 122 |
| 135 bool TaskQueue::IsQueueEmpty() const { | 123 bool TaskQueue::IsQueueEmpty() const { |
| 136 if (!work_queue_.empty()) | 124 if (!work_queue_.empty()) |
| 137 return false; | 125 return false; |
| 138 | 126 |
| 139 { | 127 { |
| 140 base::AutoLock lock(lock_); | 128 base::AutoLock lock(lock_); |
| 141 return incoming_queue_.empty(); | 129 return incoming_queue_.empty(); |
| 142 } | 130 } |
| 143 } | 131 } |
| 144 | 132 |
| 145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) { | 133 bool TaskQueue::UpdateWorkQueue() { |
| 146 if (!work_queue_.empty()) | 134 if (!work_queue_.empty()) |
| 147 return true; | 135 return true; |
| 148 | 136 |
| 149 { | 137 { |
| 150 base::AutoLock lock(lock_); | 138 base::AutoLock lock(lock_); |
| 151 if (!delayed_task_run_times_.empty()) { | |
| 152 *next_pending_delayed_task = | |
| 153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top()); | |
| 154 } | |
| 155 if (!auto_pump_ || incoming_queue_.empty()) | 139 if (!auto_pump_ || incoming_queue_.empty()) |
| 156 return false; | 140 return false; |
| 157 work_queue_.Swap(&incoming_queue_); | 141 work_queue_.Swap(&incoming_queue_); |
| 158 TraceWorkQueueSize(); | 142 TraceWorkQueueSize(); |
| 159 return true; | 143 return true; |
| 160 } | 144 } |
| 161 } | 145 } |
| 162 | 146 |
| 163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { | 147 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { |
| 164 base::PendingTask pending_task = work_queue_.front(); | 148 base::PendingTask pending_task = work_queue_.front(); |
| (...skipping 14 matching lines...) Expand all Loading... |
| 179 EnqueueTaskLocked(pending_task); | 163 EnqueueTaskLocked(pending_task); |
| 180 } | 164 } |
| 181 | 165 |
| 182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { | 166 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
| 183 lock_.AssertAcquired(); | 167 lock_.AssertAcquired(); |
| 184 if (!task_queue_manager_) | 168 if (!task_queue_manager_) |
| 185 return; | 169 return; |
| 186 if (auto_pump_ && incoming_queue_.empty()) | 170 if (auto_pump_ && incoming_queue_.empty()) |
| 187 task_queue_manager_->MaybePostDoWorkOnMainRunner(); | 171 task_queue_manager_->MaybePostDoWorkOnMainRunner(); |
| 188 incoming_queue_.push(pending_task); | 172 incoming_queue_.push(pending_task); |
| 189 | |
| 190 if (!pending_task.delayed_run_time.is_null()) { | |
| 191 // Update the time of the next pending delayed task. | |
| 192 while (!delayed_task_run_times_.empty() && | |
| 193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) { | |
| 194 delayed_task_run_times_.pop(); | |
| 195 } | |
| 196 } | |
| 197 } | 173 } |
| 198 | 174 |
| 199 void TaskQueue::SetAutoPump(bool auto_pump) { | 175 void TaskQueue::SetAutoPump(bool auto_pump) { |
| 200 base::AutoLock lock(lock_); | 176 base::AutoLock lock(lock_); |
| 201 if (auto_pump) { | 177 if (auto_pump) { |
| 202 auto_pump_ = true; | 178 auto_pump_ = true; |
| 203 PumpQueueLocked(); | 179 PumpQueueLocked(); |
| 204 } else { | 180 } else { |
| 205 auto_pump_ = false; | 181 auto_pump_ = false; |
| 206 } | 182 } |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 262 | 238 |
| 263 } // namespace internal | 239 } // namespace internal |
| 264 | 240 |
| 265 TaskQueueManager::TaskQueueManager( | 241 TaskQueueManager::TaskQueueManager( |
| 266 size_t task_queue_count, | 242 size_t task_queue_count, |
| 267 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 243 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
| 268 TaskQueueSelector* selector) | 244 TaskQueueSelector* selector) |
| 269 : main_task_runner_(main_task_runner), | 245 : main_task_runner_(main_task_runner), |
| 270 selector_(selector), | 246 selector_(selector), |
| 271 pending_dowork_count_(0), | 247 pending_dowork_count_(0), |
| 272 work_batch_size_(1), | |
| 273 time_source_(nullptr), | |
| 274 weak_factory_(this) { | 248 weak_factory_(this) { |
| 275 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); | 249 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); |
| 276 TRACE_EVENT_OBJECT_CREATED_WITH_ID( | 250 TRACE_EVENT_OBJECT_CREATED_WITH_ID( |
| 277 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", | 251 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", |
| 278 this); | 252 this); |
| 279 | 253 |
| 280 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); | 254 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); |
| 281 for (size_t i = 0; i < task_queue_count; i++) { | 255 for (size_t i = 0; i < task_queue_count; i++) { |
| 282 scoped_refptr<internal::TaskQueue> queue( | 256 scoped_refptr<internal::TaskQueue> queue( |
| 283 make_scoped_refptr(new internal::TaskQueue(this))); | 257 make_scoped_refptr(new internal::TaskQueue(this))); |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 318 internal::TaskQueue* queue = Queue(queue_index); | 292 internal::TaskQueue* queue = Queue(queue_index); |
| 319 queue->SetAutoPump(auto_pump); | 293 queue->SetAutoPump(auto_pump); |
| 320 } | 294 } |
| 321 | 295 |
| 322 void TaskQueueManager::PumpQueue(size_t queue_index) { | 296 void TaskQueueManager::PumpQueue(size_t queue_index) { |
| 323 main_thread_checker_.CalledOnValidThread(); | 297 main_thread_checker_.CalledOnValidThread(); |
| 324 internal::TaskQueue* queue = Queue(queue_index); | 298 internal::TaskQueue* queue = Queue(queue_index); |
| 325 queue->PumpQueue(); | 299 queue->PumpQueue(); |
| 326 } | 300 } |
| 327 | 301 |
| 328 bool TaskQueueManager::UpdateWorkQueues( | 302 bool TaskQueueManager::UpdateWorkQueues() { |
| 329 base::TimeTicks* next_pending_delayed_task) { | |
| 330 // TODO(skyostil): This is not efficient when the number of queues grows very | 303 // TODO(skyostil): This is not efficient when the number of queues grows very |
| 331 // large due to the number of locks taken. Consider optimizing when we get | 304 // large due to the number of locks taken. Consider optimizing when we get |
| 332 // there. | 305 // there. |
| 333 main_thread_checker_.CalledOnValidThread(); | 306 main_thread_checker_.CalledOnValidThread(); |
| 334 bool has_work = false; | 307 bool has_work = false; |
| 335 for (auto& queue : queues_) | 308 for (auto& queue : queues_) |
| 336 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task); | 309 has_work |= queue->UpdateWorkQueue(); |
| 337 return has_work; | 310 return has_work; |
| 338 } | 311 } |
| 339 | 312 |
| 340 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { | 313 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { |
| 341 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); | 314 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); |
| 342 if (on_main_thread) { | 315 if (on_main_thread) { |
| 343 // We only want one pending DoWork posted from the main thread, or we risk | 316 // We only want one pending DoWork posted from the main thread, or we risk |
| 344 // an explosion of pending DoWorks which could starve out everything else. | 317 // an explosion of pending DoWorks which could starve out everything else. |
| 345 if (pending_dowork_count_ > 0) { | 318 if (pending_dowork_count_ > 0) { |
| 346 return; | 319 return; |
| 347 } | 320 } |
| 348 pending_dowork_count_++; | 321 pending_dowork_count_++; |
| 349 } | 322 } |
| 350 | 323 |
| 351 main_task_runner_->PostTask( | 324 main_task_runner_->PostTask( |
| 352 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, | 325 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, |
| 353 on_main_thread)); | 326 on_main_thread)); |
| 354 } | 327 } |
| 355 | 328 |
| 356 void TaskQueueManager::DoWork(bool posted_from_main_thread) { | 329 void TaskQueueManager::DoWork(bool posted_from_main_thread) { |
| 357 if (posted_from_main_thread) { | 330 if (posted_from_main_thread) { |
| 358 pending_dowork_count_--; | 331 pending_dowork_count_--; |
| 359 DCHECK_GE(pending_dowork_count_, 0); | 332 DCHECK_GE(pending_dowork_count_, 0); |
| 360 } | 333 } |
| 361 main_thread_checker_.CalledOnValidThread(); | 334 main_thread_checker_.CalledOnValidThread(); |
| 335 if (!UpdateWorkQueues()) |
| 336 return; |
| 362 | 337 |
| 363 base::TimeTicks next_pending_delayed_task( | 338 size_t queue_index; |
| 364 base::TimeTicks::FromInternalValue(kMaxTimeTicks)); | 339 if (!SelectWorkQueueToService(&queue_index)) |
| 365 for (int i = 0; i < work_batch_size_; i++) { | 340 return; |
| 366 if (!UpdateWorkQueues(&next_pending_delayed_task)) | 341 MaybePostDoWorkOnMainRunner(); |
| 367 return; | 342 ProcessTaskFromWorkQueue(queue_index); |
| 368 | |
| 369 // Interrupt the work batch if we should run the next delayed task. | |
| 370 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks && | |
| 371 Now() >= next_pending_delayed_task) | |
| 372 return; | |
| 373 | |
| 374 size_t queue_index; | |
| 375 if (!SelectWorkQueueToService(&queue_index)) | |
| 376 return; | |
| 377 // Note that this function won't post another call to DoWork if one is | |
| 378 // already pending, so it is safe to call it in a loop. | |
| 379 MaybePostDoWorkOnMainRunner(); | |
| 380 ProcessTaskFromWorkQueue(queue_index); | |
| 381 } | |
| 382 } | 343 } |
| 383 | 344 |
| 384 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { | 345 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { |
| 385 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); | 346 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); |
| 386 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( | 347 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( |
| 387 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, | 348 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, |
| 388 AsValueWithSelectorResult(should_run, *out_queue_index)); | 349 AsValueWithSelectorResult(should_run, *out_queue_index)); |
| 389 return should_run; | 350 return should_run; |
| 390 } | 351 } |
| 391 | 352 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 420 DCHECK(delay > base::TimeDelta()); | 381 DCHECK(delay > base::TimeDelta()); |
| 421 return main_task_runner_->PostDelayedTask(from_here, task, delay); | 382 return main_task_runner_->PostDelayedTask(from_here, task, delay); |
| 422 } | 383 } |
| 423 | 384 |
| 424 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { | 385 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { |
| 425 main_thread_checker_.CalledOnValidThread(); | 386 main_thread_checker_.CalledOnValidThread(); |
| 426 internal::TaskQueue* queue = Queue(queue_index); | 387 internal::TaskQueue* queue = Queue(queue_index); |
| 427 queue->set_name(name); | 388 queue->set_name(name); |
| 428 } | 389 } |
| 429 | 390 |
| 430 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) { | |
| 431 main_thread_checker_.CalledOnValidThread(); | |
| 432 DCHECK_GE(work_batch_size, 1); | |
| 433 work_batch_size_ = work_batch_size; | |
| 434 } | |
| 435 | |
| 436 void TaskQueueManager::SetTimeSourceForTesting( | |
| 437 scoped_refptr<cc::TestNowSource> time_source) { | |
| 438 main_thread_checker_.CalledOnValidThread(); | |
| 439 time_source_ = time_source; | |
| 440 } | |
| 441 | |
| 442 base::TimeTicks TaskQueueManager::Now() const { | |
| 443 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now(); | |
| 444 } | |
| 445 | |
| 446 scoped_refptr<base::debug::ConvertableToTraceFormat> | 391 scoped_refptr<base::debug::ConvertableToTraceFormat> |
| 447 TaskQueueManager::AsValueWithSelectorResult(bool should_run, | 392 TaskQueueManager::AsValueWithSelectorResult(bool should_run, |
| 448 size_t selected_queue) const { | 393 size_t selected_queue) const { |
| 449 main_thread_checker_.CalledOnValidThread(); | 394 main_thread_checker_.CalledOnValidThread(); |
| 450 scoped_refptr<base::debug::TracedValue> state = | 395 scoped_refptr<base::debug::TracedValue> state = |
| 451 new base::debug::TracedValue(); | 396 new base::debug::TracedValue(); |
| 452 state->BeginArray("queues"); | 397 state->BeginArray("queues"); |
| 453 for (auto& queue : queues_) | 398 for (auto& queue : queues_) |
| 454 queue->AsValueInto(state.get()); | 399 queue->AsValueInto(state.get()); |
| 455 state->EndArray(); | 400 state->EndArray(); |
| 456 state->BeginDictionary("selector"); | 401 state->BeginDictionary("selector"); |
| 457 selector_->AsValueInto(state.get()); | 402 selector_->AsValueInto(state.get()); |
| 458 state->EndDictionary(); | 403 state->EndDictionary(); |
| 459 if (should_run) | 404 if (should_run) |
| 460 state->SetInteger("selected_queue", selected_queue); | 405 state->SetInteger("selected_queue", selected_queue); |
| 461 return state; | 406 return state; |
| 462 } | 407 } |
| 463 | 408 |
| 464 } // namespace content | 409 } // namespace content |
| OLD | NEW |