Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(181)

Side by Side Diff: content/renderer/scheduler/task_queue_manager.cc

Issue 845543004: Run task queue manager work in batches (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Review comments. Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/scheduler/task_queue_manager.h" 5 #include "content/renderer/scheduler/task_queue_manager.h"
6 6
7 #include <queue>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/debug/trace_event.h" 10 #include "base/debug/trace_event.h"
9 #include "base/debug/trace_event_argument.h" 11 #include "base/debug/trace_event_argument.h"
12 #include "cc/test/test_now_source.h"
10 #include "content/renderer/scheduler/task_queue_selector.h" 13 #include "content/renderer/scheduler/task_queue_selector.h"
11 14
15 namespace {
16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
17 }
18
12 namespace content { 19 namespace content {
13 namespace internal { 20 namespace internal {
14 21
15 class TaskQueue : public base::SingleThreadTaskRunner { 22 class TaskQueue : public base::SingleThreadTaskRunner {
16 public: 23 public:
17 TaskQueue(TaskQueueManager* task_queue_manager); 24 TaskQueue(TaskQueueManager* task_queue_manager);
18 25
19 // base::SingleThreadTaskRunner implementation. 26 // base::SingleThreadTaskRunner implementation.
20 bool RunsTasksOnCurrentThread() const override; 27 bool RunsTasksOnCurrentThread() const override;
21 bool PostDelayedTask(const tracked_objects::Location& from_here, 28 bool PostDelayedTask(const tracked_objects::Location& from_here,
(...skipping 11 matching lines...) Expand all
33 // Adds a task at the end of the incoming task queue and schedules a call to 40 // Adds a task at the end of the incoming task queue and schedules a call to
34 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic 41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
35 // pumping is enabled. Can be called on an arbitrary thread. 42 // pumping is enabled. Can be called on an arbitrary thread.
36 void EnqueueTask(const base::PendingTask& pending_task); 43 void EnqueueTask(const base::PendingTask& pending_task);
37 44
38 bool IsQueueEmpty() const; 45 bool IsQueueEmpty() const;
39 46
40 void SetAutoPump(bool auto_pump); 47 void SetAutoPump(bool auto_pump);
41 void PumpQueue(); 48 void PumpQueue();
42 49
43 bool UpdateWorkQueue(); 50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task);
44 base::PendingTask TakeTaskFromWorkQueue(); 51 base::PendingTask TakeTaskFromWorkQueue();
45 52
46 void WillDeleteTaskQueueManager(); 53 void WillDeleteTaskQueueManager();
47 54
48 base::TaskQueue& work_queue() { return work_queue_; } 55 base::TaskQueue& work_queue() { return work_queue_; }
49 56
50 void set_name(const char* name) { name_ = name; } 57 void set_name(const char* name) { name_ = name; }
51 58
52 void AsValueInto(base::debug::TracedValue* state) const; 59 void AsValueInto(base::debug::TracedValue* state) const;
53 60
(...skipping 13 matching lines...) Expand all
67 base::debug::TracedValue* state); 74 base::debug::TracedValue* state);
68 static void TaskAsValueInto(const base::PendingTask& task, 75 static void TaskAsValueInto(const base::PendingTask& task,
69 base::debug::TracedValue* state); 76 base::debug::TracedValue* state);
70 77
71 // This lock protects all members except the work queue. 78 // This lock protects all members except the work queue.
72 mutable base::Lock lock_; 79 mutable base::Lock lock_;
73 TaskQueueManager* task_queue_manager_; 80 TaskQueueManager* task_queue_manager_;
74 base::TaskQueue incoming_queue_; 81 base::TaskQueue incoming_queue_;
75 bool auto_pump_; 82 bool auto_pump_;
76 const char* name_; 83 const char* name_;
84 std::priority_queue<base::TimeTicks,
85 std::vector<base::TimeTicks>,
86 std::greater<base::TimeTicks>> delayed_task_run_times_;
77 87
78 base::TaskQueue work_queue_; 88 base::TaskQueue work_queue_;
79 89
80 DISALLOW_COPY_AND_ASSIGN(TaskQueue); 90 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
81 }; 91 };
82 92
83 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) 93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
84 : task_queue_manager_(task_queue_manager), 94 : task_queue_manager_(task_queue_manager),
85 auto_pump_(true), 95 auto_pump_(true),
86 name_(nullptr) { 96 name_(nullptr) {
(...skipping 19 matching lines...) Expand all
106 base::TimeDelta delay, 116 base::TimeDelta delay,
107 bool nestable) { 117 bool nestable) {
108 base::AutoLock lock(lock_); 118 base::AutoLock lock(lock_);
109 if (!task_queue_manager_) 119 if (!task_queue_manager_)
110 return false; 120 return false;
111 121
112 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); 122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable);
113 task_queue_manager_->DidQueueTask(&pending_task); 123 task_queue_manager_->DidQueueTask(&pending_task);
114 124
115 if (delay > base::TimeDelta()) { 125 if (delay > base::TimeDelta()) {
126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
127 delayed_task_run_times_.push(pending_task.delayed_run_time);
116 return task_queue_manager_->PostDelayedTask( 128 return task_queue_manager_->PostDelayedTask(
117 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); 129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
118 } 130 }
119 EnqueueTaskLocked(pending_task); 131 EnqueueTaskLocked(pending_task);
120 return true; 132 return true;
121 } 133 }
122 134
123 bool TaskQueue::IsQueueEmpty() const { 135 bool TaskQueue::IsQueueEmpty() const {
124 if (!work_queue_.empty()) 136 if (!work_queue_.empty())
125 return false; 137 return false;
126 138
127 { 139 {
128 base::AutoLock lock(lock_); 140 base::AutoLock lock(lock_);
129 return incoming_queue_.empty(); 141 return incoming_queue_.empty();
130 } 142 }
131 } 143 }
132 144
133 bool TaskQueue::UpdateWorkQueue() { 145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) {
134 if (!work_queue_.empty()) 146 if (!work_queue_.empty())
135 return true; 147 return true;
136 148
137 { 149 {
138 base::AutoLock lock(lock_); 150 base::AutoLock lock(lock_);
151 if (!delayed_task_run_times_.empty()) {
152 *next_pending_delayed_task =
153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
154 }
139 if (!auto_pump_ || incoming_queue_.empty()) 155 if (!auto_pump_ || incoming_queue_.empty())
140 return false; 156 return false;
141 work_queue_.Swap(&incoming_queue_); 157 work_queue_.Swap(&incoming_queue_);
142 TraceWorkQueueSize(); 158 TraceWorkQueueSize();
143 return true; 159 return true;
144 } 160 }
145 } 161 }
146 162
147 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { 163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
148 base::PendingTask pending_task = work_queue_.front(); 164 base::PendingTask pending_task = work_queue_.front();
(...skipping 14 matching lines...) Expand all
163 EnqueueTaskLocked(pending_task); 179 EnqueueTaskLocked(pending_task);
164 } 180 }
165 181
166 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { 182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
167 lock_.AssertAcquired(); 183 lock_.AssertAcquired();
168 if (!task_queue_manager_) 184 if (!task_queue_manager_)
169 return; 185 return;
170 if (auto_pump_ && incoming_queue_.empty()) 186 if (auto_pump_ && incoming_queue_.empty())
171 task_queue_manager_->MaybePostDoWorkOnMainRunner(); 187 task_queue_manager_->MaybePostDoWorkOnMainRunner();
172 incoming_queue_.push(pending_task); 188 incoming_queue_.push(pending_task);
189
190 if (!pending_task.delayed_run_time.is_null()) {
191 // Update the time of the next pending delayed task.
192 while (!delayed_task_run_times_.empty() &&
193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
194 delayed_task_run_times_.pop();
195 }
196 }
173 } 197 }
174 198
175 void TaskQueue::SetAutoPump(bool auto_pump) { 199 void TaskQueue::SetAutoPump(bool auto_pump) {
176 base::AutoLock lock(lock_); 200 base::AutoLock lock(lock_);
177 if (auto_pump) { 201 if (auto_pump) {
178 auto_pump_ = true; 202 auto_pump_ = true;
179 PumpQueueLocked(); 203 PumpQueueLocked();
180 } else { 204 } else {
181 auto_pump_ = false; 205 auto_pump_ = false;
182 } 206 }
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 262
239 } // namespace internal 263 } // namespace internal
240 264
241 TaskQueueManager::TaskQueueManager( 265 TaskQueueManager::TaskQueueManager(
242 size_t task_queue_count, 266 size_t task_queue_count,
243 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, 267 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
244 TaskQueueSelector* selector) 268 TaskQueueSelector* selector)
245 : main_task_runner_(main_task_runner), 269 : main_task_runner_(main_task_runner),
246 selector_(selector), 270 selector_(selector),
247 pending_dowork_count_(0), 271 pending_dowork_count_(0),
272 work_batch_size_(1),
273 time_source_(nullptr),
248 weak_factory_(this) { 274 weak_factory_(this) {
249 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); 275 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
250 TRACE_EVENT_OBJECT_CREATED_WITH_ID( 276 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
251 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", 277 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
252 this); 278 this);
253 279
254 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); 280 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
255 for (size_t i = 0; i < task_queue_count; i++) { 281 for (size_t i = 0; i < task_queue_count; i++) {
256 scoped_refptr<internal::TaskQueue> queue( 282 scoped_refptr<internal::TaskQueue> queue(
257 make_scoped_refptr(new internal::TaskQueue(this))); 283 make_scoped_refptr(new internal::TaskQueue(this)));
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 internal::TaskQueue* queue = Queue(queue_index); 318 internal::TaskQueue* queue = Queue(queue_index);
293 queue->SetAutoPump(auto_pump); 319 queue->SetAutoPump(auto_pump);
294 } 320 }
295 321
296 void TaskQueueManager::PumpQueue(size_t queue_index) { 322 void TaskQueueManager::PumpQueue(size_t queue_index) {
297 main_thread_checker_.CalledOnValidThread(); 323 main_thread_checker_.CalledOnValidThread();
298 internal::TaskQueue* queue = Queue(queue_index); 324 internal::TaskQueue* queue = Queue(queue_index);
299 queue->PumpQueue(); 325 queue->PumpQueue();
300 } 326 }
301 327
302 bool TaskQueueManager::UpdateWorkQueues() { 328 bool TaskQueueManager::UpdateWorkQueues(
329 base::TimeTicks* next_pending_delayed_task) {
303 // TODO(skyostil): This is not efficient when the number of queues grows very 330 // TODO(skyostil): This is not efficient when the number of queues grows very
304 // large due to the number of locks taken. Consider optimizing when we get 331 // large due to the number of locks taken. Consider optimizing when we get
305 // there. 332 // there.
306 main_thread_checker_.CalledOnValidThread(); 333 main_thread_checker_.CalledOnValidThread();
307 bool has_work = false; 334 bool has_work = false;
308 for (auto& queue : queues_) 335 for (auto& queue : queues_)
309 has_work |= queue->UpdateWorkQueue(); 336 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task);
310 return has_work; 337 return has_work;
311 } 338 }
312 339
313 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { 340 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
314 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); 341 bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
315 if (on_main_thread) { 342 if (on_main_thread) {
316 // We only want one pending DoWork posted from the main thread, or we risk 343 // We only want one pending DoWork posted from the main thread, or we risk
317 // an explosion of pending DoWorks which could starve out everything else. 344 // an explosion of pending DoWorks which could starve out everything else.
318 if (pending_dowork_count_ > 0) { 345 if (pending_dowork_count_ > 0) {
319 return; 346 return;
320 } 347 }
321 pending_dowork_count_++; 348 pending_dowork_count_++;
322 } 349 }
323 350
324 main_task_runner_->PostTask( 351 main_task_runner_->PostTask(
325 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, 352 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_,
326 on_main_thread)); 353 on_main_thread));
327 } 354 }
328 355
329 void TaskQueueManager::DoWork(bool posted_from_main_thread) { 356 void TaskQueueManager::DoWork(bool posted_from_main_thread) {
330 if (posted_from_main_thread) { 357 if (posted_from_main_thread) {
331 pending_dowork_count_--; 358 pending_dowork_count_--;
332 DCHECK_GE(pending_dowork_count_, 0); 359 DCHECK_GE(pending_dowork_count_, 0);
333 } 360 }
334 main_thread_checker_.CalledOnValidThread(); 361 main_thread_checker_.CalledOnValidThread();
335 if (!UpdateWorkQueues())
336 return;
337 362
338 size_t queue_index; 363 base::TimeTicks next_pending_delayed_task(
339 if (!SelectWorkQueueToService(&queue_index)) 364 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
340 return; 365 for (int i = 0; i < work_batch_size_; i++) {
341 MaybePostDoWorkOnMainRunner(); 366 if (!UpdateWorkQueues(&next_pending_delayed_task))
342 ProcessTaskFromWorkQueue(queue_index); 367 return;
368
369 // Interrupt the work batch if we should run the next delayed task.
370 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
371 Now() >= next_pending_delayed_task)
372 return;
373
374 size_t queue_index;
375 if (!SelectWorkQueueToService(&queue_index))
376 return;
377 MaybePostDoWorkOnMainRunner();
rmcilroy 2015/01/21 15:13:09 Could you add a comment that the logic in MaybePos
Sami 2015/01/21 15:30:39 Good idea, done.
378 ProcessTaskFromWorkQueue(queue_index);
379 }
343 } 380 }
344 381
345 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { 382 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
346 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); 383 bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
347 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( 384 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
348 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, 385 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
349 AsValueWithSelectorResult(should_run, *out_queue_index)); 386 AsValueWithSelectorResult(should_run, *out_queue_index));
350 return should_run; 387 return should_run;
351 } 388 }
352 389
(...skipping 28 matching lines...) Expand all
381 DCHECK(delay > base::TimeDelta()); 418 DCHECK(delay > base::TimeDelta());
382 return main_task_runner_->PostDelayedTask(from_here, task, delay); 419 return main_task_runner_->PostDelayedTask(from_here, task, delay);
383 } 420 }
384 421
385 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { 422 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
386 main_thread_checker_.CalledOnValidThread(); 423 main_thread_checker_.CalledOnValidThread();
387 internal::TaskQueue* queue = Queue(queue_index); 424 internal::TaskQueue* queue = Queue(queue_index);
388 queue->set_name(name); 425 queue->set_name(name);
389 } 426 }
390 427
428 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
429 main_thread_checker_.CalledOnValidThread();
430 DCHECK_GE(work_batch_size, 1);
431 work_batch_size_ = work_batch_size;
432 }
433
434 void TaskQueueManager::SetTimeSourceForTesting(
435 scoped_refptr<cc::TestNowSource> time_source) {
436 main_thread_checker_.CalledOnValidThread();
437 time_source_ = time_source;
438 }
439
440 base::TimeTicks TaskQueueManager::Now() const {
441 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
442 }
443
391 scoped_refptr<base::debug::ConvertableToTraceFormat> 444 scoped_refptr<base::debug::ConvertableToTraceFormat>
392 TaskQueueManager::AsValueWithSelectorResult(bool should_run, 445 TaskQueueManager::AsValueWithSelectorResult(bool should_run,
393 size_t selected_queue) const { 446 size_t selected_queue) const {
394 main_thread_checker_.CalledOnValidThread(); 447 main_thread_checker_.CalledOnValidThread();
395 scoped_refptr<base::debug::TracedValue> state = 448 scoped_refptr<base::debug::TracedValue> state =
396 new base::debug::TracedValue(); 449 new base::debug::TracedValue();
397 state->BeginArray("queues"); 450 state->BeginArray("queues");
398 for (auto& queue : queues_) 451 for (auto& queue : queues_)
399 queue->AsValueInto(state.get()); 452 queue->AsValueInto(state.get());
400 state->EndArray(); 453 state->EndArray();
401 state->BeginDictionary("selector"); 454 state->BeginDictionary("selector");
402 selector_->AsValueInto(state.get()); 455 selector_->AsValueInto(state.get());
403 state->EndDictionary(); 456 state->EndDictionary();
404 if (should_run) 457 if (should_run)
405 state->SetInteger("selected_queue", selected_queue); 458 state->SetInteger("selected_queue", selected_queue);
406 return state; 459 return state;
407 } 460 }
408 461
409 } // namespace content 462 } // namespace content
OLDNEW
« no previous file with comments | « content/renderer/scheduler/task_queue_manager.h ('k') | content/renderer/scheduler/task_queue_manager_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698