Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(28)

Side by Side Diff: content/renderer/scheduler/task_queue_manager.cc

Issue 867353002: Revert of Run task queue manager work in batches (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/scheduler/task_queue_manager.h" 5 #include "content/renderer/scheduler/task_queue_manager.h"
6 6
7 #include <queue>
8
9 #include "base/bind.h" 7 #include "base/bind.h"
10 #include "base/debug/trace_event.h" 8 #include "base/debug/trace_event.h"
11 #include "base/debug/trace_event_argument.h" 9 #include "base/debug/trace_event_argument.h"
12 #include "cc/test/test_now_source.h"
13 #include "content/renderer/scheduler/task_queue_selector.h" 10 #include "content/renderer/scheduler/task_queue_selector.h"
14 11
15 namespace {
16 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
17 }
18
19 namespace content { 12 namespace content {
20 namespace internal { 13 namespace internal {
21 14
22 class TaskQueue : public base::SingleThreadTaskRunner { 15 class TaskQueue : public base::SingleThreadTaskRunner {
23 public: 16 public:
24 TaskQueue(TaskQueueManager* task_queue_manager); 17 TaskQueue(TaskQueueManager* task_queue_manager);
25 18
26 // base::SingleThreadTaskRunner implementation. 19 // base::SingleThreadTaskRunner implementation.
27 bool RunsTasksOnCurrentThread() const override; 20 bool RunsTasksOnCurrentThread() const override;
28 bool PostDelayedTask(const tracked_objects::Location& from_here, 21 bool PostDelayedTask(const tracked_objects::Location& from_here,
(...skipping 11 matching lines...) Expand all
40 // Adds a task at the end of the incoming task queue and schedules a call to 33 // Adds a task at the end of the incoming task queue and schedules a call to
41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic 34 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
42 // pumping is enabled. Can be called on an arbitrary thread. 35 // pumping is enabled. Can be called on an arbitrary thread.
43 void EnqueueTask(const base::PendingTask& pending_task); 36 void EnqueueTask(const base::PendingTask& pending_task);
44 37
45 bool IsQueueEmpty() const; 38 bool IsQueueEmpty() const;
46 39
47 void SetAutoPump(bool auto_pump); 40 void SetAutoPump(bool auto_pump);
48 void PumpQueue(); 41 void PumpQueue();
49 42
50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task); 43 bool UpdateWorkQueue();
51 base::PendingTask TakeTaskFromWorkQueue(); 44 base::PendingTask TakeTaskFromWorkQueue();
52 45
53 void WillDeleteTaskQueueManager(); 46 void WillDeleteTaskQueueManager();
54 47
55 base::TaskQueue& work_queue() { return work_queue_; } 48 base::TaskQueue& work_queue() { return work_queue_; }
56 49
57 void set_name(const char* name) { name_ = name; } 50 void set_name(const char* name) { name_ = name; }
58 51
59 void AsValueInto(base::debug::TracedValue* state) const; 52 void AsValueInto(base::debug::TracedValue* state) const;
60 53
(...skipping 13 matching lines...) Expand all
74 base::debug::TracedValue* state); 67 base::debug::TracedValue* state);
75 static void TaskAsValueInto(const base::PendingTask& task, 68 static void TaskAsValueInto(const base::PendingTask& task,
76 base::debug::TracedValue* state); 69 base::debug::TracedValue* state);
77 70
78 // This lock protects all members except the work queue. 71 // This lock protects all members except the work queue.
79 mutable base::Lock lock_; 72 mutable base::Lock lock_;
80 TaskQueueManager* task_queue_manager_; 73 TaskQueueManager* task_queue_manager_;
81 base::TaskQueue incoming_queue_; 74 base::TaskQueue incoming_queue_;
82 bool auto_pump_; 75 bool auto_pump_;
83 const char* name_; 76 const char* name_;
84 std::priority_queue<base::TimeTicks,
85 std::vector<base::TimeTicks>,
86 std::greater<base::TimeTicks>> delayed_task_run_times_;
87 77
88 base::TaskQueue work_queue_; 78 base::TaskQueue work_queue_;
89 79
90 DISALLOW_COPY_AND_ASSIGN(TaskQueue); 80 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
91 }; 81 };
92 82
93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) 83 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
94 : task_queue_manager_(task_queue_manager), 84 : task_queue_manager_(task_queue_manager),
95 auto_pump_(true), 85 auto_pump_(true),
96 name_(nullptr) { 86 name_(nullptr) {
(...skipping 19 matching lines...) Expand all
116 base::TimeDelta delay, 106 base::TimeDelta delay,
117 bool nestable) { 107 bool nestable) {
118 base::AutoLock lock(lock_); 108 base::AutoLock lock(lock_);
119 if (!task_queue_manager_) 109 if (!task_queue_manager_)
120 return false; 110 return false;
121 111
122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable); 112 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable);
123 task_queue_manager_->DidQueueTask(&pending_task); 113 task_queue_manager_->DidQueueTask(&pending_task);
124 114
125 if (delay > base::TimeDelta()) { 115 if (delay > base::TimeDelta()) {
126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
127 delayed_task_run_times_.push(pending_task.delayed_run_time);
128 return task_queue_manager_->PostDelayedTask( 116 return task_queue_manager_->PostDelayedTask(
129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); 117 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
130 } 118 }
131 EnqueueTaskLocked(pending_task); 119 EnqueueTaskLocked(pending_task);
132 return true; 120 return true;
133 } 121 }
134 122
135 bool TaskQueue::IsQueueEmpty() const { 123 bool TaskQueue::IsQueueEmpty() const {
136 if (!work_queue_.empty()) 124 if (!work_queue_.empty())
137 return false; 125 return false;
138 126
139 { 127 {
140 base::AutoLock lock(lock_); 128 base::AutoLock lock(lock_);
141 return incoming_queue_.empty(); 129 return incoming_queue_.empty();
142 } 130 }
143 } 131 }
144 132
145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) { 133 bool TaskQueue::UpdateWorkQueue() {
146 if (!work_queue_.empty()) 134 if (!work_queue_.empty())
147 return true; 135 return true;
148 136
149 { 137 {
150 base::AutoLock lock(lock_); 138 base::AutoLock lock(lock_);
151 if (!delayed_task_run_times_.empty()) {
152 *next_pending_delayed_task =
153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
154 }
155 if (!auto_pump_ || incoming_queue_.empty()) 139 if (!auto_pump_ || incoming_queue_.empty())
156 return false; 140 return false;
157 work_queue_.Swap(&incoming_queue_); 141 work_queue_.Swap(&incoming_queue_);
158 TraceWorkQueueSize(); 142 TraceWorkQueueSize();
159 return true; 143 return true;
160 } 144 }
161 } 145 }
162 146
163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { 147 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
164 base::PendingTask pending_task = work_queue_.front(); 148 base::PendingTask pending_task = work_queue_.front();
(...skipping 14 matching lines...) Expand all
179 EnqueueTaskLocked(pending_task); 163 EnqueueTaskLocked(pending_task);
180 } 164 }
181 165
182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { 166 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
183 lock_.AssertAcquired(); 167 lock_.AssertAcquired();
184 if (!task_queue_manager_) 168 if (!task_queue_manager_)
185 return; 169 return;
186 if (auto_pump_ && incoming_queue_.empty()) 170 if (auto_pump_ && incoming_queue_.empty())
187 task_queue_manager_->MaybePostDoWorkOnMainRunner(); 171 task_queue_manager_->MaybePostDoWorkOnMainRunner();
188 incoming_queue_.push(pending_task); 172 incoming_queue_.push(pending_task);
189
190 if (!pending_task.delayed_run_time.is_null()) {
191 // Update the time of the next pending delayed task.
192 while (!delayed_task_run_times_.empty() &&
193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
194 delayed_task_run_times_.pop();
195 }
196 }
197 } 173 }
198 174
199 void TaskQueue::SetAutoPump(bool auto_pump) { 175 void TaskQueue::SetAutoPump(bool auto_pump) {
200 base::AutoLock lock(lock_); 176 base::AutoLock lock(lock_);
201 if (auto_pump) { 177 if (auto_pump) {
202 auto_pump_ = true; 178 auto_pump_ = true;
203 PumpQueueLocked(); 179 PumpQueueLocked();
204 } else { 180 } else {
205 auto_pump_ = false; 181 auto_pump_ = false;
206 } 182 }
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
262 238
263 } // namespace internal 239 } // namespace internal
264 240
265 TaskQueueManager::TaskQueueManager( 241 TaskQueueManager::TaskQueueManager(
266 size_t task_queue_count, 242 size_t task_queue_count,
267 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, 243 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
268 TaskQueueSelector* selector) 244 TaskQueueSelector* selector)
269 : main_task_runner_(main_task_runner), 245 : main_task_runner_(main_task_runner),
270 selector_(selector), 246 selector_(selector),
271 pending_dowork_count_(0), 247 pending_dowork_count_(0),
272 work_batch_size_(1),
273 time_source_(nullptr),
274 weak_factory_(this) { 248 weak_factory_(this) {
275 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); 249 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
276 TRACE_EVENT_OBJECT_CREATED_WITH_ID( 250 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
277 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", 251 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
278 this); 252 this);
279 253
280 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); 254 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
281 for (size_t i = 0; i < task_queue_count; i++) { 255 for (size_t i = 0; i < task_queue_count; i++) {
282 scoped_refptr<internal::TaskQueue> queue( 256 scoped_refptr<internal::TaskQueue> queue(
283 make_scoped_refptr(new internal::TaskQueue(this))); 257 make_scoped_refptr(new internal::TaskQueue(this)));
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
318 internal::TaskQueue* queue = Queue(queue_index); 292 internal::TaskQueue* queue = Queue(queue_index);
319 queue->SetAutoPump(auto_pump); 293 queue->SetAutoPump(auto_pump);
320 } 294 }
321 295
322 void TaskQueueManager::PumpQueue(size_t queue_index) { 296 void TaskQueueManager::PumpQueue(size_t queue_index) {
323 main_thread_checker_.CalledOnValidThread(); 297 main_thread_checker_.CalledOnValidThread();
324 internal::TaskQueue* queue = Queue(queue_index); 298 internal::TaskQueue* queue = Queue(queue_index);
325 queue->PumpQueue(); 299 queue->PumpQueue();
326 } 300 }
327 301
328 bool TaskQueueManager::UpdateWorkQueues( 302 bool TaskQueueManager::UpdateWorkQueues() {
329 base::TimeTicks* next_pending_delayed_task) {
330 // TODO(skyostil): This is not efficient when the number of queues grows very 303 // TODO(skyostil): This is not efficient when the number of queues grows very
331 // large due to the number of locks taken. Consider optimizing when we get 304 // large due to the number of locks taken. Consider optimizing when we get
332 // there. 305 // there.
333 main_thread_checker_.CalledOnValidThread(); 306 main_thread_checker_.CalledOnValidThread();
334 bool has_work = false; 307 bool has_work = false;
335 for (auto& queue : queues_) 308 for (auto& queue : queues_)
336 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task); 309 has_work |= queue->UpdateWorkQueue();
337 return has_work; 310 return has_work;
338 } 311 }
339 312
340 void TaskQueueManager::MaybePostDoWorkOnMainRunner() { 313 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
341 bool on_main_thread = main_task_runner_->BelongsToCurrentThread(); 314 bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
342 if (on_main_thread) { 315 if (on_main_thread) {
343 // We only want one pending DoWork posted from the main thread, or we risk 316 // We only want one pending DoWork posted from the main thread, or we risk
344 // an explosion of pending DoWorks which could starve out everything else. 317 // an explosion of pending DoWorks which could starve out everything else.
345 if (pending_dowork_count_ > 0) { 318 if (pending_dowork_count_ > 0) {
346 return; 319 return;
347 } 320 }
348 pending_dowork_count_++; 321 pending_dowork_count_++;
349 } 322 }
350 323
351 main_task_runner_->PostTask( 324 main_task_runner_->PostTask(
352 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_, 325 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_,
353 on_main_thread)); 326 on_main_thread));
354 } 327 }
355 328
356 void TaskQueueManager::DoWork(bool posted_from_main_thread) { 329 void TaskQueueManager::DoWork(bool posted_from_main_thread) {
357 if (posted_from_main_thread) { 330 if (posted_from_main_thread) {
358 pending_dowork_count_--; 331 pending_dowork_count_--;
359 DCHECK_GE(pending_dowork_count_, 0); 332 DCHECK_GE(pending_dowork_count_, 0);
360 } 333 }
361 main_thread_checker_.CalledOnValidThread(); 334 main_thread_checker_.CalledOnValidThread();
335 if (!UpdateWorkQueues())
336 return;
362 337
363 base::TimeTicks next_pending_delayed_task( 338 size_t queue_index;
364 base::TimeTicks::FromInternalValue(kMaxTimeTicks)); 339 if (!SelectWorkQueueToService(&queue_index))
365 for (int i = 0; i < work_batch_size_; i++) { 340 return;
366 if (!UpdateWorkQueues(&next_pending_delayed_task)) 341 MaybePostDoWorkOnMainRunner();
367 return; 342 ProcessTaskFromWorkQueue(queue_index);
368
369 // Interrupt the work batch if we should run the next delayed task.
370 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
371 Now() >= next_pending_delayed_task)
372 return;
373
374 size_t queue_index;
375 if (!SelectWorkQueueToService(&queue_index))
376 return;
377 // Note that this function won't post another call to DoWork if one is
378 // already pending, so it is safe to call it in a loop.
379 MaybePostDoWorkOnMainRunner();
380 ProcessTaskFromWorkQueue(queue_index);
381 }
382 } 343 }
383 344
384 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) { 345 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
385 bool should_run = selector_->SelectWorkQueueToService(out_queue_index); 346 bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
386 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID( 347 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
387 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this, 348 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
388 AsValueWithSelectorResult(should_run, *out_queue_index)); 349 AsValueWithSelectorResult(should_run, *out_queue_index));
389 return should_run; 350 return should_run;
390 } 351 }
391 352
(...skipping 28 matching lines...) Expand all
420 DCHECK(delay > base::TimeDelta()); 381 DCHECK(delay > base::TimeDelta());
421 return main_task_runner_->PostDelayedTask(from_here, task, delay); 382 return main_task_runner_->PostDelayedTask(from_here, task, delay);
422 } 383 }
423 384
424 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { 385 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
425 main_thread_checker_.CalledOnValidThread(); 386 main_thread_checker_.CalledOnValidThread();
426 internal::TaskQueue* queue = Queue(queue_index); 387 internal::TaskQueue* queue = Queue(queue_index);
427 queue->set_name(name); 388 queue->set_name(name);
428 } 389 }
429 390
430 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
431 main_thread_checker_.CalledOnValidThread();
432 DCHECK_GE(work_batch_size, 1);
433 work_batch_size_ = work_batch_size;
434 }
435
436 void TaskQueueManager::SetTimeSourceForTesting(
437 scoped_refptr<cc::TestNowSource> time_source) {
438 main_thread_checker_.CalledOnValidThread();
439 time_source_ = time_source;
440 }
441
442 base::TimeTicks TaskQueueManager::Now() const {
443 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
444 }
445
446 scoped_refptr<base::debug::ConvertableToTraceFormat> 391 scoped_refptr<base::debug::ConvertableToTraceFormat>
447 TaskQueueManager::AsValueWithSelectorResult(bool should_run, 392 TaskQueueManager::AsValueWithSelectorResult(bool should_run,
448 size_t selected_queue) const { 393 size_t selected_queue) const {
449 main_thread_checker_.CalledOnValidThread(); 394 main_thread_checker_.CalledOnValidThread();
450 scoped_refptr<base::debug::TracedValue> state = 395 scoped_refptr<base::debug::TracedValue> state =
451 new base::debug::TracedValue(); 396 new base::debug::TracedValue();
452 state->BeginArray("queues"); 397 state->BeginArray("queues");
453 for (auto& queue : queues_) 398 for (auto& queue : queues_)
454 queue->AsValueInto(state.get()); 399 queue->AsValueInto(state.get());
455 state->EndArray(); 400 state->EndArray();
456 state->BeginDictionary("selector"); 401 state->BeginDictionary("selector");
457 selector_->AsValueInto(state.get()); 402 selector_->AsValueInto(state.get());
458 state->EndDictionary(); 403 state->EndDictionary();
459 if (should_run) 404 if (should_run)
460 state->SetInteger("selected_queue", selected_queue); 405 state->SetInteger("selected_queue", selected_queue);
461 return state; 406 return state;
462 } 407 }
463 408
464 } // namespace content 409 } // namespace content
OLDNEW
« no previous file with comments | « content/renderer/scheduler/task_queue_manager.h ('k') | content/renderer/scheduler/task_queue_manager_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698