OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/renderer/scheduler/task_queue_manager.h" | 5 #include "content/renderer/scheduler/task_queue_manager.h" |
6 | 6 |
| 7 #include <queue> |
| 8 |
7 #include "base/bind.h" | 9 #include "base/bind.h" |
8 #include "base/debug/trace_event.h" | |
9 #include "base/debug/trace_event_argument.h" | |
10 #include "base/trace_event/trace_event.h" | 10 #include "base/trace_event/trace_event.h" |
11 #include "base/trace_event/trace_event_argument.h" | 11 #include "base/trace_event/trace_event_argument.h" |
| 12 #include "cc/test/test_now_source.h" |
12 #include "content/renderer/scheduler/task_queue_selector.h" | 13 #include "content/renderer/scheduler/task_queue_selector.h" |
13 | 14 |
| 15 namespace { |
| 16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max(); |
| 17 } |
| 18 |
14 namespace content { | 19 namespace content { |
15 namespace internal { | 20 namespace internal { |
16 | 21 |
17 class TaskQueue : public base::SingleThreadTaskRunner { | 22 class TaskQueue : public base::SingleThreadTaskRunner { |
18 public: | 23 public: |
19 TaskQueue(TaskQueueManager* task_queue_manager); | 24 TaskQueue(TaskQueueManager* task_queue_manager); |
20 | 25 |
21 // base::SingleThreadTaskRunner implementation. | 26 // base::SingleThreadTaskRunner implementation. |
22 bool RunsTasksOnCurrentThread() const override; | 27 bool RunsTasksOnCurrentThread() const override; |
23 bool PostDelayedTask(const tracked_objects::Location& from_here, | 28 bool PostDelayedTask(const tracked_objects::Location& from_here, |
(...skipping 11 matching lines...) Expand all Loading... |
35 // Adds a task at the end of the incoming task queue and schedules a call to | 40 // Adds a task at the end of the incoming task queue and schedules a call to |
36 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic | 41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic |
37 // pumping is enabled. Can be called on an arbitrary thread. | 42 // pumping is enabled. Can be called on an arbitrary thread. |
38 void EnqueueTask(const base::PendingTask& pending_task); | 43 void EnqueueTask(const base::PendingTask& pending_task); |
39 | 44 |
40 bool IsQueueEmpty() const; | 45 bool IsQueueEmpty() const; |
41 | 46 |
42 void SetAutoPump(bool auto_pump); | 47 void SetAutoPump(bool auto_pump); |
43 void PumpQueue(); | 48 void PumpQueue(); |
44 | 49 |
45 bool UpdateWorkQueue(); | 50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task); |
46 base::PendingTask TakeTaskFromWorkQueue(); | 51 base::PendingTask TakeTaskFromWorkQueue(); |
47 | 52 |
48 void WillDeleteTaskQueueManager(); | 53 void WillDeleteTaskQueueManager(); |
49 | 54 |
50 base::TaskQueue& work_queue() { return work_queue_; } | 55 base::TaskQueue& work_queue() { return work_queue_; } |
51 | 56 |
52 void set_name(const char* name) { name_ = name; } | 57 void set_name(const char* name) { name_ = name; } |
53 | 58 |
54 void AsValueInto(base::debug::TracedValue* state) const; | 59 void AsValueInto(base::debug::TracedValue* state) const; |
55 | 60 |
(...skipping 13 matching lines...) Expand all Loading... |
69 base::debug::TracedValue* state); | 74 base::debug::TracedValue* state); |
70 static void TaskAsValueInto(const base::PendingTask& task, | 75 static void TaskAsValueInto(const base::PendingTask& task, |
71 base::debug::TracedValue* state); | 76 base::debug::TracedValue* state); |
72 | 77 |
73 // This lock protects all members except the work queue. | 78 // This lock protects all members except the work queue. |
74 mutable base::Lock lock_; | 79 mutable base::Lock lock_; |
75 TaskQueueManager* task_queue_manager_; | 80 TaskQueueManager* task_queue_manager_; |
76 base::TaskQueue incoming_queue_; | 81 base::TaskQueue incoming_queue_; |
77 bool auto_pump_; | 82 bool auto_pump_; |
78 const char* name_; | 83 const char* name_; |
| 84 std::priority_queue<base::TimeTicks, |
| 85 std::vector<base::TimeTicks>, |
| 86 std::greater<base::TimeTicks>> delayed_task_run_times_; |
79 | 87 |
80 base::TaskQueue work_queue_; | 88 base::TaskQueue work_queue_; |
81 | 89 |
82 DISALLOW_COPY_AND_ASSIGN(TaskQueue); | 90 DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
83 }; | 91 }; |
84 | 92 |
85 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) | 93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) |
86 : task_queue_manager_(task_queue_manager), | 94 : task_queue_manager_(task_queue_manager), |
87 auto_pump_(true), | 95 auto_pump_(true), |
88 name_(nullptr) { | 96 name_(nullptr) { |
(...skipping 19 matching lines...) Expand all Loading... |
108 base::TimeDelta delay, | 116 base::TimeDelta delay, |
109 bool nestable) { | 117 bool nestable) { |
110 base::AutoLock lock(lock_); | 118 base::AutoLock lock(lock_); |
111 if (!task_queue_manager_) | 119 if (!task_queue_manager_) |
112 return false; | 120 return false; |
113 | 121 |
114 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); | 122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); |
115 task_queue_manager_->DidQueueTask(&pending_task); | 123 task_queue_manager_->DidQueueTask(&pending_task); |
116 | 124 |
117 if (delay > base::TimeDelta()) { | 125 if (delay > base::TimeDelta()) { |
| 126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay; |
| 127 delayed_task_run_times_.push(pending_task.delayed_run_time); |
118 return task_queue_manager_->PostDelayedTask( | 128 return task_queue_manager_->PostDelayedTask( |
119 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); | 129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); |
120 } | 130 } |
121 EnqueueTaskLocked(pending_task); | 131 EnqueueTaskLocked(pending_task); |
122 return true; | 132 return true; |
123 } | 133 } |
124 | 134 |
125 bool TaskQueue::IsQueueEmpty() const { | 135 bool TaskQueue::IsQueueEmpty() const { |
126 if (!work_queue_.empty()) | 136 if (!work_queue_.empty()) |
127 return false; | 137 return false; |
128 | 138 |
129 { | 139 { |
130 base::AutoLock lock(lock_); | 140 base::AutoLock lock(lock_); |
131 return incoming_queue_.empty(); | 141 return incoming_queue_.empty(); |
132 } | 142 } |
133 } | 143 } |
134 | 144 |
135 bool TaskQueue::UpdateWorkQueue() { | 145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) { |
136 if (!work_queue_.empty()) | 146 if (!work_queue_.empty()) |
137 return true; | 147 return true; |
138 | 148 |
139 { | 149 { |
140 base::AutoLock lock(lock_); | 150 base::AutoLock lock(lock_); |
| 151 if (!delayed_task_run_times_.empty()) { |
| 152 *next_pending_delayed_task = |
| 153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top()); |
| 154 } |
141 if (!auto_pump_ || incoming_queue_.empty()) | 155 if (!auto_pump_ || incoming_queue_.empty()) |
142 return false; | 156 return false; |
143 work_queue_.Swap(&incoming_queue_); | 157 work_queue_.Swap(&incoming_queue_); |
144 TraceWorkQueueSize(); | 158 TraceWorkQueueSize(); |
145 return true; | 159 return true; |
146 } | 160 } |
147 } | 161 } |
148 | 162 |
149 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { | 163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { |
150 base::PendingTask pending_task = work_queue_.front(); | 164 base::PendingTask pending_task = work_queue_.front(); |
(...skipping 14 matching lines...) Expand all Loading... |
165 EnqueueTaskLocked(pending_task); | 179 EnqueueTaskLocked(pending_task); |
166 } | 180 } |
167 | 181 |
168 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { | 182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
169 lock_.AssertAcquired(); | 183 lock_.AssertAcquired(); |
170 if (!task_queue_manager_) | 184 if (!task_queue_manager_) |
171 return; | 185 return; |
172 if (auto_pump_ && incoming_queue_.empty()) | 186 if (auto_pump_ && incoming_queue_.empty()) |
173 task_queue_manager_->MaybePostDoWorkOnMainRunner(); | 187 task_queue_manager_->MaybePostDoWorkOnMainRunner(); |
174 incoming_queue_.push(pending_task); | 188 incoming_queue_.push(pending_task); |
| 189 |
| 190 if (!pending_task.delayed_run_time.is_null()) { |
| 191 // Update the time of the next pending delayed task. |
| 192 while (!delayed_task_run_times_.empty() && |
| 193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) { |
| 194 delayed_task_run_times_.pop(); |
| 195 } |
| 196 // Clear the delayed run time because we've already applied the delay |
| 197 // before getting here. |
| 198 incoming_queue_.back().delayed_run_time = base::TimeTicks(); |
| 199 } |
175 } | 200 } |
176 | 201 |
177 void TaskQueue::SetAutoPump(bool auto_pump) { | 202 void TaskQueue::SetAutoPump(bool auto_pump) { |
178 base::AutoLock lock(lock_); | 203 base::AutoLock lock(lock_); |
179 if (auto_pump) { | 204 if (auto_pump) { |
180 auto_pump_ = true; | 205 auto_pump_ = true; |
181 PumpQueueLocked(); | 206 PumpQueueLocked(); |
182 } else { | 207 } else { |
183 auto_pump_ = false; | 208 auto_pump_ = false; |
184 } | 209 } |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
240 | 265 |
241 } // namespace internal | 266 } // namespace internal |
242 | 267 |
243 TaskQueueManager::TaskQueueManager( | 268 TaskQueueManager::TaskQueueManager( |
244 size_t task_queue_count, | 269 size_t task_queue_count, |
245 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 270 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
246 TaskQueueSelector* selector) | 271 TaskQueueSelector* selector) |
247 : main_task_runner_(main_task_runner), | 272 : main_task_runner_(main_task_runner), |
248 selector_(selector), | 273 selector_(selector), |
249 pending_dowork_count_(0), | 274 pending_dowork_count_(0), |
| 275 work_batch_size_(1), |
| 276 time_source_(nullptr), |
250 weak_factory_(this) { | 277 weak_factory_(this) { |
251 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); | 278 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); |
252 TRACE_EVENT_OBJECT_CREATED_WITH_ID( | 279 TRACE_EVENT_OBJECT_CREATED_WITH_ID( |
253 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", | 280 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", |
254 this); | 281 this); |
255 | 282 |
256 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); | 283 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); |
257 for (size_t i = 0; i < task_queue_count; i++) { | 284 for (size_t i = 0; i < task_queue_count; i++) { |
258 scoped_refptr<internal::TaskQueue> queue( | 285 scoped_refptr<internal::TaskQueue> queue( |
259 make_scoped_refptr(new internal::TaskQueue(this))); | 286 make_scoped_refptr(new internal::TaskQueue(this))); |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
294 internal::TaskQueue* queue = Queue(queue_index); | 321 internal::TaskQueue* queue = Queue(queue_index); |
295 queue->SetAutoPump(auto_pump); | 322 queue->SetAutoPump(auto_pump); |
296 } | 323 } |
297 | 324 |
298 void TaskQueueManager::PumpQueue(size_t queue_index) { | 325 void TaskQueueManager::PumpQueue(size_t queue_index) { |
299 main_thread_checker_.CalledOnValidThread(); | 326 main_thread_checker_.CalledOnValidThread(); |
300 internal::TaskQueue* queue = Queue(queue_index); | 327 internal::TaskQueue* queue = Queue(queue_index); |
301 queue->PumpQueue(); | 328 queue->PumpQueue(); |
302 } | 329 } |
303 | 330 |
304 bool TaskQueueManager::UpdateWorkQueues() { | 331 bool TaskQueueManager::UpdateWorkQueues( |
| 332 base::TimeTicks* next_pending_delayed_task) { |
305 // TODO(skyostil): This is not efficient when the number of queues grows very | 333 // TODO(skyostil): This is not efficient when the number of queues grows very |
306 // large due to the number of locks taken. Consider optimizing when we get | 334 // large due to the number of locks taken. Consider optimizing when we get |
307 // there. | 335 // there. |
308 main_thread_checker_.CalledOnValidThread(); | 336 main_thread_checker_.CalledOnValidThread(); |
309 bool has_work = false; | 337 bool has_work = false; |
310 for (auto& queue : queues_) | 338 for (auto& queue : queues_) { |
311 has_work |= queue->UpdateWorkQueue(); | 339 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task); |
| 340 if (!queue->work_queue().empty()) { |
| 341 // Currently we should not be getting tasks with delayed run times in any |
| 342 // of the work queues. |
| 343 DCHECK(queue->work_queue().front().delayed_run_time.is_null()); |
| 344 } |
| 345 } |
312 return has_work; | 346 return has_work; |
313 } | 347 } |
314 | 348 |
315 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { | 349 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { |
316 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); | 350 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); |
317 if (on_main_thread) { | 351 if (on_main_thread) { |
318 // We only want one pending DoWork posted from the main thread, or we risk | 352 // We only want one pending DoWork posted from the main thread, or we risk |
319 // an explosion of pending DoWorks which could starve out everything else. | 353 // an explosion of pending DoWorks which could starve out everything else. |
320 if (pending_dowork_count_ > 0) { | 354 if (pending_dowork_count_ > 0) { |
321 return; | 355 return; |
322 } | 356 } |
323 pending_dowork_count_++; | 357 pending_dowork_count_++; |
324 } | 358 } |
325 | 359 |
326 main_task_runner_->PostTask( | 360 main_task_runner_->PostTask( |
327 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, | 361 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, |
328 on_main_thread)); | 362 on_main_thread)); |
329 } | 363 } |
330 | 364 |
331 void TaskQueueManager::DoWork(bool posted_from_main_thread) { | 365 void TaskQueueManager::DoWork(bool posted_from_main_thread) { |
332 if (posted_from_main_thread) { | 366 if (posted_from_main_thread) { |
333 pending_dowork_count_--; | 367 pending_dowork_count_--; |
334 DCHECK_GE(pending_dowork_count_, 0); | 368 DCHECK_GE(pending_dowork_count_, 0); |
335 } | 369 } |
336 main_thread_checker_.CalledOnValidThread(); | 370 main_thread_checker_.CalledOnValidThread(); |
337 if (!UpdateWorkQueues()) | |
338 return; | |
339 | 371 |
340 size_t queue_index; | 372 base::TimeTicks next_pending_delayed_task( |
341 if (!SelectWorkQueueToService(&queue_index)) | 373 base::TimeTicks::FromInternalValue(kMaxTimeTicks)); |
342 return; | 374 for (int i = 0; i < work_batch_size_; i++) { |
343 MaybePostDoWorkOnMainRunner(); | 375 if (!UpdateWorkQueues(&next_pending_delayed_task)) |
344 ProcessTaskFromWorkQueue(queue_index); | 376 return; |
| 377 |
| 378 // Interrupt the work batch if we should run the next delayed task. |
| 379 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks && |
| 380 Now() >= next_pending_delayed_task) |
| 381 return; |
| 382 |
| 383 size_t queue_index; |
| 384 if (!SelectWorkQueueToService(&queue_index)) |
| 385 return; |
| 386 // Note that this function won't post another call to DoWork if one is |
| 387 // already pending, so it is safe to call it in a loop. |
| 388 MaybePostDoWorkOnMainRunner(); |
| 389 ProcessTaskFromWorkQueue(queue_index); |
| 390 } |
345 } | 391 } |
346 | 392 |
347 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { | 393 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { |
348 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); | 394 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); |
349 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( | 395 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( |
350 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, | 396 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, |
351 AsValueWithSelectorResult(should_run, *out_queue_index)); | 397 AsValueWithSelectorResult(should_run, *out_queue_index)); |
352 return should_run; | 398 return should_run; |
353 } | 399 } |
354 | 400 |
(...skipping 28 matching lines...) Expand all Loading... |
383 DCHECK(delay > base::TimeDelta()); | 429 DCHECK(delay > base::TimeDelta()); |
384 return main_task_runner_->PostDelayedTask(from_here, task, delay); | 430 return main_task_runner_->PostDelayedTask(from_here, task, delay); |
385 } | 431 } |
386 | 432 |
387 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { | 433 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { |
388 main_thread_checker_.CalledOnValidThread(); | 434 main_thread_checker_.CalledOnValidThread(); |
389 internal::TaskQueue* queue = Queue(queue_index); | 435 internal::TaskQueue* queue = Queue(queue_index); |
390 queue->set_name(name); | 436 queue->set_name(name); |
391 } | 437 } |
392 | 438 |
| 439 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) { |
| 440 main_thread_checker_.CalledOnValidThread(); |
| 441 DCHECK_GE(work_batch_size, 1); |
| 442 work_batch_size_ = work_batch_size; |
| 443 } |
| 444 |
| 445 void TaskQueueManager::SetTimeSourceForTesting( |
| 446 scoped_refptr<cc::TestNowSource> time_source) { |
| 447 main_thread_checker_.CalledOnValidThread(); |
| 448 time_source_ = time_source; |
| 449 } |
| 450 |
| 451 base::TimeTicks TaskQueueManager::Now() const { |
| 452 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now(); |
| 453 } |
| 454 |
393 scoped_refptr<base::debug::ConvertableToTraceFormat> | 455 scoped_refptr<base::debug::ConvertableToTraceFormat> |
394 TaskQueueManager::AsValueWithSelectorResult(bool should_run, | 456 TaskQueueManager::AsValueWithSelectorResult(bool should_run, |
395 size_t selected_queue) const { | 457 size_t selected_queue) const { |
396 main_thread_checker_.CalledOnValidThread(); | 458 main_thread_checker_.CalledOnValidThread(); |
397 scoped_refptr<base::debug::TracedValue> state = | 459 scoped_refptr<base::debug::TracedValue> state = |
398 new base::debug::TracedValue(); | 460 new base::debug::TracedValue(); |
399 state->BeginArray("queues"); | 461 state->BeginArray("queues"); |
400 for (auto& queue : queues_) | 462 for (auto& queue : queues_) |
401 queue->AsValueInto(state.get()); | 463 queue->AsValueInto(state.get()); |
402 state->EndArray(); | 464 state->EndArray(); |
403 state->BeginDictionary("selector"); | 465 state->BeginDictionary("selector"); |
404 selector_->AsValueInto(state.get()); | 466 selector_->AsValueInto(state.get()); |
405 state->EndDictionary(); | 467 state->EndDictionary(); |
406 if (should_run) | 468 if (should_run) |
407 state->SetInteger("selected_queue", selected_queue); | 469 state->SetInteger("selected_queue", selected_queue); |
408 return state; | 470 return state; |
409 } | 471 } |
410 | 472 |
411 } // namespace content | 473 } // namespace content |
OLD | NEW |