Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(651)

Side by Side Diff: third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc

Issue 2546423002: [Try # 3] Scheduler refactoring to virtually eliminate redundant DoWorks (Closed)
Patch Set: Add an extra dcheck Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "platform/scheduler/base/task_queue_impl.h" 5 #include "platform/scheduler/base/task_queue_impl.h"
6 6
7 #include "base/format_macros.h" 7 #include "base/format_macros.h"
8 #include "base/memory/ptr_util.h" 8 #include "base/memory/ptr_util.h"
9 #include "base/strings/stringprintf.h" 9 #include "base/strings/stringprintf.h"
10 #include "base/trace_event/blame_context.h" 10 #include "base/trace_event/blame_context.h"
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 : task_queue_manager(task_queue_manager), time_domain(time_domain) {} 147 : task_queue_manager(task_queue_manager), time_domain(time_domain) {}
148 148
149 TaskQueueImpl::AnyThread::~AnyThread() {} 149 TaskQueueImpl::AnyThread::~AnyThread() {}
150 150
151 TaskQueueImpl::MainThreadOnly::MainThreadOnly( 151 TaskQueueImpl::MainThreadOnly::MainThreadOnly(
152 TaskQueueManager* task_queue_manager, 152 TaskQueueManager* task_queue_manager,
153 TaskQueueImpl* task_queue, 153 TaskQueueImpl* task_queue,
154 TimeDomain* time_domain) 154 TimeDomain* time_domain)
155 : task_queue_manager(task_queue_manager), 155 : task_queue_manager(task_queue_manager),
156 time_domain(time_domain), 156 time_domain(time_domain),
157 delayed_work_queue(new WorkQueue(task_queue, "delayed")), 157 delayed_work_queue(
158 immediate_work_queue(new WorkQueue(task_queue, "immediate")), 158 new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::DELAYED)),
159 immediate_work_queue(new WorkQueue(task_queue,
160 "immediate",
161 WorkQueue::QueueType::IMMEDIATE)),
159 set_index(0), 162 set_index(0),
160 is_enabled_refcount(0), 163 is_enabled_refcount(0),
161 voter_refcount(0), 164 voter_refcount(0),
162 blame_context(nullptr), 165 blame_context(nullptr),
163 current_fence(0) {} 166 current_fence(0) {}
164 167
165 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() {} 168 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() {}
166 169
167 void TaskQueueImpl::UnregisterTaskQueue() { 170 void TaskQueueImpl::UnregisterTaskQueue() {
168 base::AutoLock lock(any_thread_lock_); 171 base::AutoLock lock(any_thread_lock_);
169 if (main_thread_only().time_domain) 172 if (main_thread_only().time_domain)
170 main_thread_only().time_domain->UnregisterQueue(this); 173 main_thread_only().time_domain->UnregisterQueue(this);
171 if (!any_thread().task_queue_manager) 174 if (!any_thread().task_queue_manager)
172 return; 175 return;
173 any_thread().time_domain = nullptr; 176 any_thread().time_domain = nullptr;
174 main_thread_only().time_domain = nullptr; 177 main_thread_only().time_domain = nullptr;
175 any_thread().task_queue_manager->UnregisterTaskQueue(this); 178 any_thread().task_queue_manager->UnregisterTaskQueue(this);
176 179
177 any_thread().task_queue_manager = nullptr; 180 any_thread().task_queue_manager = nullptr;
178 main_thread_only().task_queue_manager = nullptr; 181 main_thread_only().task_queue_manager = nullptr;
179 main_thread_only().delayed_incoming_queue = std::priority_queue<Task>(); 182 main_thread_only().delayed_incoming_queue = std::priority_queue<Task>();
180 any_thread().immediate_incoming_queue.clear(); 183 any_thread().immediate_incoming_queue.clear();
181 main_thread_only().immediate_work_queue.reset(); 184 main_thread_only().immediate_work_queue.reset();
182 main_thread_only().delayed_work_queue.reset(); 185 main_thread_only().delayed_work_queue.reset();
183 } 186 }
184 187
185 bool TaskQueueImpl::RunsTasksOnCurrentThread() const { 188 bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
186 base::AutoLock lock(any_thread_lock_);
187 return base::PlatformThread::CurrentId() == thread_id_; 189 return base::PlatformThread::CurrentId() == thread_id_;
188 } 190 }
189 191
190 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here, 192 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here,
191 const base::Closure& task, 193 const base::Closure& task,
192 base::TimeDelta delay) { 194 base::TimeDelta delay) {
193 if (delay.is_zero()) 195 if (delay.is_zero())
194 return PostImmediateTaskImpl(from_here, task, TaskType::NORMAL); 196 return PostImmediateTaskImpl(from_here, task, TaskType::NORMAL);
195 197
196 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL); 198 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
318 } 320 }
319 TraceQueueSize(false); 321 TraceQueueSize(false);
320 } 322 }
321 323
322 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked( 324 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(
323 const tracked_objects::Location& posted_from, 325 const tracked_objects::Location& posted_from,
324 const base::Closure& task, 326 const base::Closure& task,
325 base::TimeTicks desired_run_time, 327 base::TimeTicks desired_run_time,
326 EnqueueOrder sequence_number, 328 EnqueueOrder sequence_number,
327 bool nestable) { 329 bool nestable) {
328 if (any_thread().immediate_incoming_queue.empty())
329 any_thread().time_domain->RegisterAsUpdatableTaskQueue(this);
330 // If the |immediate_incoming_queue| is empty we need a DoWork posted to make 330 // If the |immediate_incoming_queue| is empty we need a DoWork posted to make
331 // it run. 331 // it run.
332 if (any_thread().immediate_incoming_queue.empty()) { 332 if (any_thread().immediate_incoming_queue.empty()) {
333 // There's no point posting a DoWork for a disabled queue, however we can 333 // There's no point posting a DoWork for a blocked or disabled queue,
334 // only tell if it's disabled from the main thread. 334 // although we can only determine that on the main thread.
335 if (base::PlatformThread::CurrentId() == thread_id_) { 335 bool ensure_do_work_posted =
336 if (IsQueueEnabled() && !BlockedByFenceLocked()) 336 !RunsTasksOnCurrentThread() ||
337 any_thread().task_queue_manager->MaybeScheduleImmediateWork(FROM_HERE); 337 (IsQueueEnabled() && !main_thread_only().current_fence);
338 } else { 338 any_thread().task_queue_manager->OnQueueHasIncomingImmediateWork(
339 any_thread().task_queue_manager->MaybeScheduleImmediateWork(FROM_HERE); 339 this, ensure_do_work_posted);
340 } 340 any_thread().time_domain->OnQueueHasImmediateWork(this);
341 } 341 }
342 any_thread().immediate_incoming_queue.emplace_back( 342 any_thread().immediate_incoming_queue.emplace_back(
343 posted_from, task, desired_run_time, sequence_number, nestable, 343 posted_from, task, desired_run_time, sequence_number, nestable,
344 sequence_number); 344 sequence_number);
345 any_thread().task_queue_manager->DidQueueTask( 345 any_thread().task_queue_manager->DidQueueTask(
346 any_thread().immediate_incoming_queue.back()); 346 any_thread().immediate_incoming_queue.back());
347 TraceQueueSize(true); 347 TraceQueueSize(true);
348 } 348 }
349 349
350 void TaskQueueImpl::ReloadImmediateWorkQueueIfEmpty() {
351 if (!main_thread_only().immediate_work_queue->Empty())
352 return;
353
354 main_thread_only().immediate_work_queue->ReloadEmptyImmediateQueue();
355 }
356
357 WTF::Deque<TaskQueueImpl::Task> TaskQueueImpl::TakeImmediateIncomingQueue() {
358 base::AutoLock lock(any_thread_lock_);
359 WTF::Deque<TaskQueueImpl::Task> queue;
360 queue.swap(any_thread().immediate_incoming_queue);
361 return queue;
362 }
363
350 bool TaskQueueImpl::IsEmpty() const { 364 bool TaskQueueImpl::IsEmpty() const {
351 if (!main_thread_only().delayed_work_queue->Empty() || 365 if (!main_thread_only().delayed_work_queue->Empty() ||
352 !main_thread_only().delayed_incoming_queue.empty() || 366 !main_thread_only().delayed_incoming_queue.empty() ||
353 !main_thread_only().immediate_work_queue->Empty()) { 367 !main_thread_only().immediate_work_queue->Empty()) {
354 return false; 368 return false;
355 } 369 }
356 370
357 base::AutoLock lock(any_thread_lock_); 371 base::AutoLock lock(any_thread_lock_);
358 return any_thread().immediate_incoming_queue.empty(); 372 return any_thread().immediate_incoming_queue.empty();
359 } 373 }
(...skipping 29 matching lines...) Expand all
389 return !any_thread().immediate_incoming_queue.empty(); 403 return !any_thread().immediate_incoming_queue.empty();
390 } 404 }
391 405
392 base::Optional<base::TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() { 406 base::Optional<base::TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() {
393 if (main_thread_only().delayed_incoming_queue.empty()) 407 if (main_thread_only().delayed_incoming_queue.empty())
394 return base::nullopt; 408 return base::nullopt;
395 409
396 return main_thread_only().delayed_incoming_queue.top().delayed_run_time; 410 return main_thread_only().delayed_incoming_queue.top().delayed_run_time;
397 } 411 }
398 412
399 void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) { 413 base::Optional<base::TimeTicks> TaskQueueImpl::WakeUpForDelayedWork(
414 LazyNow* lazy_now) {
400 // Enqueue all delayed tasks that should be running now, skipping any that 415 // Enqueue all delayed tasks that should be running now, skipping any that
401 // have been canceled. 416 // have been canceled.
402 while (!main_thread_only().delayed_incoming_queue.empty()) { 417 while (!main_thread_only().delayed_incoming_queue.empty()) {
403 Task& task = 418 Task& task =
404 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()); 419 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top());
405 if (task.task.IsCancelled()) { 420 if (task.task.IsCancelled()) {
406 main_thread_only().delayed_incoming_queue.pop(); 421 main_thread_only().delayed_incoming_queue.pop();
407 continue; 422 continue;
408 } 423 }
409 if (task.delayed_run_time > lazy_now->Now()) 424 if (task.delayed_run_time > lazy_now->Now())
410 break; 425 break;
411 task.set_enqueue_order( 426 task.set_enqueue_order(
412 main_thread_only().task_queue_manager->GetNextSequenceNumber()); 427 main_thread_only().task_queue_manager->GetNextSequenceNumber());
413 main_thread_only().delayed_work_queue->Push(std::move(task)); 428 main_thread_only().delayed_work_queue->Push(std::move(task));
414 main_thread_only().delayed_incoming_queue.pop(); 429 main_thread_only().delayed_incoming_queue.pop();
415 } 430 }
416 431
417 // Make sure the next wake up is scheduled. 432 // Make sure the next wake up is scheduled.
418 if (!main_thread_only().delayed_incoming_queue.empty()) { 433 if (!main_thread_only().delayed_incoming_queue.empty())
419 main_thread_only().time_domain->ScheduleDelayedWork( 434 return main_thread_only().delayed_incoming_queue.top().delayed_run_time;
420 this, main_thread_only().delayed_incoming_queue.top().delayed_run_time,
421 lazy_now->Now());
422 }
423 }
424 435
425 bool TaskQueueImpl::MaybeUpdateImmediateWorkQueues() { 436 return base::Optional<base::TimeTicks>();
426 if (!main_thread_only().task_queue_manager)
427 return false;
428
429 if (!main_thread_only().immediate_work_queue->Empty())
430 return true;
431
432 base::AutoLock lock(any_thread_lock_);
433 main_thread_only().immediate_work_queue->SwapLocked(
434 any_thread().immediate_incoming_queue);
435 // |immediate_work_queue| is now empty so updates are no longer required.
436 return false;
437 } 437 }
438 438
439 void TaskQueueImpl::TraceQueueSize(bool is_locked) const { 439 void TaskQueueImpl::TraceQueueSize(bool is_locked) const {
440 bool is_tracing; 440 bool is_tracing;
441 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_, 441 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_,
442 &is_tracing); 442 &is_tracing);
443 if (!is_tracing) 443 if (!is_tracing)
444 return; 444 return;
445 445
446 // It's only safe to access the work queues from the main thread. 446 // It's only safe to access the work queues from the main thread.
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
569 // another way of asserting that UnregisterTaskQueue has not been called. 569 // another way of asserting that UnregisterTaskQueue has not been called.
570 DCHECK(any_thread().time_domain); 570 DCHECK(any_thread().time_domain);
571 if (!any_thread().time_domain) 571 if (!any_thread().time_domain)
572 return; 572 return;
573 DCHECK(main_thread_checker_.CalledOnValidThread()); 573 DCHECK(main_thread_checker_.CalledOnValidThread());
574 if (time_domain == main_thread_only().time_domain) 574 if (time_domain == main_thread_only().time_domain)
575 return; 575 return;
576 576
577 any_thread().time_domain = time_domain; 577 any_thread().time_domain = time_domain;
578 } 578 }
579 // We rely here on TimeDomain::MigrateQueue being thread-safe to use with 579
580 // TimeDomain::Register/UnregisterAsUpdatableTaskQueue. 580 base::TimeTicks wake_up_time =
581 main_thread_only().time_domain->MigrateQueue(this, time_domain); 581 main_thread_only().time_domain->MigrateQueue(this, time_domain);
582 main_thread_only().time_domain = time_domain; 582 main_thread_only().time_domain = time_domain;
583
584 if (!wake_up_time.is_null())
585 time_domain->ScheduleDelayedWork(this, wake_up_time, time_domain->Now());
583 } 586 }
584 587
585 TimeDomain* TaskQueueImpl::GetTimeDomain() const { 588 TimeDomain* TaskQueueImpl::GetTimeDomain() const {
586 if (base::PlatformThread::CurrentId() == thread_id_) 589 if (base::PlatformThread::CurrentId() == thread_id_)
587 return main_thread_only().time_domain; 590 return main_thread_only().time_domain;
588 591
589 base::AutoLock lock(any_thread_lock_); 592 base::AutoLock lock(any_thread_lock_);
590 return any_thread().time_domain; 593 return any_thread().time_domain;
591 } 594 }
592 595
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
665 } 668 }
666 669
667 base::AutoLock lock(any_thread_lock_); 670 base::AutoLock lock(any_thread_lock_);
668 if (any_thread().immediate_incoming_queue.empty()) 671 if (any_thread().immediate_incoming_queue.empty())
669 return true; 672 return true;
670 673
671 return any_thread().immediate_incoming_queue.front().enqueue_order() > 674 return any_thread().immediate_incoming_queue.front().enqueue_order() >
672 main_thread_only().current_fence; 675 main_thread_only().current_fence;
673 } 676 }
674 677
675 bool TaskQueueImpl::BlockedByFenceLocked() const { 678 bool TaskQueueImpl::ImmediateTaskCouldRun() const {
676 if (!main_thread_only().current_fence) 679 if (!IsQueueEnabled())
677 return false; 680 return false;
678 681
679 if (!main_thread_only().immediate_work_queue->BlockedByFence() || 682 if (!main_thread_only().current_fence)
680 !main_thread_only().delayed_work_queue->BlockedByFence()) {
681 return false;
682 }
683
684 if (any_thread().immediate_incoming_queue.empty())
685 return true; 683 return true;
686 684
687 return any_thread().immediate_incoming_queue.front().enqueue_order() > 685 base::AutoLock lock(any_thread_lock_);
686 // If |immediate_incoming_queue| is empty then any task posted is guaranteed
687 // to be blocked by the fence.
688 if (any_thread().immediate_incoming_queue.empty())
689 return false;
690
691 return any_thread().immediate_incoming_queue.front().enqueue_order() <
688 main_thread_only().current_fence; 692 main_thread_only().current_fence;
689 } 693 }
690 694
691 EnqueueOrder TaskQueueImpl::GetFenceForTest() const { 695 EnqueueOrder TaskQueueImpl::GetFenceForTest() const {
692 return main_thread_only().current_fence; 696 return main_thread_only().current_fence;
693 } 697 }
694 698
695 // static 699 // static
696 void TaskQueueImpl::QueueAsValueInto(const WTF::Deque<Task>& queue, 700 void TaskQueueImpl::QueueAsValueInto(const WTF::Deque<Task>& queue,
697 base::trace_event::TracedValue* state) { 701 base::trace_event::TracedValue* state) {
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
792 bool is_enabled = IsQueueEnabled(); 796 bool is_enabled = IsQueueEnabled();
793 if (was_enabled != is_enabled) 797 if (was_enabled != is_enabled)
794 EnableOrDisableWithSelector(is_enabled); 798 EnableOrDisableWithSelector(is_enabled);
795 } 799 }
796 800
797 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) { 801 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
798 if (!main_thread_only().task_queue_manager) 802 if (!main_thread_only().task_queue_manager)
799 return; 803 return;
800 804
801 if (enable) { 805 if (enable) {
802 // Note it's the job of the selector to tell the TaskQueueManager if 806 // Note the selector calls TaskQueueManager::OnTaskQueueEnabled which posts
803 // a DoWork needs posting. 807 // a DoWork if needed.
804 main_thread_only().task_queue_manager->selector_.EnableQueue(this); 808 main_thread_only().task_queue_manager->selector_.EnableQueue(this);
805 } else { 809 } else {
806 main_thread_only().task_queue_manager->selector_.DisableQueue(this); 810 main_thread_only().task_queue_manager->selector_.DisableQueue(this);
807 } 811 }
808 } 812 }
809 813
810 std::unique_ptr<TaskQueueImpl::QueueEnabledVoter> 814 std::unique_ptr<TaskQueueImpl::QueueEnabledVoter>
811 TaskQueueImpl::CreateQueueEnabledVoter() { 815 TaskQueueImpl::CreateQueueEnabledVoter() {
812 main_thread_only().voter_refcount++; 816 main_thread_only().voter_refcount++;
813 main_thread_only().is_enabled_refcount++; 817 main_thread_only().is_enabled_refcount++;
814 return base::MakeUnique<QueueEnabledVoterImpl>(this); 818 return base::MakeUnique<QueueEnabledVoterImpl>(this);
815 } 819 }
816 820
817 void TaskQueueImpl::SweepCanceledDelayedTasks(base::TimeTicks now) { 821 void TaskQueueImpl::SweepCanceledDelayedTasks(base::TimeTicks now) {
818 if (main_thread_only().delayed_incoming_queue.empty()) 822 if (main_thread_only().delayed_incoming_queue.empty())
819 return; 823 return;
820 824
821 base::TimeTicks first_task_runtime = 825 base::TimeTicks first_task_runtime =
822 main_thread_only().delayed_incoming_queue.top().delayed_run_time; 826 main_thread_only().delayed_incoming_queue.top().delayed_run_time;
823 827
824 // TODO(alexclarke): Let this remove all tasks once the DoWork refactor has 828 // Remove canceled tasks.
825 // landed.
826 std::priority_queue<Task> remaining_tasks; 829 std::priority_queue<Task> remaining_tasks;
827 while (!main_thread_only().delayed_incoming_queue.empty()) { 830 while (!main_thread_only().delayed_incoming_queue.empty()) {
828 if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled() || 831 if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled()) {
829 main_thread_only().delayed_incoming_queue.top().delayed_run_time ==
830 first_task_runtime) {
831 remaining_tasks.push(std::move( 832 remaining_tasks.push(std::move(
832 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()))); 833 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top())));
833 } 834 }
834 main_thread_only().delayed_incoming_queue.pop(); 835 main_thread_only().delayed_incoming_queue.pop();
835 } 836 }
837
836 main_thread_only().delayed_incoming_queue = std::move(remaining_tasks); 838 main_thread_only().delayed_incoming_queue = std::move(remaining_tasks);
839
840 // Re-schedule delayed call to WakeUpForDelayedWork if needed.
841 if (main_thread_only().delayed_incoming_queue.empty()) {
842 main_thread_only().time_domain->CancelDelayedWork(this);
843 } else if (first_task_runtime !=
844 main_thread_only().delayed_incoming_queue.top().delayed_run_time) {
845 main_thread_only().time_domain->ScheduleDelayedWork(
846 this, main_thread_only().delayed_incoming_queue.top().delayed_run_time,
847 main_thread_only().time_domain->Now());
848 }
849 }
850
851 void TaskQueueImpl::PushImmediateIncomingTaskForTest(
852 TaskQueueImpl::Task&& task) {
853 base::AutoLock lock(any_thread_lock_);
854 any_thread().immediate_incoming_queue.push_back(std::move(task));
837 } 855 }
838 856
839 } // namespace internal 857 } // namespace internal
840 } // namespace scheduler 858 } // namespace scheduler
841 } // namespace blink 859 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698