OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "platform/scheduler/base/task_queue_impl.h" | 5 #include "platform/scheduler/base/task_queue_impl.h" |
6 | 6 |
7 #include "base/format_macros.h" | 7 #include "base/format_macros.h" |
8 #include "base/memory/ptr_util.h" | 8 #include "base/memory/ptr_util.h" |
9 #include "base/strings/stringprintf.h" | 9 #include "base/strings/stringprintf.h" |
10 #include "base/trace_event/blame_context.h" | 10 #include "base/trace_event/blame_context.h" |
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
147 : task_queue_manager(task_queue_manager), time_domain(time_domain) {} | 147 : task_queue_manager(task_queue_manager), time_domain(time_domain) {} |
148 | 148 |
149 TaskQueueImpl::AnyThread::~AnyThread() {} | 149 TaskQueueImpl::AnyThread::~AnyThread() {} |
150 | 150 |
151 TaskQueueImpl::MainThreadOnly::MainThreadOnly( | 151 TaskQueueImpl::MainThreadOnly::MainThreadOnly( |
152 TaskQueueManager* task_queue_manager, | 152 TaskQueueManager* task_queue_manager, |
153 TaskQueueImpl* task_queue, | 153 TaskQueueImpl* task_queue, |
154 TimeDomain* time_domain) | 154 TimeDomain* time_domain) |
155 : task_queue_manager(task_queue_manager), | 155 : task_queue_manager(task_queue_manager), |
156 time_domain(time_domain), | 156 time_domain(time_domain), |
157 delayed_work_queue(new WorkQueue(task_queue, "delayed")), | 157 delayed_work_queue( |
158 immediate_work_queue(new WorkQueue(task_queue, "immediate")), | 158 new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::DELAYED)), |
| 159 immediate_work_queue(new WorkQueue(task_queue, |
| 160 "immediate", |
| 161 WorkQueue::QueueType::IMMEDIATE)), |
159 set_index(0), | 162 set_index(0), |
160 is_enabled_refcount(0), | 163 is_enabled_refcount(0), |
161 voter_refcount(0), | 164 voter_refcount(0), |
162 blame_context(nullptr), | 165 blame_context(nullptr), |
163 current_fence(0) {} | 166 current_fence(0) {} |
164 | 167 |
165 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() {} | 168 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() {} |
166 | 169 |
167 void TaskQueueImpl::UnregisterTaskQueue() { | 170 void TaskQueueImpl::UnregisterTaskQueue() { |
168 base::AutoLock lock(any_thread_lock_); | 171 base::AutoLock lock(any_thread_lock_); |
169 if (main_thread_only().time_domain) | 172 if (main_thread_only().time_domain) |
170 main_thread_only().time_domain->UnregisterQueue(this); | 173 main_thread_only().time_domain->UnregisterQueue(this); |
171 if (!any_thread().task_queue_manager) | 174 if (!any_thread().task_queue_manager) |
172 return; | 175 return; |
173 any_thread().time_domain = nullptr; | 176 any_thread().time_domain = nullptr; |
174 main_thread_only().time_domain = nullptr; | 177 main_thread_only().time_domain = nullptr; |
175 any_thread().task_queue_manager->UnregisterTaskQueue(this); | 178 any_thread().task_queue_manager->UnregisterTaskQueue(this); |
176 | 179 |
177 any_thread().task_queue_manager = nullptr; | 180 any_thread().task_queue_manager = nullptr; |
178 main_thread_only().task_queue_manager = nullptr; | 181 main_thread_only().task_queue_manager = nullptr; |
179 main_thread_only().delayed_incoming_queue = std::priority_queue<Task>(); | 182 main_thread_only().delayed_incoming_queue = std::priority_queue<Task>(); |
180 any_thread().immediate_incoming_queue.clear(); | 183 any_thread().immediate_incoming_queue.clear(); |
181 main_thread_only().immediate_work_queue.reset(); | 184 main_thread_only().immediate_work_queue.reset(); |
182 main_thread_only().delayed_work_queue.reset(); | 185 main_thread_only().delayed_work_queue.reset(); |
183 } | 186 } |
184 | 187 |
185 bool TaskQueueImpl::RunsTasksOnCurrentThread() const { | 188 bool TaskQueueImpl::RunsTasksOnCurrentThread() const { |
186 base::AutoLock lock(any_thread_lock_); | |
187 return base::PlatformThread::CurrentId() == thread_id_; | 189 return base::PlatformThread::CurrentId() == thread_id_; |
188 } | 190 } |
189 | 191 |
190 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here, | 192 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here, |
191 const base::Closure& task, | 193 const base::Closure& task, |
192 base::TimeDelta delay) { | 194 base::TimeDelta delay) { |
193 if (delay.is_zero()) | 195 if (delay.is_zero()) |
194 return PostImmediateTaskImpl(from_here, task, TaskType::NORMAL); | 196 return PostImmediateTaskImpl(from_here, task, TaskType::NORMAL); |
195 | 197 |
196 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL); | 198 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL); |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
318 } | 320 } |
319 TraceQueueSize(false); | 321 TraceQueueSize(false); |
320 } | 322 } |
321 | 323 |
322 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked( | 324 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked( |
323 const tracked_objects::Location& posted_from, | 325 const tracked_objects::Location& posted_from, |
324 const base::Closure& task, | 326 const base::Closure& task, |
325 base::TimeTicks desired_run_time, | 327 base::TimeTicks desired_run_time, |
326 EnqueueOrder sequence_number, | 328 EnqueueOrder sequence_number, |
327 bool nestable) { | 329 bool nestable) { |
328 if (any_thread().immediate_incoming_queue.empty()) | |
329 any_thread().time_domain->RegisterAsUpdatableTaskQueue(this); | |
330 // If the |immediate_incoming_queue| is empty we need a DoWork posted to make | 330 // If the |immediate_incoming_queue| is empty we need a DoWork posted to make |
331 // it run. | 331 // it run. |
332 if (any_thread().immediate_incoming_queue.empty()) { | 332 if (any_thread().immediate_incoming_queue.empty()) { |
333 // There's no point posting a DoWork for a disabled queue, however we can | 333 // There's no point posting a DoWork for a blocked or disabled queue, |
334 // only tell if it's disabled from the main thread. | 334 // although we can only determine that on the main thread. |
335 if (base::PlatformThread::CurrentId() == thread_id_) { | 335 bool ensure_do_work_posted = |
336 if (IsQueueEnabled() && !BlockedByFenceLocked()) | 336 !RunsTasksOnCurrentThread() || |
337 any_thread().task_queue_manager->MaybeScheduleImmediateWork(FROM_HERE); | 337 (IsQueueEnabled() && !main_thread_only().current_fence); |
338 } else { | 338 any_thread().task_queue_manager->OnQueueHasIncomingImmediateWork( |
339 any_thread().task_queue_manager->MaybeScheduleImmediateWork(FROM_HERE); | 339 this, ensure_do_work_posted); |
340 } | 340 any_thread().time_domain->OnQueueHasImmediateWork(this); |
341 } | 341 } |
342 any_thread().immediate_incoming_queue.emplace_back( | 342 any_thread().immediate_incoming_queue.emplace_back( |
343 posted_from, task, desired_run_time, sequence_number, nestable, | 343 posted_from, task, desired_run_time, sequence_number, nestable, |
344 sequence_number); | 344 sequence_number); |
345 any_thread().task_queue_manager->DidQueueTask( | 345 any_thread().task_queue_manager->DidQueueTask( |
346 any_thread().immediate_incoming_queue.back()); | 346 any_thread().immediate_incoming_queue.back()); |
347 TraceQueueSize(true); | 347 TraceQueueSize(true); |
348 } | 348 } |
349 | 349 |
| 350 void TaskQueueImpl::ReloadImmediateWorkQueueIfEmpty() { |
| 351 if (!main_thread_only().immediate_work_queue->Empty()) |
| 352 return; |
| 353 |
| 354 main_thread_only().immediate_work_queue->ReloadEmptyImmediateQueue(); |
| 355 } |
| 356 |
| 357 WTF::Deque<TaskQueueImpl::Task> TaskQueueImpl::TakeImmediateIncomingQueue() { |
| 358 base::AutoLock lock(any_thread_lock_); |
| 359 WTF::Deque<TaskQueueImpl::Task> queue; |
| 360 queue.swap(any_thread().immediate_incoming_queue); |
| 361 return queue; |
| 362 } |
| 363 |
350 bool TaskQueueImpl::IsEmpty() const { | 364 bool TaskQueueImpl::IsEmpty() const { |
351 if (!main_thread_only().delayed_work_queue->Empty() || | 365 if (!main_thread_only().delayed_work_queue->Empty() || |
352 !main_thread_only().delayed_incoming_queue.empty() || | 366 !main_thread_only().delayed_incoming_queue.empty() || |
353 !main_thread_only().immediate_work_queue->Empty()) { | 367 !main_thread_only().immediate_work_queue->Empty()) { |
354 return false; | 368 return false; |
355 } | 369 } |
356 | 370 |
357 base::AutoLock lock(any_thread_lock_); | 371 base::AutoLock lock(any_thread_lock_); |
358 return any_thread().immediate_incoming_queue.empty(); | 372 return any_thread().immediate_incoming_queue.empty(); |
359 } | 373 } |
(...skipping 29 matching lines...) Expand all Loading... |
389 return !any_thread().immediate_incoming_queue.empty(); | 403 return !any_thread().immediate_incoming_queue.empty(); |
390 } | 404 } |
391 | 405 |
392 base::Optional<base::TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() { | 406 base::Optional<base::TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() { |
393 if (main_thread_only().delayed_incoming_queue.empty()) | 407 if (main_thread_only().delayed_incoming_queue.empty()) |
394 return base::nullopt; | 408 return base::nullopt; |
395 | 409 |
396 return main_thread_only().delayed_incoming_queue.top().delayed_run_time; | 410 return main_thread_only().delayed_incoming_queue.top().delayed_run_time; |
397 } | 411 } |
398 | 412 |
399 void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) { | 413 base::Optional<base::TimeTicks> TaskQueueImpl::WakeUpForDelayedWork( |
| 414 LazyNow* lazy_now) { |
400 // Enqueue all delayed tasks that should be running now, skipping any that | 415 // Enqueue all delayed tasks that should be running now, skipping any that |
401 // have been canceled. | 416 // have been canceled. |
402 while (!main_thread_only().delayed_incoming_queue.empty()) { | 417 while (!main_thread_only().delayed_incoming_queue.empty()) { |
403 Task& task = | 418 Task& task = |
404 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()); | 419 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()); |
405 if (task.task.IsCancelled()) { | 420 if (task.task.IsCancelled()) { |
406 main_thread_only().delayed_incoming_queue.pop(); | 421 main_thread_only().delayed_incoming_queue.pop(); |
407 continue; | 422 continue; |
408 } | 423 } |
409 if (task.delayed_run_time > lazy_now->Now()) | 424 if (task.delayed_run_time > lazy_now->Now()) |
410 break; | 425 break; |
411 task.set_enqueue_order( | 426 task.set_enqueue_order( |
412 main_thread_only().task_queue_manager->GetNextSequenceNumber()); | 427 main_thread_only().task_queue_manager->GetNextSequenceNumber()); |
413 main_thread_only().delayed_work_queue->Push(std::move(task)); | 428 main_thread_only().delayed_work_queue->Push(std::move(task)); |
414 main_thread_only().delayed_incoming_queue.pop(); | 429 main_thread_only().delayed_incoming_queue.pop(); |
415 } | 430 } |
416 | 431 |
417 // Make sure the next wake up is scheduled. | 432 // Make sure the next wake up is scheduled. |
418 if (!main_thread_only().delayed_incoming_queue.empty()) { | 433 if (!main_thread_only().delayed_incoming_queue.empty()) |
419 main_thread_only().time_domain->ScheduleDelayedWork( | 434 return main_thread_only().delayed_incoming_queue.top().delayed_run_time; |
420 this, main_thread_only().delayed_incoming_queue.top().delayed_run_time, | |
421 lazy_now->Now()); | |
422 } | |
423 } | |
424 | 435 |
425 bool TaskQueueImpl::MaybeUpdateImmediateWorkQueues() { | 436 return base::Optional<base::TimeTicks>(); |
426 if (!main_thread_only().task_queue_manager) | |
427 return false; | |
428 | |
429 if (!main_thread_only().immediate_work_queue->Empty()) | |
430 return true; | |
431 | |
432 base::AutoLock lock(any_thread_lock_); | |
433 main_thread_only().immediate_work_queue->SwapLocked( | |
434 any_thread().immediate_incoming_queue); | |
435 // |immediate_work_queue| is now empty so updates are no longer required. | |
436 return false; | |
437 } | 437 } |
438 | 438 |
439 void TaskQueueImpl::TraceQueueSize(bool is_locked) const { | 439 void TaskQueueImpl::TraceQueueSize(bool is_locked) const { |
440 bool is_tracing; | 440 bool is_tracing; |
441 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_, | 441 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_, |
442 &is_tracing); | 442 &is_tracing); |
443 if (!is_tracing) | 443 if (!is_tracing) |
444 return; | 444 return; |
445 | 445 |
446 // It's only safe to access the work queues from the main thread. | 446 // It's only safe to access the work queues from the main thread. |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
569 // another way of asserting that UnregisterTaskQueue has not been called. | 569 // another way of asserting that UnregisterTaskQueue has not been called. |
570 DCHECK(any_thread().time_domain); | 570 DCHECK(any_thread().time_domain); |
571 if (!any_thread().time_domain) | 571 if (!any_thread().time_domain) |
572 return; | 572 return; |
573 DCHECK(main_thread_checker_.CalledOnValidThread()); | 573 DCHECK(main_thread_checker_.CalledOnValidThread()); |
574 if (time_domain == main_thread_only().time_domain) | 574 if (time_domain == main_thread_only().time_domain) |
575 return; | 575 return; |
576 | 576 |
577 any_thread().time_domain = time_domain; | 577 any_thread().time_domain = time_domain; |
578 } | 578 } |
579 // We rely here on TimeDomain::MigrateQueue being thread-safe to use with | 579 |
580 // TimeDomain::Register/UnregisterAsUpdatableTaskQueue. | 580 base::TimeTicks wake_up_time = |
581 main_thread_only().time_domain->MigrateQueue(this, time_domain); | 581 main_thread_only().time_domain->MigrateQueue(this, time_domain); |
582 main_thread_only().time_domain = time_domain; | 582 main_thread_only().time_domain = time_domain; |
| 583 |
| 584 if (!wake_up_time.is_null()) |
| 585 time_domain->ScheduleDelayedWork(this, wake_up_time, time_domain->Now()); |
583 } | 586 } |
584 | 587 |
585 TimeDomain* TaskQueueImpl::GetTimeDomain() const { | 588 TimeDomain* TaskQueueImpl::GetTimeDomain() const { |
586 if (base::PlatformThread::CurrentId() == thread_id_) | 589 if (base::PlatformThread::CurrentId() == thread_id_) |
587 return main_thread_only().time_domain; | 590 return main_thread_only().time_domain; |
588 | 591 |
589 base::AutoLock lock(any_thread_lock_); | 592 base::AutoLock lock(any_thread_lock_); |
590 return any_thread().time_domain; | 593 return any_thread().time_domain; |
591 } | 594 } |
592 | 595 |
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
665 } | 668 } |
666 | 669 |
667 base::AutoLock lock(any_thread_lock_); | 670 base::AutoLock lock(any_thread_lock_); |
668 if (any_thread().immediate_incoming_queue.empty()) | 671 if (any_thread().immediate_incoming_queue.empty()) |
669 return true; | 672 return true; |
670 | 673 |
671 return any_thread().immediate_incoming_queue.front().enqueue_order() > | 674 return any_thread().immediate_incoming_queue.front().enqueue_order() > |
672 main_thread_only().current_fence; | 675 main_thread_only().current_fence; |
673 } | 676 } |
674 | 677 |
675 bool TaskQueueImpl::BlockedByFenceLocked() const { | 678 bool TaskQueueImpl::ImmediateTaskCouldRun() const { |
676 if (!main_thread_only().current_fence) | 679 if (!IsQueueEnabled()) |
677 return false; | 680 return false; |
678 | 681 |
679 if (!main_thread_only().immediate_work_queue->BlockedByFence() || | 682 if (!main_thread_only().current_fence) |
680 !main_thread_only().delayed_work_queue->BlockedByFence()) { | |
681 return false; | |
682 } | |
683 | |
684 if (any_thread().immediate_incoming_queue.empty()) | |
685 return true; | 683 return true; |
686 | 684 |
687 return any_thread().immediate_incoming_queue.front().enqueue_order() > | 685 base::AutoLock lock(any_thread_lock_); |
| 686 // If |immediate_incoming_queue| is empty then any task posted is guaranteed |
| 687 // to be blocked by the fence. |
| 688 if (any_thread().immediate_incoming_queue.empty()) |
| 689 return false; |
| 690 |
| 691 return any_thread().immediate_incoming_queue.front().enqueue_order() < |
688 main_thread_only().current_fence; | 692 main_thread_only().current_fence; |
689 } | 693 } |
690 | 694 |
691 EnqueueOrder TaskQueueImpl::GetFenceForTest() const { | 695 EnqueueOrder TaskQueueImpl::GetFenceForTest() const { |
692 return main_thread_only().current_fence; | 696 return main_thread_only().current_fence; |
693 } | 697 } |
694 | 698 |
695 // static | 699 // static |
696 void TaskQueueImpl::QueueAsValueInto(const WTF::Deque<Task>& queue, | 700 void TaskQueueImpl::QueueAsValueInto(const WTF::Deque<Task>& queue, |
697 base::trace_event::TracedValue* state) { | 701 base::trace_event::TracedValue* state) { |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
792 bool is_enabled = IsQueueEnabled(); | 796 bool is_enabled = IsQueueEnabled(); |
793 if (was_enabled != is_enabled) | 797 if (was_enabled != is_enabled) |
794 EnableOrDisableWithSelector(is_enabled); | 798 EnableOrDisableWithSelector(is_enabled); |
795 } | 799 } |
796 | 800 |
797 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) { | 801 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) { |
798 if (!main_thread_only().task_queue_manager) | 802 if (!main_thread_only().task_queue_manager) |
799 return; | 803 return; |
800 | 804 |
801 if (enable) { | 805 if (enable) { |
802 // Note it's the job of the selector to tell the TaskQueueManager if | 806 // Note the selector calls TaskQueueManager::OnTaskQueueEnabled which posts |
803 // a DoWork needs posting. | 807 // a DoWork if needed. |
804 main_thread_only().task_queue_manager->selector_.EnableQueue(this); | 808 main_thread_only().task_queue_manager->selector_.EnableQueue(this); |
805 } else { | 809 } else { |
806 main_thread_only().task_queue_manager->selector_.DisableQueue(this); | 810 main_thread_only().task_queue_manager->selector_.DisableQueue(this); |
807 } | 811 } |
808 } | 812 } |
809 | 813 |
810 std::unique_ptr<TaskQueueImpl::QueueEnabledVoter> | 814 std::unique_ptr<TaskQueueImpl::QueueEnabledVoter> |
811 TaskQueueImpl::CreateQueueEnabledVoter() { | 815 TaskQueueImpl::CreateQueueEnabledVoter() { |
812 main_thread_only().voter_refcount++; | 816 main_thread_only().voter_refcount++; |
813 main_thread_only().is_enabled_refcount++; | 817 main_thread_only().is_enabled_refcount++; |
814 return base::MakeUnique<QueueEnabledVoterImpl>(this); | 818 return base::MakeUnique<QueueEnabledVoterImpl>(this); |
815 } | 819 } |
816 | 820 |
817 void TaskQueueImpl::SweepCanceledDelayedTasks(base::TimeTicks now) { | 821 void TaskQueueImpl::SweepCanceledDelayedTasks(base::TimeTicks now) { |
818 if (main_thread_only().delayed_incoming_queue.empty()) | 822 if (main_thread_only().delayed_incoming_queue.empty()) |
819 return; | 823 return; |
820 | 824 |
821 base::TimeTicks first_task_runtime = | 825 base::TimeTicks first_task_runtime = |
822 main_thread_only().delayed_incoming_queue.top().delayed_run_time; | 826 main_thread_only().delayed_incoming_queue.top().delayed_run_time; |
823 | 827 |
824 // TODO(alexclarke): Let this remove all tasks once the DoWork refactor has | 828 // Remove canceled tasks. |
825 // landed. | |
826 std::priority_queue<Task> remaining_tasks; | 829 std::priority_queue<Task> remaining_tasks; |
827 while (!main_thread_only().delayed_incoming_queue.empty()) { | 830 while (!main_thread_only().delayed_incoming_queue.empty()) { |
828 if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled() || | 831 if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled()) { |
829 main_thread_only().delayed_incoming_queue.top().delayed_run_time == | |
830 first_task_runtime) { | |
831 remaining_tasks.push(std::move( | 832 remaining_tasks.push(std::move( |
832 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()))); | 833 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()))); |
833 } | 834 } |
834 main_thread_only().delayed_incoming_queue.pop(); | 835 main_thread_only().delayed_incoming_queue.pop(); |
835 } | 836 } |
| 837 |
836 main_thread_only().delayed_incoming_queue = std::move(remaining_tasks); | 838 main_thread_only().delayed_incoming_queue = std::move(remaining_tasks); |
| 839 |
| 840 // Re-schedule delayed call to WakeUpForDelayedWork if needed. |
| 841 if (main_thread_only().delayed_incoming_queue.empty()) { |
| 842 main_thread_only().time_domain->CancelDelayedWork(this); |
| 843 } else if (first_task_runtime != |
| 844 main_thread_only().delayed_incoming_queue.top().delayed_run_time) { |
| 845 main_thread_only().time_domain->ScheduleDelayedWork( |
| 846 this, main_thread_only().delayed_incoming_queue.top().delayed_run_time, |
| 847 main_thread_only().time_domain->Now()); |
| 848 } |
| 849 } |
| 850 |
| 851 void TaskQueueImpl::PushImmediateIncomingTaskForTest( |
| 852 TaskQueueImpl::Task&& task) { |
| 853 base::AutoLock lock(any_thread_lock_); |
| 854 any_thread().immediate_incoming_queue.push_back(std::move(task)); |
837 } | 855 } |
838 | 856 |
839 } // namespace internal | 857 } // namespace internal |
840 } // namespace scheduler | 858 } // namespace scheduler |
841 } // namespace blink | 859 } // namespace blink |
OLD | NEW |