OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/renderer/scheduler/task_queue_manager.h" | 5 #include "content/renderer/scheduler/task_queue_manager.h" |
6 | 6 |
7 #include <queue> | |
8 | |
7 #include "base/bind.h" | 9 #include "base/bind.h" |
8 #include "base/debug/trace_event.h" | 10 #include "base/debug/trace_event.h" |
9 #include "base/debug/trace_event_argument.h" | 11 #include "base/debug/trace_event_argument.h" |
12 #include "cc/test/test_now_source.h" | |
10 #include "content/renderer/scheduler/task_queue_selector.h" | 13 #include "content/renderer/scheduler/task_queue_selector.h" |
11 | 14 |
15 namespace { | |
16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max(); | |
17 } | |
18 | |
12 namespace content { | 19 namespace content { |
13 namespace internal { | 20 namespace internal { |
14 | 21 |
15 class TaskQueue : public base::SingleThreadTaskRunner { | 22 class TaskQueue : public base::SingleThreadTaskRunner { |
16 public: | 23 public: |
17 TaskQueue(TaskQueueManager* task_queue_manager); | 24 TaskQueue(TaskQueueManager* task_queue_manager); |
18 | 25 |
19 // base::SingleThreadTaskRunner implementation. | 26 // base::SingleThreadTaskRunner implementation. |
20 bool RunsTasksOnCurrentThread() const override; | 27 bool RunsTasksOnCurrentThread() const override; |
21 bool PostDelayedTask(const tracked_objects::Location& from_here, | 28 bool PostDelayedTask(const tracked_objects::Location& from_here, |
(...skipping 11 matching lines...) Expand all Loading... | |
33 // Adds a task at the end of the incoming task queue and schedules a call to | 40 // Adds a task at the end of the incoming task queue and schedules a call to |
34 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic | 41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic |
35 // pumping is enabled. Can be called on an arbitrary thread. | 42 // pumping is enabled. Can be called on an arbitrary thread. |
36 void EnqueueTask(const base::PendingTask& pending_task); | 43 void EnqueueTask(const base::PendingTask& pending_task); |
37 | 44 |
38 bool IsQueueEmpty() const; | 45 bool IsQueueEmpty() const; |
39 | 46 |
40 void SetAutoPump(bool auto_pump); | 47 void SetAutoPump(bool auto_pump); |
41 void PumpQueue(); | 48 void PumpQueue(); |
42 | 49 |
43 bool UpdateWorkQueue(); | 50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task); |
44 base::PendingTask TakeTaskFromWorkQueue(); | 51 base::PendingTask TakeTaskFromWorkQueue(); |
45 | 52 |
46 void WillDeleteTaskQueueManager(); | 53 void WillDeleteTaskQueueManager(); |
47 | 54 |
48 base::TaskQueue& work_queue() { return work_queue_; } | 55 base::TaskQueue& work_queue() { return work_queue_; } |
49 | 56 |
50 void set_name(const char* name) { name_ = name; } | 57 void set_name(const char* name) { name_ = name; } |
51 | 58 |
52 void AsValueInto(base::debug::TracedValue* state) const; | 59 void AsValueInto(base::debug::TracedValue* state) const; |
53 | 60 |
(...skipping 13 matching lines...) Expand all Loading... | |
67 base::debug::TracedValue* state); | 74 base::debug::TracedValue* state); |
68 static void TaskAsValueInto(const base::PendingTask& task, | 75 static void TaskAsValueInto(const base::PendingTask& task, |
69 base::debug::TracedValue* state); | 76 base::debug::TracedValue* state); |
70 | 77 |
71 // This lock protects all members except the work queue. | 78 // This lock protects all members except the work queue. |
72 mutable base::Lock lock_; | 79 mutable base::Lock lock_; |
73 TaskQueueManager* task_queue_manager_; | 80 TaskQueueManager* task_queue_manager_; |
74 base::TaskQueue incoming_queue_; | 81 base::TaskQueue incoming_queue_; |
75 bool auto_pump_; | 82 bool auto_pump_; |
76 const char* name_; | 83 const char* name_; |
84 std::priority_queue<base::TimeTicks, | |
85 std::vector<base::TimeTicks>, | |
86 std::greater<base::TimeTicks>> delayed_task_run_times_; | |
77 | 87 |
78 base::TaskQueue work_queue_; | 88 base::TaskQueue work_queue_; |
79 | 89 |
80 DISALLOW_COPY_AND_ASSIGN(TaskQueue); | 90 DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
81 }; | 91 }; |
82 | 92 |
83 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) | 93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) |
84 : task_queue_manager_(task_queue_manager), | 94 : task_queue_manager_(task_queue_manager), |
85 auto_pump_(true), | 95 auto_pump_(true), |
86 name_(nullptr) { | 96 name_(nullptr) { |
(...skipping 19 matching lines...) Expand all Loading... | |
106 base::TimeDelta delay, | 116 base::TimeDelta delay, |
107 bool nestable) { | 117 bool nestable) { |
108 base::AutoLock lock(lock_); | 118 base::AutoLock lock(lock_); |
109 if (!task_queue_manager_) | 119 if (!task_queue_manager_) |
110 return false; | 120 return false; |
111 | 121 |
112 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); | 122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); |
113 task_queue_manager_->DidQueueTask(&pending_task); | 123 task_queue_manager_->DidQueueTask(&pending_task); |
114 | 124 |
115 if (delay > base::TimeDelta()) { | 125 if (delay > base::TimeDelta()) { |
126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay; | |
127 delayed_task_run_times_.push(pending_task.delayed_run_time); | |
116 return task_queue_manager_->PostDelayedTask( | 128 return task_queue_manager_->PostDelayedTask( |
117 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); | 129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); |
118 } | 130 } |
119 EnqueueTaskLocked(pending_task); | 131 EnqueueTaskLocked(pending_task); |
120 return true; | 132 return true; |
121 } | 133 } |
122 | 134 |
123 bool TaskQueue::IsQueueEmpty() const { | 135 bool TaskQueue::IsQueueEmpty() const { |
124 if (!work_queue_.empty()) | 136 if (!work_queue_.empty()) |
125 return false; | 137 return false; |
126 | 138 |
127 { | 139 { |
128 base::AutoLock lock(lock_); | 140 base::AutoLock lock(lock_); |
129 return incoming_queue_.empty(); | 141 return incoming_queue_.empty(); |
130 } | 142 } |
131 } | 143 } |
132 | 144 |
133 bool TaskQueue::UpdateWorkQueue() { | 145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) { |
134 if (!work_queue_.empty()) | 146 if (!work_queue_.empty()) |
135 return true; | 147 return true; |
136 | 148 |
137 { | 149 { |
138 base::AutoLock lock(lock_); | 150 base::AutoLock lock(lock_); |
151 if (!delayed_task_run_times_.empty()) { | |
152 *next_pending_delayed_task = | |
153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top()); | |
154 } | |
139 if (!auto_pump_ || incoming_queue_.empty()) | 155 if (!auto_pump_ || incoming_queue_.empty()) |
140 return false; | 156 return false; |
141 work_queue_.Swap(&incoming_queue_); | 157 work_queue_.Swap(&incoming_queue_); |
142 TraceWorkQueueSize(); | 158 TraceWorkQueueSize(); |
143 return true; | 159 return true; |
144 } | 160 } |
145 } | 161 } |
146 | 162 |
147 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { | 163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { |
148 base::PendingTask pending_task = work_queue_.front(); | 164 base::PendingTask pending_task = work_queue_.front(); |
(...skipping 14 matching lines...) Expand all Loading... | |
163 EnqueueTaskLocked(pending_task); | 179 EnqueueTaskLocked(pending_task); |
164 } | 180 } |
165 | 181 |
166 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { | 182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
167 lock_.AssertAcquired(); | 183 lock_.AssertAcquired(); |
168 if (!task_queue_manager_) | 184 if (!task_queue_manager_) |
169 return; | 185 return; |
170 if (auto_pump_ && incoming_queue_.empty()) | 186 if (auto_pump_ && incoming_queue_.empty()) |
171 task_queue_manager_->MaybePostDoWorkOnMainRunner(); | 187 task_queue_manager_->MaybePostDoWorkOnMainRunner(); |
172 incoming_queue_.push(pending_task); | 188 incoming_queue_.push(pending_task); |
189 | |
190 if (!pending_task.delayed_run_time.is_null()) { | |
191 // Update the time of the next pending delayed task. | |
192 while (!delayed_task_run_times_.empty() && | |
193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) { | |
194 delayed_task_run_times_.pop(); | |
195 } | |
196 } | |
173 } | 197 } |
174 | 198 |
175 void TaskQueue::SetAutoPump(bool auto_pump) { | 199 void TaskQueue::SetAutoPump(bool auto_pump) { |
176 base::AutoLock lock(lock_); | 200 base::AutoLock lock(lock_); |
177 if (auto_pump) { | 201 if (auto_pump) { |
178 auto_pump_ = true; | 202 auto_pump_ = true; |
179 PumpQueueLocked(); | 203 PumpQueueLocked(); |
180 } else { | 204 } else { |
181 auto_pump_ = false; | 205 auto_pump_ = false; |
182 } | 206 } |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
238 | 262 |
239 } // namespace internal | 263 } // namespace internal |
240 | 264 |
241 TaskQueueManager::TaskQueueManager( | 265 TaskQueueManager::TaskQueueManager( |
242 size_t task_queue_count, | 266 size_t task_queue_count, |
243 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 267 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
244 TaskQueueSelector* selector) | 268 TaskQueueSelector* selector) |
245 : main_task_runner_(main_task_runner), | 269 : main_task_runner_(main_task_runner), |
246 selector_(selector), | 270 selector_(selector), |
247 pending_dowork_count_(0), | 271 pending_dowork_count_(0), |
272 work_batch_size_(1), | |
273 time_source_(nullptr), | |
248 weak_factory_(this) { | 274 weak_factory_(this) { |
249 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); | 275 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); |
250 TRACE_EVENT_OBJECT_CREATED_WITH_ID( | 276 TRACE_EVENT_OBJECT_CREATED_WITH_ID( |
251 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", | 277 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", |
252 this); | 278 this); |
253 | 279 |
254 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); | 280 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); |
255 for (size_t i = 0; i < task_queue_count; i++) { | 281 for (size_t i = 0; i < task_queue_count; i++) { |
256 scoped_refptr<internal::TaskQueue> queue( | 282 scoped_refptr<internal::TaskQueue> queue( |
257 make_scoped_refptr(new internal::TaskQueue(this))); | 283 make_scoped_refptr(new internal::TaskQueue(this))); |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
292 internal::TaskQueue* queue = Queue(queue_index); | 318 internal::TaskQueue* queue = Queue(queue_index); |
293 queue->SetAutoPump(auto_pump); | 319 queue->SetAutoPump(auto_pump); |
294 } | 320 } |
295 | 321 |
296 void TaskQueueManager::PumpQueue(size_t queue_index) { | 322 void TaskQueueManager::PumpQueue(size_t queue_index) { |
297 main_thread_checker_.CalledOnValidThread(); | 323 main_thread_checker_.CalledOnValidThread(); |
298 internal::TaskQueue* queue = Queue(queue_index); | 324 internal::TaskQueue* queue = Queue(queue_index); |
299 queue->PumpQueue(); | 325 queue->PumpQueue(); |
300 } | 326 } |
301 | 327 |
302 bool TaskQueueManager::UpdateWorkQueues() { | 328 bool TaskQueueManager::UpdateWorkQueues( |
329 base::TimeTicks* next_pending_delayed_task) { | |
303 // TODO(skyostil): This is not efficient when the number of queues grows very | 330 // TODO(skyostil): This is not efficient when the number of queues grows very |
304 // large due to the number of locks taken. Consider optimizing when we get | 331 // large due to the number of locks taken. Consider optimizing when we get |
305 // there. | 332 // there. |
306 main_thread_checker_.CalledOnValidThread(); | 333 main_thread_checker_.CalledOnValidThread(); |
307 bool has_work = false; | 334 bool has_work = false; |
308 for (auto& queue : queues_) | 335 for (auto& queue : queues_) |
309 has_work |= queue->UpdateWorkQueue(); | 336 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task); |
310 return has_work; | 337 return has_work; |
311 } | 338 } |
312 | 339 |
313 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { | 340 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { |
314 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); | 341 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); |
315 if (on_main_thread) { | 342 if (on_main_thread) { |
316 // We only want one pending DoWork posted from the main thread, or we risk | 343 // We only want one pending DoWork posted from the main thread, or we risk |
317 // an explosion of pending DoWorks which could starve out everything else. | 344 // an explosion of pending DoWorks which could starve out everything else. |
318 if (pending_dowork_count_ > 0) { | 345 if (pending_dowork_count_ > 0) { |
319 return; | 346 return; |
320 } | 347 } |
321 pending_dowork_count_++; | 348 pending_dowork_count_++; |
322 } | 349 } |
323 | 350 |
324 main_task_runner_->PostTask( | 351 main_task_runner_->PostTask( |
325 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, | 352 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, |
326 on_main_thread)); | 353 on_main_thread)); |
327 } | 354 } |
328 | 355 |
329 void TaskQueueManager::DoWork(bool posted_from_main_thread) { | 356 void TaskQueueManager::DoWork(bool posted_from_main_thread) { |
330 if (posted_from_main_thread) { | 357 if (posted_from_main_thread) { |
331 pending_dowork_count_--; | 358 pending_dowork_count_--; |
332 DCHECK_GE(pending_dowork_count_, 0); | 359 DCHECK_GE(pending_dowork_count_, 0); |
333 } | 360 } |
334 main_thread_checker_.CalledOnValidThread(); | 361 main_thread_checker_.CalledOnValidThread(); |
335 if (!UpdateWorkQueues()) | |
336 return; | |
337 | 362 |
338 size_t queue_index; | 363 base::TimeTicks next_pending_delayed_task( |
339 if (!SelectWorkQueueToService(&queue_index)) | 364 base::TimeTicks::FromInternalValue(kMaxTimeTicks)); |
340 return; | 365 for (int i = 0; i < work_batch_size_; i++) { |
341 MaybePostDoWorkOnMainRunner(); | 366 if (!UpdateWorkQueues(&next_pending_delayed_task)) |
342 ProcessTaskFromWorkQueue(queue_index); | 367 return; |
368 | |
369 // Interrupt the work batch if we should run the next delayed task. | |
370 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks && | |
371 Now() >= next_pending_delayed_task) | |
372 return; | |
373 | |
374 size_t queue_index; | |
375 if (!SelectWorkQueueToService(&queue_index)) | |
376 return; | |
377 MaybePostDoWorkOnMainRunner(); | |
rmcilroy
2015/01/21 15:13:09
Could you add a comment that the logic in MaybePos
Sami
2015/01/21 15:30:39
Good idea, done.
| |
378 ProcessTaskFromWorkQueue(queue_index); | |
379 } | |
343 } | 380 } |
344 | 381 |
345 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { | 382 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { |
346 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); | 383 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); |
347 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( | 384 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( |
348 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, | 385 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, |
349 AsValueWithSelectorResult(should_run, *out_queue_index)); | 386 AsValueWithSelectorResult(should_run, *out_queue_index)); |
350 return should_run; | 387 return should_run; |
351 } | 388 } |
352 | 389 |
(...skipping 28 matching lines...) Expand all Loading... | |
381 DCHECK(delay > base::TimeDelta()); | 418 DCHECK(delay > base::TimeDelta()); |
382 return main_task_runner_->PostDelayedTask(from_here, task, delay); | 419 return main_task_runner_->PostDelayedTask(from_here, task, delay); |
383 } | 420 } |
384 | 421 |
385 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { | 422 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { |
386 main_thread_checker_.CalledOnValidThread(); | 423 main_thread_checker_.CalledOnValidThread(); |
387 internal::TaskQueue* queue = Queue(queue_index); | 424 internal::TaskQueue* queue = Queue(queue_index); |
388 queue->set_name(name); | 425 queue->set_name(name); |
389 } | 426 } |
390 | 427 |
428 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) { | |
429 main_thread_checker_.CalledOnValidThread(); | |
430 DCHECK_GE(work_batch_size, 1); | |
431 work_batch_size_ = work_batch_size; | |
432 } | |
433 | |
434 void TaskQueueManager::SetTimeSourceForTesting( | |
435 scoped_refptr<cc::TestNowSource> time_source) { | |
436 main_thread_checker_.CalledOnValidThread(); | |
437 time_source_ = time_source; | |
438 } | |
439 | |
440 base::TimeTicks TaskQueueManager::Now() const { | |
441 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now(); | |
442 } | |
443 | |
391 scoped_refptr<base::debug::ConvertableToTraceFormat> | 444 scoped_refptr<base::debug::ConvertableToTraceFormat> |
392 TaskQueueManager::AsValueWithSelectorResult(bool should_run, | 445 TaskQueueManager::AsValueWithSelectorResult(bool should_run, |
393 size_t selected_queue) const { | 446 size_t selected_queue) const { |
394 main_thread_checker_.CalledOnValidThread(); | 447 main_thread_checker_.CalledOnValidThread(); |
395 scoped_refptr<base::debug::TracedValue> state = | 448 scoped_refptr<base::debug::TracedValue> state = |
396 new base::debug::TracedValue(); | 449 new base::debug::TracedValue(); |
397 state->BeginArray("queues"); | 450 state->BeginArray("queues"); |
398 for (auto& queue : queues_) | 451 for (auto& queue : queues_) |
399 queue->AsValueInto(state.get()); | 452 queue->AsValueInto(state.get()); |
400 state->EndArray(); | 453 state->EndArray(); |
401 state->BeginDictionary("selector"); | 454 state->BeginDictionary("selector"); |
402 selector_->AsValueInto(state.get()); | 455 selector_->AsValueInto(state.get()); |
403 state->EndDictionary(); | 456 state->EndDictionary(); |
404 if (should_run) | 457 if (should_run) |
405 state->SetInteger("selected_queue", selected_queue); | 458 state->SetInteger("selected_queue", selected_queue); |
406 return state; | 459 return state; |
407 } | 460 } |
408 | 461 |
409 } // namespace content | 462 } // namespace content |
OLD | NEW |