Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(62)

Side by Side Diff: content/renderer/scheduler/task_queue_manager.cc

Issue 894183002: Revert of Run task queue manager work in batches (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/scheduler/task_queue_manager.h" 5 #include "content/renderer/scheduler/task_queue_manager.h"
6 6
7 #include <queue>
8
9 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/debug/trace_event.h"
9 #include "base/debug/trace_event_argument.h"
10 #include "base/trace_event/trace_event.h" 10 #include "base/trace_event/trace_event.h"
11 #include "base/trace_event/trace_event_argument.h" 11 #include "base/trace_event/trace_event_argument.h"
12 #include "cc/test/test_now_source.h"
13 #include "content/renderer/scheduler/task_queue_selector.h" 12 #include "content/renderer/scheduler/task_queue_selector.h"
14 13
15 namespace {
16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
17 }
18
19 namespace content { 14 namespace content {
20 namespace internal { 15 namespace internal {
21 16
22 class TaskQueue : public base::SingleThreadTaskRunner { 17 class TaskQueue : public base::SingleThreadTaskRunner {
23 public: 18 public:
24 TaskQueue(TaskQueueManager* task_queue_manager); 19 TaskQueue(TaskQueueManager* task_queue_manager);
25 20
26 // base::SingleThreadTaskRunner implementation. 21 // base::SingleThreadTaskRunner implementation.
27 bool RunsTasksOnCurrentThread() const override; 22 bool RunsTasksOnCurrentThread() const override;
28 bool PostDelayedTask(const tracked_objects::Location& from_here, 23 bool PostDelayedTask(const tracked_objects::Location& from_here,
(...skipping 11 matching lines...) Expand all
40 // Adds a task at the end of the incoming task queue and schedules a call to 35 // Adds a task at the end of the incoming task queue and schedules a call to
41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic 36 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
42 // pumping is enabled. Can be called on an arbitrary thread. 37 // pumping is enabled. Can be called on an arbitrary thread.
43 void EnqueueTask(const base::PendingTask& pending_task); 38 void EnqueueTask(const base::PendingTask& pending_task);
44 39
45 bool IsQueueEmpty() const; 40 bool IsQueueEmpty() const;
46 41
47 void SetAutoPump(bool auto_pump); 42 void SetAutoPump(bool auto_pump);
48 void PumpQueue(); 43 void PumpQueue();
49 44
50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task); 45 bool UpdateWorkQueue();
51 base::PendingTask TakeTaskFromWorkQueue(); 46 base::PendingTask TakeTaskFromWorkQueue();
52 47
53 void WillDeleteTaskQueueManager(); 48 void WillDeleteTaskQueueManager();
54 49
55 base::TaskQueue& work_queue() { return work_queue_; } 50 base::TaskQueue& work_queue() { return work_queue_; }
56 51
57 void set_name(const char* name) { name_ = name; } 52 void set_name(const char* name) { name_ = name; }
58 53
59 void AsValueInto(base::debug::TracedValue* state) const; 54 void AsValueInto(base::debug::TracedValue* state) const;
60 55
(...skipping 13 matching lines...) Expand all
74 base::debug::TracedValue* state); 69 base::debug::TracedValue* state);
75 static void TaskAsValueInto(const base::PendingTask& task, 70 static void TaskAsValueInto(const base::PendingTask& task,
76 base::debug::TracedValue* state); 71 base::debug::TracedValue* state);
77 72
78 // This lock protects all members except the work queue. 73 // This lock protects all members except the work queue.
79 mutable base::Lock lock_; 74 mutable base::Lock lock_;
80 TaskQueueManager* task_queue_manager_; 75 TaskQueueManager* task_queue_manager_;
81 base::TaskQueue incoming_queue_; 76 base::TaskQueue incoming_queue_;
82 bool auto_pump_; 77 bool auto_pump_;
83 const char* name_; 78 const char* name_;
84 std::priority_queue<base::TimeTicks,
85 std::vector<base::TimeTicks>,
86 std::greater<base::TimeTicks>> delayed_task_run_times_;
87 79
88 base::TaskQueue work_queue_; 80 base::TaskQueue work_queue_;
89 81
90 DISALLOW_COPY_AND_ASSIGN(TaskQueue); 82 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
91 }; 83 };
92 84
93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) 85 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
94 : task_queue_manager_(task_queue_manager), 86 : task_queue_manager_(task_queue_manager),
95 auto_pump_(true), 87 auto_pump_(true),
96 name_(nullptr) { 88 name_(nullptr) {
(...skipping 19 matching lines...) Expand all
116 base::TimeDelta delay, 108 base::TimeDelta delay,
117 bool nestable) { 109 bool nestable) {
118 base::AutoLock lock(lock_); 110 base::AutoLock lock(lock_);
119 if (!task_queue_manager_) 111 if (!task_queue_manager_)
120 return false; 112 return false;
121 113
122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); 114 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable);
123 task_queue_manager_->DidQueueTask(&pending_task); 115 task_queue_manager_->DidQueueTask(&pending_task);
124 116
125 if (delay > base::TimeDelta()) { 117 if (delay > base::TimeDelta()) {
126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
127 delayed_task_run_times_.push(pending_task.delayed_run_time);
128 return task_queue_manager_->PostDelayedTask( 118 return task_queue_manager_->PostDelayedTask(
129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); 119 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
130 } 120 }
131 EnqueueTaskLocked(pending_task); 121 EnqueueTaskLocked(pending_task);
132 return true; 122 return true;
133 } 123 }
134 124
135 bool TaskQueue::IsQueueEmpty() const { 125 bool TaskQueue::IsQueueEmpty() const {
136 if (!work_queue_.empty()) 126 if (!work_queue_.empty())
137 return false; 127 return false;
138 128
139 { 129 {
140 base::AutoLock lock(lock_); 130 base::AutoLock lock(lock_);
141 return incoming_queue_.empty(); 131 return incoming_queue_.empty();
142 } 132 }
143 } 133 }
144 134
145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) { 135 bool TaskQueue::UpdateWorkQueue() {
146 if (!work_queue_.empty()) 136 if (!work_queue_.empty())
147 return true; 137 return true;
148 138
149 { 139 {
150 base::AutoLock lock(lock_); 140 base::AutoLock lock(lock_);
151 if (!delayed_task_run_times_.empty()) {
152 *next_pending_delayed_task =
153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
154 }
155 if (!auto_pump_ || incoming_queue_.empty()) 141 if (!auto_pump_ || incoming_queue_.empty())
156 return false; 142 return false;
157 work_queue_.Swap(&incoming_queue_); 143 work_queue_.Swap(&incoming_queue_);
158 TraceWorkQueueSize(); 144 TraceWorkQueueSize();
159 return true; 145 return true;
160 } 146 }
161 } 147 }
162 148
163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { 149 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
164 base::PendingTask pending_task = work_queue_.front(); 150 base::PendingTask pending_task = work_queue_.front();
(...skipping 14 matching lines...) Expand all
179 EnqueueTaskLocked(pending_task); 165 EnqueueTaskLocked(pending_task);
180 } 166 }
181 167
182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { 168 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
183 lock_.AssertAcquired(); 169 lock_.AssertAcquired();
184 if (!task_queue_manager_) 170 if (!task_queue_manager_)
185 return; 171 return;
186 if (auto_pump_ && incoming_queue_.empty()) 172 if (auto_pump_ && incoming_queue_.empty())
187 task_queue_manager_->MaybePostDoWorkOnMainRunner(); 173 task_queue_manager_->MaybePostDoWorkOnMainRunner();
188 incoming_queue_.push(pending_task); 174 incoming_queue_.push(pending_task);
189
190 if (!pending_task.delayed_run_time.is_null()) {
191 // Update the time of the next pending delayed task.
192 while (!delayed_task_run_times_.empty() &&
193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
194 delayed_task_run_times_.pop();
195 }
196 }
197 } 175 }
198 176
199 void TaskQueue::SetAutoPump(bool auto_pump) { 177 void TaskQueue::SetAutoPump(bool auto_pump) {
200 base::AutoLock lock(lock_); 178 base::AutoLock lock(lock_);
201 if (auto_pump) { 179 if (auto_pump) {
202 auto_pump_ = true; 180 auto_pump_ = true;
203 PumpQueueLocked(); 181 PumpQueueLocked();
204 } else { 182 } else {
205 auto_pump_ = false; 183 auto_pump_ = false;
206 } 184 }
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
262 240
263 } // namespace internal 241 } // namespace internal
264 242
265 TaskQueueManager::TaskQueueManager( 243 TaskQueueManager::TaskQueueManager(
266 size_t task_queue_count, 244 size_t task_queue_count,
267 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, 245 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
268 TaskQueueSelector* selector) 246 TaskQueueSelector* selector)
269 : main_task_runner_(main_task_runner), 247 : main_task_runner_(main_task_runner),
270 selector_(selector), 248 selector_(selector),
271 pending_dowork_count_(0), 249 pending_dowork_count_(0),
272 work_batch_size_(1),
273 time_source_(nullptr),
274 weak_factory_(this) { 250 weak_factory_(this) {
275 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); 251 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
276 TRACE_EVENT_OBJECT_CREATED_WITH_ID( 252 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
277 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", 253 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
278 this); 254 this);
279 255
280 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); 256 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
281 for (size_t i = 0; i < task_queue_count; i++) { 257 for (size_t i = 0; i < task_queue_count; i++) {
282 scoped_refptr<internal::TaskQueue> queue( 258 scoped_refptr<internal::TaskQueue> queue(
283 make_scoped_refptr(new internal::TaskQueue(this))); 259 make_scoped_refptr(new internal::TaskQueue(this)));
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
318 internal::TaskQueue* queue = Queue(queue_index); 294 internal::TaskQueue* queue = Queue(queue_index);
319 queue->SetAutoPump(auto_pump); 295 queue->SetAutoPump(auto_pump);
320 } 296 }
321 297
322 void TaskQueueManager::PumpQueue(size_t queue_index) { 298 void TaskQueueManager::PumpQueue(size_t queue_index) {
323 main_thread_checker_.CalledOnValidThread(); 299 main_thread_checker_.CalledOnValidThread();
324 internal::TaskQueue* queue = Queue(queue_index); 300 internal::TaskQueue* queue = Queue(queue_index);
325 queue->PumpQueue(); 301 queue->PumpQueue();
326 } 302 }
327 303
328 bool TaskQueueManager::UpdateWorkQueues( 304 bool TaskQueueManager::UpdateWorkQueues() {
329 base::TimeTicks* next_pending_delayed_task) {
330 // TODO(skyostil): This is not efficient when the number of queues grows very 305 // TODO(skyostil): This is not efficient when the number of queues grows very
331 // large due to the number of locks taken. Consider optimizing when we get 306 // large due to the number of locks taken. Consider optimizing when we get
332 // there. 307 // there.
333 main_thread_checker_.CalledOnValidThread(); 308 main_thread_checker_.CalledOnValidThread();
334 bool has_work = false; 309 bool has_work = false;
335 for (auto& queue : queues_) 310 for (auto& queue : queues_)
336 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task); 311 has_work |= queue->UpdateWorkQueue();
337 return has_work; 312 return has_work;
338 } 313 }
339 314
340 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { 315 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
341 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); 316 bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
342 if (on_main_thread) { 317 if (on_main_thread) {
343 // We only want one pending DoWork posted from the main thread, or we risk 318 // We only want one pending DoWork posted from the main thread, or we risk
344 // an explosion of pending DoWorks which could starve out everything else. 319 // an explosion of pending DoWorks which could starve out everything else.
345 if (pending_dowork_count_ > 0) { 320 if (pending_dowork_count_ > 0) {
346 return; 321 return;
347 } 322 }
348 pending_dowork_count_++; 323 pending_dowork_count_++;
349 } 324 }
350 325
351 main_task_runner_->PostTask( 326 main_task_runner_->PostTask(
352 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, 327 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_,
353 on_main_thread)); 328 on_main_thread));
354 } 329 }
355 330
356 void TaskQueueManager::DoWork(bool posted_from_main_thread) { 331 void TaskQueueManager::DoWork(bool posted_from_main_thread) {
357 if (posted_from_main_thread) { 332 if (posted_from_main_thread) {
358 pending_dowork_count_--; 333 pending_dowork_count_--;
359 DCHECK_GE(pending_dowork_count_, 0); 334 DCHECK_GE(pending_dowork_count_, 0);
360 } 335 }
361 main_thread_checker_.CalledOnValidThread(); 336 main_thread_checker_.CalledOnValidThread();
337 if (!UpdateWorkQueues())
338 return;
362 339
363 base::TimeTicks next_pending_delayed_task( 340 size_t queue_index;
364 base::TimeTicks::FromInternalValue(kMaxTimeTicks)); 341 if (!SelectWorkQueueToService(&queue_index))
365 for (int i = 0; i < work_batch_size_; i++) { 342 return;
366 if (!UpdateWorkQueues(&next_pending_delayed_task)) 343 MaybePostDoWorkOnMainRunner();
367 return; 344 ProcessTaskFromWorkQueue(queue_index);
368
369 // Interrupt the work batch if we should run the next delayed task.
370 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
371 Now() >= next_pending_delayed_task)
372 return;
373
374 size_t queue_index;
375 if (!SelectWorkQueueToService(&queue_index))
376 return;
377 // Note that this function won't post another call to DoWork if one is
378 // already pending, so it is safe to call it in a loop.
379 MaybePostDoWorkOnMainRunner();
380 ProcessTaskFromWorkQueue(queue_index);
381 }
382 } 345 }
383 346
384 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { 347 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
385 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); 348 bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
386 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( 349 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
387 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, 350 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
388 AsValueWithSelectorResult(should_run, *out_queue_index)); 351 AsValueWithSelectorResult(should_run, *out_queue_index));
389 return should_run; 352 return should_run;
390 } 353 }
391 354
(...skipping 28 matching lines...) Expand all
420 DCHECK(delay > base::TimeDelta()); 383 DCHECK(delay > base::TimeDelta());
421 return main_task_runner_->PostDelayedTask(from_here, task, delay); 384 return main_task_runner_->PostDelayedTask(from_here, task, delay);
422 } 385 }
423 386
424 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { 387 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
425 main_thread_checker_.CalledOnValidThread(); 388 main_thread_checker_.CalledOnValidThread();
426 internal::TaskQueue* queue = Queue(queue_index); 389 internal::TaskQueue* queue = Queue(queue_index);
427 queue->set_name(name); 390 queue->set_name(name);
428 } 391 }
429 392
430 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
431 main_thread_checker_.CalledOnValidThread();
432 DCHECK_GE(work_batch_size, 1);
433 work_batch_size_ = work_batch_size;
434 }
435
436 void TaskQueueManager::SetTimeSourceForTesting(
437 scoped_refptr<cc::TestNowSource> time_source) {
438 main_thread_checker_.CalledOnValidThread();
439 time_source_ = time_source;
440 }
441
442 base::TimeTicks TaskQueueManager::Now() const {
443 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
444 }
445
446 scoped_refptr<base::debug::ConvertableToTraceFormat> 393 scoped_refptr<base::debug::ConvertableToTraceFormat>
447 TaskQueueManager::AsValueWithSelectorResult(bool should_run, 394 TaskQueueManager::AsValueWithSelectorResult(bool should_run,
448 size_t selected_queue) const { 395 size_t selected_queue) const {
449 main_thread_checker_.CalledOnValidThread(); 396 main_thread_checker_.CalledOnValidThread();
450 scoped_refptr<base::debug::TracedValue> state = 397 scoped_refptr<base::debug::TracedValue> state =
451 new base::debug::TracedValue(); 398 new base::debug::TracedValue();
452 state->BeginArray("queues"); 399 state->BeginArray("queues");
453 for (auto& queue : queues_) 400 for (auto& queue : queues_)
454 queue->AsValueInto(state.get()); 401 queue->AsValueInto(state.get());
455 state->EndArray(); 402 state->EndArray();
456 state->BeginDictionary("selector"); 403 state->BeginDictionary("selector");
457 selector_->AsValueInto(state.get()); 404 selector_->AsValueInto(state.get());
458 state->EndDictionary(); 405 state->EndDictionary();
459 if (should_run) 406 if (should_run)
460 state->SetInteger("selected_queue", selected_queue); 407 state->SetInteger("selected_queue", selected_queue);
461 return state; 408 return state;
462 } 409 }
463 410
464 } // namespace content 411 } // namespace content
OLDNEW
« no previous file with comments | « content/renderer/scheduler/task_queue_manager.h ('k') | content/renderer/scheduler/task_queue_manager_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698