Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(277)

Side by Side Diff: content/renderer/scheduler/task_queue_manager.cc

Issue 845543004: Run task queue manager work in batches (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: DCHECK tweak. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/scheduler/task_queue_manager.h" 5 #include "content/renderer/scheduler/task_queue_manager.h"
6 6
7 #include <queue>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/debug/trace_event.h"
9 #include "base/debug/trace_event_argument.h"
10 #include "base/trace_event/trace_event.h" 10 #include "base/trace_event/trace_event.h"
11 #include "base/trace_event/trace_event_argument.h" 11 #include "base/trace_event/trace_event_argument.h"
12 #include "cc/test/test_now_source.h"
12 #include "content/renderer/scheduler/task_queue_selector.h" 13 #include "content/renderer/scheduler/task_queue_selector.h"
13 14
15 namespace {
16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
17 }
18
14 namespace content { 19 namespace content {
15 namespace internal { 20 namespace internal {
16 21
17 class TaskQueue : public base::SingleThreadTaskRunner { 22 class TaskQueue : public base::SingleThreadTaskRunner {
18 public: 23 public:
19 TaskQueue(TaskQueueManager* task_queue_manager); 24 TaskQueue(TaskQueueManager* task_queue_manager);
20 25
21 // base::SingleThreadTaskRunner implementation. 26 // base::SingleThreadTaskRunner implementation.
22 bool RunsTasksOnCurrentThread() const override; 27 bool RunsTasksOnCurrentThread() const override;
23 bool PostDelayedTask(const tracked_objects::Location& from_here, 28 bool PostDelayedTask(const tracked_objects::Location& from_here,
(...skipping 11 matching lines...) Expand all
35 // Adds a task at the end of the incoming task queue and schedules a call to 40 // Adds a task at the end of the incoming task queue and schedules a call to
36 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic 41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
37 // pumping is enabled. Can be called on an arbitrary thread. 42 // pumping is enabled. Can be called on an arbitrary thread.
38 void EnqueueTask(const base::PendingTask& pending_task); 43 void EnqueueTask(const base::PendingTask& pending_task);
39 44
40 bool IsQueueEmpty() const; 45 bool IsQueueEmpty() const;
41 46
42 void SetAutoPump(bool auto_pump); 47 void SetAutoPump(bool auto_pump);
43 void PumpQueue(); 48 void PumpQueue();
44 49
45 bool UpdateWorkQueue(); 50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task);
46 base::PendingTask TakeTaskFromWorkQueue(); 51 base::PendingTask TakeTaskFromWorkQueue();
47 52
48 void WillDeleteTaskQueueManager(); 53 void WillDeleteTaskQueueManager();
49 54
50 base::TaskQueue& work_queue() { return work_queue_; } 55 base::TaskQueue& work_queue() { return work_queue_; }
51 56
52 void set_name(const char* name) { name_ = name; } 57 void set_name(const char* name) { name_ = name; }
53 58
54 void AsValueInto(base::debug::TracedValue* state) const; 59 void AsValueInto(base::debug::TracedValue* state) const;
55 60
(...skipping 13 matching lines...) Expand all
69 base::debug::TracedValue* state); 74 base::debug::TracedValue* state);
70 static void TaskAsValueInto(const base::PendingTask& task, 75 static void TaskAsValueInto(const base::PendingTask& task,
71 base::debug::TracedValue* state); 76 base::debug::TracedValue* state);
72 77
73 // This lock protects all members except the work queue. 78 // This lock protects all members except the work queue.
74 mutable base::Lock lock_; 79 mutable base::Lock lock_;
75 TaskQueueManager* task_queue_manager_; 80 TaskQueueManager* task_queue_manager_;
76 base::TaskQueue incoming_queue_; 81 base::TaskQueue incoming_queue_;
77 bool auto_pump_; 82 bool auto_pump_;
78 const char* name_; 83 const char* name_;
84 std::priority_queue<base::TimeTicks,
85 std::vector<base::TimeTicks>,
86 std::greater<base::TimeTicks>> delayed_task_run_times_;
79 87
80 base::TaskQueue work_queue_; 88 base::TaskQueue work_queue_;
81 89
82 DISALLOW_COPY_AND_ASSIGN(TaskQueue); 90 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
83 }; 91 };
84 92
85 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) 93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
86 : task_queue_manager_(task_queue_manager), 94 : task_queue_manager_(task_queue_manager),
87 auto_pump_(true), 95 auto_pump_(true),
88 name_(nullptr) { 96 name_(nullptr) {
(...skipping 19 matching lines...) Expand all
108 base::TimeDelta delay, 116 base::TimeDelta delay,
109 bool nestable) { 117 bool nestable) {
110 base::AutoLock lock(lock_); 118 base::AutoLock lock(lock_);
111 if (!task_queue_manager_) 119 if (!task_queue_manager_)
112 return false; 120 return false;
113 121
114 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); 122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable);
115 task_queue_manager_->DidQueueTask(&pending_task); 123 task_queue_manager_->DidQueueTask(&pending_task);
116 124
117 if (delay > base::TimeDelta()) { 125 if (delay > base::TimeDelta()) {
126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
127 delayed_task_run_times_.push(pending_task.delayed_run_time);
118 return task_queue_manager_->PostDelayedTask( 128 return task_queue_manager_->PostDelayedTask(
119 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); 129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
120 } 130 }
121 EnqueueTaskLocked(pending_task); 131 EnqueueTaskLocked(pending_task);
122 return true; 132 return true;
123 } 133 }
124 134
125 bool TaskQueue::IsQueueEmpty() const { 135 bool TaskQueue::IsQueueEmpty() const {
126 if (!work_queue_.empty()) 136 if (!work_queue_.empty())
127 return false; 137 return false;
128 138
129 { 139 {
130 base::AutoLock lock(lock_); 140 base::AutoLock lock(lock_);
131 return incoming_queue_.empty(); 141 return incoming_queue_.empty();
132 } 142 }
133 } 143 }
134 144
135 bool TaskQueue::UpdateWorkQueue() { 145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) {
136 if (!work_queue_.empty()) 146 if (!work_queue_.empty())
137 return true; 147 return true;
138 148
139 { 149 {
140 base::AutoLock lock(lock_); 150 base::AutoLock lock(lock_);
151 if (!delayed_task_run_times_.empty()) {
152 *next_pending_delayed_task =
153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
154 }
141 if (!auto_pump_ || incoming_queue_.empty()) 155 if (!auto_pump_ || incoming_queue_.empty())
142 return false; 156 return false;
143 work_queue_.Swap(&incoming_queue_); 157 work_queue_.Swap(&incoming_queue_);
144 TraceWorkQueueSize(); 158 TraceWorkQueueSize();
145 return true; 159 return true;
146 } 160 }
147 } 161 }
148 162
149 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { 163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
150 base::PendingTask pending_task = work_queue_.front(); 164 base::PendingTask pending_task = work_queue_.front();
(...skipping 14 matching lines...) Expand all
165 EnqueueTaskLocked(pending_task); 179 EnqueueTaskLocked(pending_task);
166 } 180 }
167 181
168 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { 182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
169 lock_.AssertAcquired(); 183 lock_.AssertAcquired();
170 if (!task_queue_manager_) 184 if (!task_queue_manager_)
171 return; 185 return;
172 if (auto_pump_ && incoming_queue_.empty()) 186 if (auto_pump_ && incoming_queue_.empty())
173 task_queue_manager_->MaybePostDoWorkOnMainRunner(); 187 task_queue_manager_->MaybePostDoWorkOnMainRunner();
174 incoming_queue_.push(pending_task); 188 incoming_queue_.push(pending_task);
189
190 if (!pending_task.delayed_run_time.is_null()) {
191 // Update the time of the next pending delayed task.
192 while (!delayed_task_run_times_.empty() &&
193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
194 delayed_task_run_times_.pop();
195 }
196 // Clear the delayed run time because we've already applied the delay
197 // before getting here.
198 incoming_queue_.back().delayed_run_time = base::TimeTicks();
199 }
175 } 200 }
176 201
177 void TaskQueue::SetAutoPump(bool auto_pump) { 202 void TaskQueue::SetAutoPump(bool auto_pump) {
178 base::AutoLock lock(lock_); 203 base::AutoLock lock(lock_);
179 if (auto_pump) { 204 if (auto_pump) {
180 auto_pump_ = true; 205 auto_pump_ = true;
181 PumpQueueLocked(); 206 PumpQueueLocked();
182 } else { 207 } else {
183 auto_pump_ = false; 208 auto_pump_ = false;
184 } 209 }
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
240 265
241 } // namespace internal 266 } // namespace internal
242 267
243 TaskQueueManager::TaskQueueManager( 268 TaskQueueManager::TaskQueueManager(
244 size_t task_queue_count, 269 size_t task_queue_count,
245 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, 270 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
246 TaskQueueSelector* selector) 271 TaskQueueSelector* selector)
247 : main_task_runner_(main_task_runner), 272 : main_task_runner_(main_task_runner),
248 selector_(selector), 273 selector_(selector),
249 pending_dowork_count_(0), 274 pending_dowork_count_(0),
275 work_batch_size_(1),
276 time_source_(nullptr),
250 weak_factory_(this) { 277 weak_factory_(this) {
251 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); 278 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
252 TRACE_EVENT_OBJECT_CREATED_WITH_ID( 279 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
253 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", 280 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
254 this); 281 this);
255 282
256 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); 283 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
257 for (size_t i = 0; i < task_queue_count; i++) { 284 for (size_t i = 0; i < task_queue_count; i++) {
258 scoped_refptr<internal::TaskQueue> queue( 285 scoped_refptr<internal::TaskQueue> queue(
259 make_scoped_refptr(new internal::TaskQueue(this))); 286 make_scoped_refptr(new internal::TaskQueue(this)));
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
294 internal::TaskQueue* queue = Queue(queue_index); 321 internal::TaskQueue* queue = Queue(queue_index);
295 queue->SetAutoPump(auto_pump); 322 queue->SetAutoPump(auto_pump);
296 } 323 }
297 324
298 void TaskQueueManager::PumpQueue(size_t queue_index) { 325 void TaskQueueManager::PumpQueue(size_t queue_index) {
299 main_thread_checker_.CalledOnValidThread(); 326 main_thread_checker_.CalledOnValidThread();
300 internal::TaskQueue* queue = Queue(queue_index); 327 internal::TaskQueue* queue = Queue(queue_index);
301 queue->PumpQueue(); 328 queue->PumpQueue();
302 } 329 }
303 330
304 bool TaskQueueManager::UpdateWorkQueues() { 331 bool TaskQueueManager::UpdateWorkQueues(
332 base::TimeTicks* next_pending_delayed_task) {
305 // TODO(skyostil): This is not efficient when the number of queues grows very 333 // TODO(skyostil): This is not efficient when the number of queues grows very
306 // large due to the number of locks taken. Consider optimizing when we get 334 // large due to the number of locks taken. Consider optimizing when we get
307 // there. 335 // there.
308 main_thread_checker_.CalledOnValidThread(); 336 main_thread_checker_.CalledOnValidThread();
309 bool has_work = false; 337 bool has_work = false;
310 for (auto& queue : queues_) 338 for (auto& queue : queues_) {
311 has_work |= queue->UpdateWorkQueue(); 339 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task);
340 if (!queue->work_queue().empty()) {
341 // Currently we should not be getting tasks with delayed run times in any
342 // of the work queues.
343 DCHECK(queue->work_queue().front().delayed_run_time.is_null());
344 }
345 }
312 return has_work; 346 return has_work;
313 } 347 }
314 348
315 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { 349 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
316 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); 350 bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
317 if (on_main_thread) { 351 if (on_main_thread) {
318 // We only want one pending DoWork posted from the main thread, or we risk 352 // We only want one pending DoWork posted from the main thread, or we risk
319 // an explosion of pending DoWorks which could starve out everything else. 353 // an explosion of pending DoWorks which could starve out everything else.
320 if (pending_dowork_count_ > 0) { 354 if (pending_dowork_count_ > 0) {
321 return; 355 return;
322 } 356 }
323 pending_dowork_count_++; 357 pending_dowork_count_++;
324 } 358 }
325 359
326 main_task_runner_->PostTask( 360 main_task_runner_->PostTask(
327 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, 361 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_,
328 on_main_thread)); 362 on_main_thread));
329 } 363 }
330 364
331 void TaskQueueManager::DoWork(bool posted_from_main_thread) { 365 void TaskQueueManager::DoWork(bool posted_from_main_thread) {
332 if (posted_from_main_thread) { 366 if (posted_from_main_thread) {
333 pending_dowork_count_--; 367 pending_dowork_count_--;
334 DCHECK_GE(pending_dowork_count_, 0); 368 DCHECK_GE(pending_dowork_count_, 0);
335 } 369 }
336 main_thread_checker_.CalledOnValidThread(); 370 main_thread_checker_.CalledOnValidThread();
337 if (!UpdateWorkQueues())
338 return;
339 371
340 size_t queue_index; 372 base::TimeTicks next_pending_delayed_task(
341 if (!SelectWorkQueueToService(&queue_index)) 373 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
342 return; 374 for (int i = 0; i < work_batch_size_; i++) {
343 MaybePostDoWorkOnMainRunner(); 375 if (!UpdateWorkQueues(&next_pending_delayed_task))
344 ProcessTaskFromWorkQueue(queue_index); 376 return;
377
378 // Interrupt the work batch if we should run the next delayed task.
379 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
380 Now() >= next_pending_delayed_task)
381 return;
382
383 size_t queue_index;
384 if (!SelectWorkQueueToService(&queue_index))
385 return;
386 // Note that this function won't post another call to DoWork if one is
387 // already pending, so it is safe to call it in a loop.
388 MaybePostDoWorkOnMainRunner();
389 ProcessTaskFromWorkQueue(queue_index);
390 }
345 } 391 }
346 392
347 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { 393 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
348 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); 394 bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
349 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( 395 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
350 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, 396 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
351 AsValueWithSelectorResult(should_run, *out_queue_index)); 397 AsValueWithSelectorResult(should_run, *out_queue_index));
352 return should_run; 398 return should_run;
353 } 399 }
354 400
(...skipping 28 matching lines...) Expand all
383 DCHECK(delay > base::TimeDelta()); 429 DCHECK(delay > base::TimeDelta());
384 return main_task_runner_->PostDelayedTask(from_here, task, delay); 430 return main_task_runner_->PostDelayedTask(from_here, task, delay);
385 } 431 }
386 432
387 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { 433 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
388 main_thread_checker_.CalledOnValidThread(); 434 main_thread_checker_.CalledOnValidThread();
389 internal::TaskQueue* queue = Queue(queue_index); 435 internal::TaskQueue* queue = Queue(queue_index);
390 queue->set_name(name); 436 queue->set_name(name);
391 } 437 }
392 438
439 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
440 main_thread_checker_.CalledOnValidThread();
441 DCHECK_GE(work_batch_size, 1);
442 work_batch_size_ = work_batch_size;
443 }
444
445 void TaskQueueManager::SetTimeSourceForTesting(
446 scoped_refptr<cc::TestNowSource> time_source) {
447 main_thread_checker_.CalledOnValidThread();
448 time_source_ = time_source;
449 }
450
451 base::TimeTicks TaskQueueManager::Now() const {
452 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
453 }
454
393 scoped_refptr<base::debug::ConvertableToTraceFormat> 455 scoped_refptr<base::debug::ConvertableToTraceFormat>
394 TaskQueueManager::AsValueWithSelectorResult(bool should_run, 456 TaskQueueManager::AsValueWithSelectorResult(bool should_run,
395 size_t selected_queue) const { 457 size_t selected_queue) const {
396 main_thread_checker_.CalledOnValidThread(); 458 main_thread_checker_.CalledOnValidThread();
397 scoped_refptr<base::debug::TracedValue> state = 459 scoped_refptr<base::debug::TracedValue> state =
398 new base::debug::TracedValue(); 460 new base::debug::TracedValue();
399 state->BeginArray("queues"); 461 state->BeginArray("queues");
400 for (auto& queue : queues_) 462 for (auto& queue : queues_)
401 queue->AsValueInto(state.get()); 463 queue->AsValueInto(state.get());
402 state->EndArray(); 464 state->EndArray();
403 state->BeginDictionary("selector"); 465 state->BeginDictionary("selector");
404 selector_->AsValueInto(state.get()); 466 selector_->AsValueInto(state.get());
405 state->EndDictionary(); 467 state->EndDictionary();
406 if (should_run) 468 if (should_run)
407 state->SetInteger("selected_queue", selected_queue); 469 state->SetInteger("selected_queue", selected_queue);
408 return state; 470 return state;
409 } 471 }
410 472
411 } // namespace content 473 } // namespace content
OLDNEW
« no previous file with comments | « content/renderer/scheduler/task_queue_manager.h ('k') | content/renderer/scheduler/task_queue_manager_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698