OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/renderer/scheduler/task_queue_manager.h" | 5 #include "content/renderer/scheduler/task_queue_manager.h" |
6 | 6 |
7 #include <queue> | |
8 | |
9 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/debug/trace_event.h" |
| 9 #include "base/debug/trace_event_argument.h" |
10 #include "base/trace_event/trace_event.h" | 10 #include "base/trace_event/trace_event.h" |
11 #include "base/trace_event/trace_event_argument.h" | 11 #include "base/trace_event/trace_event_argument.h" |
12 #include "cc/test/test_now_source.h" | |
13 #include "content/renderer/scheduler/task_queue_selector.h" | 12 #include "content/renderer/scheduler/task_queue_selector.h" |
14 | 13 |
15 namespace { | |
16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max(); | |
17 } | |
18 | |
19 namespace content { | 14 namespace content { |
20 namespace internal { | 15 namespace internal { |
21 | 16 |
22 class TaskQueue : public base::SingleThreadTaskRunner { | 17 class TaskQueue : public base::SingleThreadTaskRunner { |
23 public: | 18 public: |
24 TaskQueue(TaskQueueManager* task_queue_manager); | 19 TaskQueue(TaskQueueManager* task_queue_manager); |
25 | 20 |
26 // base::SingleThreadTaskRunner implementation. | 21 // base::SingleThreadTaskRunner implementation. |
27 bool RunsTasksOnCurrentThread() const override; | 22 bool RunsTasksOnCurrentThread() const override; |
28 bool PostDelayedTask(const tracked_objects::Location& from_here, | 23 bool PostDelayedTask(const tracked_objects::Location& from_here, |
(...skipping 11 matching lines...) Expand all Loading... |
40 // Adds a task at the end of the incoming task queue and schedules a call to | 35 // Adds a task at the end of the incoming task queue and schedules a call to |
41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic | 36 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic |
42 // pumping is enabled. Can be called on an arbitrary thread. | 37 // pumping is enabled. Can be called on an arbitrary thread. |
43 void EnqueueTask(const base::PendingTask& pending_task); | 38 void EnqueueTask(const base::PendingTask& pending_task); |
44 | 39 |
45 bool IsQueueEmpty() const; | 40 bool IsQueueEmpty() const; |
46 | 41 |
47 void SetAutoPump(bool auto_pump); | 42 void SetAutoPump(bool auto_pump); |
48 void PumpQueue(); | 43 void PumpQueue(); |
49 | 44 |
50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task); | 45 bool UpdateWorkQueue(); |
51 base::PendingTask TakeTaskFromWorkQueue(); | 46 base::PendingTask TakeTaskFromWorkQueue(); |
52 | 47 |
53 void WillDeleteTaskQueueManager(); | 48 void WillDeleteTaskQueueManager(); |
54 | 49 |
55 base::TaskQueue& work_queue() { return work_queue_; } | 50 base::TaskQueue& work_queue() { return work_queue_; } |
56 | 51 |
57 void set_name(const char* name) { name_ = name; } | 52 void set_name(const char* name) { name_ = name; } |
58 | 53 |
59 void AsValueInto(base::debug::TracedValue* state) const; | 54 void AsValueInto(base::debug::TracedValue* state) const; |
60 | 55 |
(...skipping 13 matching lines...) Expand all Loading... |
74 base::debug::TracedValue* state); | 69 base::debug::TracedValue* state); |
75 static void TaskAsValueInto(const base::PendingTask& task, | 70 static void TaskAsValueInto(const base::PendingTask& task, |
76 base::debug::TracedValue* state); | 71 base::debug::TracedValue* state); |
77 | 72 |
78 // This lock protects all members except the work queue. | 73 // This lock protects all members except the work queue. |
79 mutable base::Lock lock_; | 74 mutable base::Lock lock_; |
80 TaskQueueManager* task_queue_manager_; | 75 TaskQueueManager* task_queue_manager_; |
81 base::TaskQueue incoming_queue_; | 76 base::TaskQueue incoming_queue_; |
82 bool auto_pump_; | 77 bool auto_pump_; |
83 const char* name_; | 78 const char* name_; |
84 std::priority_queue<base::TimeTicks, | |
85 std::vector<base::TimeTicks>, | |
86 std::greater<base::TimeTicks>> delayed_task_run_times_; | |
87 | 79 |
88 base::TaskQueue work_queue_; | 80 base::TaskQueue work_queue_; |
89 | 81 |
90 DISALLOW_COPY_AND_ASSIGN(TaskQueue); | 82 DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
91 }; | 83 }; |
92 | 84 |
93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) | 85 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) |
94 : task_queue_manager_(task_queue_manager), | 86 : task_queue_manager_(task_queue_manager), |
95 auto_pump_(true), | 87 auto_pump_(true), |
96 name_(nullptr) { | 88 name_(nullptr) { |
(...skipping 19 matching lines...) Expand all Loading... |
116 base::TimeDelta delay, | 108 base::TimeDelta delay, |
117 bool nestable) { | 109 bool nestable) { |
118 base::AutoLock lock(lock_); | 110 base::AutoLock lock(lock_); |
119 if (!task_queue_manager_) | 111 if (!task_queue_manager_) |
120 return false; | 112 return false; |
121 | 113 |
122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); | 114 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); |
123 task_queue_manager_->DidQueueTask(&pending_task); | 115 task_queue_manager_->DidQueueTask(&pending_task); |
124 | 116 |
125 if (delay > base::TimeDelta()) { | 117 if (delay > base::TimeDelta()) { |
126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay; | |
127 delayed_task_run_times_.push(pending_task.delayed_run_time); | |
128 return task_queue_manager_->PostDelayedTask( | 118 return task_queue_manager_->PostDelayedTask( |
129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); | 119 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); |
130 } | 120 } |
131 EnqueueTaskLocked(pending_task); | 121 EnqueueTaskLocked(pending_task); |
132 return true; | 122 return true; |
133 } | 123 } |
134 | 124 |
135 bool TaskQueue::IsQueueEmpty() const { | 125 bool TaskQueue::IsQueueEmpty() const { |
136 if (!work_queue_.empty()) | 126 if (!work_queue_.empty()) |
137 return false; | 127 return false; |
138 | 128 |
139 { | 129 { |
140 base::AutoLock lock(lock_); | 130 base::AutoLock lock(lock_); |
141 return incoming_queue_.empty(); | 131 return incoming_queue_.empty(); |
142 } | 132 } |
143 } | 133 } |
144 | 134 |
145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) { | 135 bool TaskQueue::UpdateWorkQueue() { |
146 if (!work_queue_.empty()) | 136 if (!work_queue_.empty()) |
147 return true; | 137 return true; |
148 | 138 |
149 { | 139 { |
150 base::AutoLock lock(lock_); | 140 base::AutoLock lock(lock_); |
151 if (!delayed_task_run_times_.empty()) { | |
152 *next_pending_delayed_task = | |
153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top()); | |
154 } | |
155 if (!auto_pump_ || incoming_queue_.empty()) | 141 if (!auto_pump_ || incoming_queue_.empty()) |
156 return false; | 142 return false; |
157 work_queue_.Swap(&incoming_queue_); | 143 work_queue_.Swap(&incoming_queue_); |
158 TraceWorkQueueSize(); | 144 TraceWorkQueueSize(); |
159 return true; | 145 return true; |
160 } | 146 } |
161 } | 147 } |
162 | 148 |
163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { | 149 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { |
164 base::PendingTask pending_task = work_queue_.front(); | 150 base::PendingTask pending_task = work_queue_.front(); |
(...skipping 14 matching lines...) Expand all Loading... |
179 EnqueueTaskLocked(pending_task); | 165 EnqueueTaskLocked(pending_task); |
180 } | 166 } |
181 | 167 |
182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { | 168 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
183 lock_.AssertAcquired(); | 169 lock_.AssertAcquired(); |
184 if (!task_queue_manager_) | 170 if (!task_queue_manager_) |
185 return; | 171 return; |
186 if (auto_pump_ && incoming_queue_.empty()) | 172 if (auto_pump_ && incoming_queue_.empty()) |
187 task_queue_manager_->MaybePostDoWorkOnMainRunner(); | 173 task_queue_manager_->MaybePostDoWorkOnMainRunner(); |
188 incoming_queue_.push(pending_task); | 174 incoming_queue_.push(pending_task); |
189 | |
190 if (!pending_task.delayed_run_time.is_null()) { | |
191 // Update the time of the next pending delayed task. | |
192 while (!delayed_task_run_times_.empty() && | |
193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) { | |
194 delayed_task_run_times_.pop(); | |
195 } | |
196 } | |
197 } | 175 } |
198 | 176 |
199 void TaskQueue::SetAutoPump(bool auto_pump) { | 177 void TaskQueue::SetAutoPump(bool auto_pump) { |
200 base::AutoLock lock(lock_); | 178 base::AutoLock lock(lock_); |
201 if (auto_pump) { | 179 if (auto_pump) { |
202 auto_pump_ = true; | 180 auto_pump_ = true; |
203 PumpQueueLocked(); | 181 PumpQueueLocked(); |
204 } else { | 182 } else { |
205 auto_pump_ = false; | 183 auto_pump_ = false; |
206 } | 184 } |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
262 | 240 |
263 } // namespace internal | 241 } // namespace internal |
264 | 242 |
265 TaskQueueManager::TaskQueueManager( | 243 TaskQueueManager::TaskQueueManager( |
266 size_t task_queue_count, | 244 size_t task_queue_count, |
267 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | 245 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, |
268 TaskQueueSelector* selector) | 246 TaskQueueSelector* selector) |
269 : main_task_runner_(main_task_runner), | 247 : main_task_runner_(main_task_runner), |
270 selector_(selector), | 248 selector_(selector), |
271 pending_dowork_count_(0), | 249 pending_dowork_count_(0), |
272 work_batch_size_(1), | |
273 time_source_(nullptr), | |
274 weak_factory_(this) { | 250 weak_factory_(this) { |
275 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); | 251 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); |
276 TRACE_EVENT_OBJECT_CREATED_WITH_ID( | 252 TRACE_EVENT_OBJECT_CREATED_WITH_ID( |
277 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", | 253 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", |
278 this); | 254 this); |
279 | 255 |
280 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); | 256 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); |
281 for (size_t i = 0; i < task_queue_count; i++) { | 257 for (size_t i = 0; i < task_queue_count; i++) { |
282 scoped_refptr<internal::TaskQueue> queue( | 258 scoped_refptr<internal::TaskQueue> queue( |
283 make_scoped_refptr(new internal::TaskQueue(this))); | 259 make_scoped_refptr(new internal::TaskQueue(this))); |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
318 internal::TaskQueue* queue = Queue(queue_index); | 294 internal::TaskQueue* queue = Queue(queue_index); |
319 queue->SetAutoPump(auto_pump); | 295 queue->SetAutoPump(auto_pump); |
320 } | 296 } |
321 | 297 |
322 void TaskQueueManager::PumpQueue(size_t queue_index) { | 298 void TaskQueueManager::PumpQueue(size_t queue_index) { |
323 main_thread_checker_.CalledOnValidThread(); | 299 main_thread_checker_.CalledOnValidThread(); |
324 internal::TaskQueue* queue = Queue(queue_index); | 300 internal::TaskQueue* queue = Queue(queue_index); |
325 queue->PumpQueue(); | 301 queue->PumpQueue(); |
326 } | 302 } |
327 | 303 |
328 bool TaskQueueManager::UpdateWorkQueues( | 304 bool TaskQueueManager::UpdateWorkQueues() { |
329 base::TimeTicks* next_pending_delayed_task) { | |
330 // TODO(skyostil): This is not efficient when the number of queues grows very | 305 // TODO(skyostil): This is not efficient when the number of queues grows very |
331 // large due to the number of locks taken. Consider optimizing when we get | 306 // large due to the number of locks taken. Consider optimizing when we get |
332 // there. | 307 // there. |
333 main_thread_checker_.CalledOnValidThread(); | 308 main_thread_checker_.CalledOnValidThread(); |
334 bool has_work = false; | 309 bool has_work = false; |
335 for (auto& queue : queues_) | 310 for (auto& queue : queues_) |
336 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task); | 311 has_work |= queue->UpdateWorkQueue(); |
337 return has_work; | 312 return has_work; |
338 } | 313 } |
339 | 314 |
340 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { | 315 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { |
341 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); | 316 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); |
342 if (on_main_thread) { | 317 if (on_main_thread) { |
343 // We only want one pending DoWork posted from the main thread, or we risk | 318 // We only want one pending DoWork posted from the main thread, or we risk |
344 // an explosion of pending DoWorks which could starve out everything else. | 319 // an explosion of pending DoWorks which could starve out everything else. |
345 if (pending_dowork_count_ > 0) { | 320 if (pending_dowork_count_ > 0) { |
346 return; | 321 return; |
347 } | 322 } |
348 pending_dowork_count_++; | 323 pending_dowork_count_++; |
349 } | 324 } |
350 | 325 |
351 main_task_runner_->PostTask( | 326 main_task_runner_->PostTask( |
352 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, | 327 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, |
353 on_main_thread)); | 328 on_main_thread)); |
354 } | 329 } |
355 | 330 |
356 void TaskQueueManager::DoWork(bool posted_from_main_thread) { | 331 void TaskQueueManager::DoWork(bool posted_from_main_thread) { |
357 if (posted_from_main_thread) { | 332 if (posted_from_main_thread) { |
358 pending_dowork_count_--; | 333 pending_dowork_count_--; |
359 DCHECK_GE(pending_dowork_count_, 0); | 334 DCHECK_GE(pending_dowork_count_, 0); |
360 } | 335 } |
361 main_thread_checker_.CalledOnValidThread(); | 336 main_thread_checker_.CalledOnValidThread(); |
| 337 if (!UpdateWorkQueues()) |
| 338 return; |
362 | 339 |
363 base::TimeTicks next_pending_delayed_task( | 340 size_t queue_index; |
364 base::TimeTicks::FromInternalValue(kMaxTimeTicks)); | 341 if (!SelectWorkQueueToService(&queue_index)) |
365 for (int i = 0; i < work_batch_size_; i++) { | 342 return; |
366 if (!UpdateWorkQueues(&next_pending_delayed_task)) | 343 MaybePostDoWorkOnMainRunner(); |
367 return; | 344 ProcessTaskFromWorkQueue(queue_index); |
368 | |
369 // Interrupt the work batch if we should run the next delayed task. | |
370 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks && | |
371 Now() >= next_pending_delayed_task) | |
372 return; | |
373 | |
374 size_t queue_index; | |
375 if (!SelectWorkQueueToService(&queue_index)) | |
376 return; | |
377 // Note that this function won't post another call to DoWork if one is | |
378 // already pending, so it is safe to call it in a loop. | |
379 MaybePostDoWorkOnMainRunner(); | |
380 ProcessTaskFromWorkQueue(queue_index); | |
381 } | |
382 } | 345 } |
383 | 346 |
384 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { | 347 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { |
385 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); | 348 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); |
386 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( | 349 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( |
387 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, | 350 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, |
388 AsValueWithSelectorResult(should_run, *out_queue_index)); | 351 AsValueWithSelectorResult(should_run, *out_queue_index)); |
389 return should_run; | 352 return should_run; |
390 } | 353 } |
391 | 354 |
(...skipping 28 matching lines...) Expand all Loading... |
420 DCHECK(delay > base::TimeDelta()); | 383 DCHECK(delay > base::TimeDelta()); |
421 return main_task_runner_->PostDelayedTask(from_here, task, delay); | 384 return main_task_runner_->PostDelayedTask(from_here, task, delay); |
422 } | 385 } |
423 | 386 |
424 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { | 387 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { |
425 main_thread_checker_.CalledOnValidThread(); | 388 main_thread_checker_.CalledOnValidThread(); |
426 internal::TaskQueue* queue = Queue(queue_index); | 389 internal::TaskQueue* queue = Queue(queue_index); |
427 queue->set_name(name); | 390 queue->set_name(name); |
428 } | 391 } |
429 | 392 |
430 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) { | |
431 main_thread_checker_.CalledOnValidThread(); | |
432 DCHECK_GE(work_batch_size, 1); | |
433 work_batch_size_ = work_batch_size; | |
434 } | |
435 | |
436 void TaskQueueManager::SetTimeSourceForTesting( | |
437 scoped_refptr<cc::TestNowSource> time_source) { | |
438 main_thread_checker_.CalledOnValidThread(); | |
439 time_source_ = time_source; | |
440 } | |
441 | |
442 base::TimeTicks TaskQueueManager::Now() const { | |
443 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now(); | |
444 } | |
445 | |
446 scoped_refptr<base::debug::ConvertableToTraceFormat> | 393 scoped_refptr<base::debug::ConvertableToTraceFormat> |
447 TaskQueueManager::AsValueWithSelectorResult(bool should_run, | 394 TaskQueueManager::AsValueWithSelectorResult(bool should_run, |
448 size_t selected_queue) const { | 395 size_t selected_queue) const { |
449 main_thread_checker_.CalledOnValidThread(); | 396 main_thread_checker_.CalledOnValidThread(); |
450 scoped_refptr<base::debug::TracedValue> state = | 397 scoped_refptr<base::debug::TracedValue> state = |
451 new base::debug::TracedValue(); | 398 new base::debug::TracedValue(); |
452 state->BeginArray("queues"); | 399 state->BeginArray("queues"); |
453 for (auto& queue : queues_) | 400 for (auto& queue : queues_) |
454 queue->AsValueInto(state.get()); | 401 queue->AsValueInto(state.get()); |
455 state->EndArray(); | 402 state->EndArray(); |
456 state->BeginDictionary("selector"); | 403 state->BeginDictionary("selector"); |
457 selector_->AsValueInto(state.get()); | 404 selector_->AsValueInto(state.get()); |
458 state->EndDictionary(); | 405 state->EndDictionary(); |
459 if (should_run) | 406 if (should_run) |
460 state->SetInteger("selected_queue", selected_queue); | 407 state->SetInteger("selected_queue", selected_queue); |
461 return state; | 408 return state; |
462 } | 409 } |
463 | 410 |
464 } // namespace content | 411 } // namespace content |
OLD | NEW |