OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
11 #include <memory> | 11 #include <memory> |
12 #include <set> | 12 #include <set> |
13 #include <unordered_map> | 13 #include <unordered_map> |
14 #include <utility> | 14 #include <utility> |
15 #include <vector> | 15 #include <vector> |
16 | 16 |
17 #include "base/atomic_sequence_num.h" | 17 #include "base/atomic_sequence_num.h" |
18 #include "base/atomicops.h" | |
19 #include "base/callback.h" | 18 #include "base/callback.h" |
20 #include "base/compiler_specific.h" | 19 #include "base/compiler_specific.h" |
21 #include "base/critical_closure.h" | 20 #include "base/critical_closure.h" |
| 21 #include "base/debug/dump_without_crashing.h" |
22 #include "base/lazy_instance.h" | 22 #include "base/lazy_instance.h" |
23 #include "base/logging.h" | 23 #include "base/logging.h" |
24 #include "base/macros.h" | 24 #include "base/macros.h" |
25 #include "base/memory/ptr_util.h" | 25 #include "base/memory/ptr_util.h" |
26 #include "base/stl_util.h" | 26 #include "base/stl_util.h" |
27 #include "base/strings/stringprintf.h" | 27 #include "base/strings/stringprintf.h" |
28 #include "base/synchronization/condition_variable.h" | 28 #include "base/synchronization/condition_variable.h" |
29 #include "base/synchronization/lock.h" | 29 #include "base/synchronization/lock.h" |
30 #include "base/task_scheduler/post_task.h" | 30 #include "base/task_scheduler/post_task.h" |
31 #include "base/task_scheduler/task_scheduler.h" | 31 #include "base/task_scheduler/task_scheduler.h" |
(...skipping 15 matching lines...) Expand all Loading... |
47 #endif | 47 #endif |
48 | 48 |
49 #if !defined(OS_NACL) | 49 #if !defined(OS_NACL) |
50 #include "base/metrics/histogram_macros.h" | 50 #include "base/metrics/histogram_macros.h" |
51 #endif | 51 #endif |
52 | 52 |
53 namespace base { | 53 namespace base { |
54 | 54 |
55 namespace { | 55 namespace { |
56 | 56 |
57 // An enum representing the state of all pools. Any given non-test process | 57 // An enum representing the state of all pools. A non-test process should only |
58 // should only ever transition from NONE_ACTIVE to one of the active states. | 58 // ever transition from POST_TASK_DISABLED to one of the active states. A test |
59 // Transitions between actives states are unexpected. The | 59 // process may transition from one of the active states to POST_TASK_DISABLED |
60 // REDIRECTED_TO_TASK_SCHEDULER transition occurs when | 60 // when DisableForProcessForTesting() is called. |
61 // RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition | |
62 // occurs when a Worker needs to be created because the first task was posted | |
63 // and the state is still NONE_ACTIVE. In a test process, a transition to | |
64 // NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is | |
65 // called. | |
66 // | 61 // |
67 // |g_all_pools_state| uses relaxed atomic operations to ensure no data race | 62 // External memory synchronization is required to call a method that reads |
68 // between reads/writes, strict memory ordering isn't required per no other | 63 // |g_all_pools_state| after calling a method that modifies it. |
69 // state being inferred from its value. Explicit synchronization (e.g. locks or | |
70 // events) would be overkill (it's fine for other threads to still see | |
71 // NONE_ACTIVE after the first Worker was created -- this is not possible for | |
72 // REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no | |
73 // other threads are active). | |
74 // | 64 // |
75 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool | 65 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
76 // will be phased out completely otherwise). | 66 // will be phased out completely otherwise). |
77 enum AllPoolsState : subtle::Atomic32 { | 67 enum class AllPoolsState { |
78 NONE_ACTIVE, | 68 POST_TASK_DISABLED, |
79 WORKER_CREATED, | 69 USE_WORKER_POOL, |
80 REDIRECTED_TO_TASK_SCHEDULER, | 70 REDIRECTED_TO_TASK_SCHEDULER, |
81 }; | 71 }; |
82 subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; | 72 |
| 73 // TODO(fdoray): Change the initial state to POST_TASK_DISABLED. It is initially |
| 74 // USE_WORKER_POOL to avoid a revert of the CL that adds |
| 75 // debug::DumpWithoutCrashing() in case of waterfall failures. |
| 76 AllPoolsState g_all_pools_state = AllPoolsState::USE_WORKER_POOL; |
83 | 77 |
84 struct SequencedTask : public TrackingInfo { | 78 struct SequencedTask : public TrackingInfo { |
85 SequencedTask() | 79 SequencedTask() |
86 : sequence_token_id(0), | 80 : sequence_token_id(0), |
87 trace_id(0), | 81 trace_id(0), |
88 sequence_task_number(0), | 82 sequence_task_number(0), |
89 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} | 83 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
90 | 84 |
91 explicit SequencedTask(const tracked_objects::Location& from_here) | 85 explicit SequencedTask(const tracked_objects::Location& from_here) |
92 : base::TrackingInfo(from_here, TimeTicks()), | 86 : base::TrackingInfo(from_here, TimeTicks()), |
(...skipping 482 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
575 // Worker definitions --------------------------------------------------------- | 569 // Worker definitions --------------------------------------------------------- |
576 | 570 |
577 SequencedWorkerPool::Worker::Worker( | 571 SequencedWorkerPool::Worker::Worker( |
578 scoped_refptr<SequencedWorkerPool> worker_pool, | 572 scoped_refptr<SequencedWorkerPool> worker_pool, |
579 int thread_number, | 573 int thread_number, |
580 const std::string& prefix) | 574 const std::string& prefix) |
581 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), | 575 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), |
582 worker_pool_(std::move(worker_pool)), | 576 worker_pool_(std::move(worker_pool)), |
583 task_shutdown_behavior_(BLOCK_SHUTDOWN), | 577 task_shutdown_behavior_(BLOCK_SHUTDOWN), |
584 is_processing_task_(false) { | 578 is_processing_task_(false) { |
585 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 579 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
586 subtle::NoBarrier_Load(&g_all_pools_state)); | |
587 Start(); | 580 Start(); |
588 } | 581 } |
589 | 582 |
590 SequencedWorkerPool::Worker::~Worker() { | 583 SequencedWorkerPool::Worker::~Worker() { |
591 } | 584 } |
592 | 585 |
593 void SequencedWorkerPool::Worker::Run() { | 586 void SequencedWorkerPool::Worker::Run() { |
594 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 587 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
595 subtle::NoBarrier_Load(&g_all_pools_state)); | |
596 | 588 |
597 #if defined(OS_WIN) | 589 #if defined(OS_WIN) |
598 win::ScopedCOMInitializer com_initializer; | 590 win::ScopedCOMInitializer com_initializer; |
599 #endif | 591 #endif |
600 | 592 |
601 // Store a pointer to this worker in thread local storage for static function | 593 // Store a pointer to this worker in thread local storage for static function |
602 // access. | 594 // access. |
603 DCHECK(!lazy_tls_ptr_.Get().Get()); | 595 DCHECK(!lazy_tls_ptr_.Get().Get()); |
604 lazy_tls_ptr_.Get().Set(this); | 596 lazy_tls_ptr_.Get().Set(this); |
605 | 597 |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
684 return SequenceToken(LockedGetNamedTokenID(name)); | 676 return SequenceToken(LockedGetNamedTokenID(name)); |
685 } | 677 } |
686 | 678 |
687 bool SequencedWorkerPool::Inner::PostTask( | 679 bool SequencedWorkerPool::Inner::PostTask( |
688 const std::string* optional_token_name, | 680 const std::string* optional_token_name, |
689 SequenceToken sequence_token, | 681 SequenceToken sequence_token, |
690 WorkerShutdown shutdown_behavior, | 682 WorkerShutdown shutdown_behavior, |
691 const tracked_objects::Location& from_here, | 683 const tracked_objects::Location& from_here, |
692 const Closure& task, | 684 const Closure& task, |
693 TimeDelta delay) { | 685 TimeDelta delay) { |
| 686 // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a |
| 687 // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the |
| 688 // waterfall. https://crbug.com/622400 |
| 689 // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
| 690 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) |
| 691 debug::DumpWithoutCrashing(); |
| 692 |
694 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); | 693 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
695 SequencedTask sequenced(from_here); | 694 SequencedTask sequenced(from_here); |
696 sequenced.sequence_token_id = sequence_token.id_; | 695 sequenced.sequence_token_id = sequence_token.id_; |
697 sequenced.shutdown_behavior = shutdown_behavior; | 696 sequenced.shutdown_behavior = shutdown_behavior; |
698 sequenced.posted_from = from_here; | 697 sequenced.posted_from = from_here; |
699 sequenced.task = | 698 sequenced.task = |
700 shutdown_behavior == BLOCK_SHUTDOWN ? | 699 shutdown_behavior == BLOCK_SHUTDOWN ? |
701 base::MakeCriticalClosure(task) : task; | 700 base::MakeCriticalClosure(task) : task; |
702 sequenced.time_to_run = TimeTicks::Now() + delay; | 701 sequenced.time_to_run = TimeTicks::Now() + delay; |
703 | 702 |
(...skipping 29 matching lines...) Expand all Loading... |
733 "SequencedWorkerPool::Inner::PostTask", | 732 "SequencedWorkerPool::Inner::PostTask", |
734 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), | 733 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
735 TRACE_EVENT_FLAG_FLOW_OUT); | 734 TRACE_EVENT_FLAG_FLOW_OUT); |
736 | 735 |
737 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); | 736 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
738 | 737 |
739 // Now that we have the lock, apply the named token rules. | 738 // Now that we have the lock, apply the named token rules. |
740 if (optional_token_name) | 739 if (optional_token_name) |
741 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); | 740 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
742 | 741 |
743 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 742 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
744 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
745 if (!PostTaskToTaskScheduler(sequenced, delay)) | 743 if (!PostTaskToTaskScheduler(sequenced, delay)) |
746 return false; | 744 return false; |
747 } else { | 745 } else { |
748 pending_tasks_.insert(sequenced); | 746 pending_tasks_.insert(sequenced); |
749 | 747 |
750 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) | 748 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
751 blocking_shutdown_pending_task_count_++; | 749 blocking_shutdown_pending_task_count_++; |
752 | 750 |
753 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 751 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
754 } | 752 } |
755 } | 753 } |
756 | 754 |
757 if (subtle::NoBarrier_Load(&g_all_pools_state) != | 755 // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure |
758 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 756 // correct behavior if a task is posted to a SequencedWorkerPool before |
| 757 // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build. |
| 758 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
759 // Actually start the additional thread or signal an existing one outside | 759 // Actually start the additional thread or signal an existing one outside |
760 // the lock. | 760 // the lock. |
761 if (create_thread_id) | 761 if (create_thread_id) |
762 FinishStartingAdditionalThread(create_thread_id); | 762 FinishStartingAdditionalThread(create_thread_id); |
763 else | 763 else |
764 SignalHasWork(); | 764 SignalHasWork(); |
765 } | 765 } |
766 | 766 |
767 #if DCHECK_IS_ON() | 767 #if DCHECK_IS_ON() |
768 { | 768 { |
769 AutoLock lock_for_dcheck(lock_); | 769 AutoLock lock_for_dcheck(lock_); |
770 // Some variables are exposed in both modes for convenience but only really | 770 // Some variables are exposed in both modes for convenience but only really |
771 // intended for one of them at runtime, confirm exclusive usage here. | 771 // intended for one of them at runtime, confirm exclusive usage here. |
772 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 772 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
773 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
774 DCHECK(pending_tasks_.empty()); | 773 DCHECK(pending_tasks_.empty()); |
775 DCHECK_EQ(0, create_thread_id); | 774 DCHECK_EQ(0, create_thread_id); |
776 } else { | 775 } else { |
777 DCHECK(sequenced_task_runner_map_.empty()); | 776 DCHECK(sequenced_task_runner_map_.empty()); |
778 } | 777 } |
779 } | 778 } |
780 #endif // DCHECK_IS_ON() | 779 #endif // DCHECK_IS_ON() |
781 | 780 |
782 return true; | 781 return true; |
783 } | 782 } |
784 | 783 |
785 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( | 784 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
786 const SequencedTask& sequenced, | 785 const SequencedTask& sequenced, |
787 const TimeDelta& delay) { | 786 const TimeDelta& delay) { |
788 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 787 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
789 subtle::NoBarrier_Load(&g_all_pools_state)); | |
790 | 788 |
791 lock_.AssertAcquired(); | 789 lock_.AssertAcquired(); |
792 | 790 |
793 // Confirm that the TaskScheduler's shutdown behaviors use the same | 791 // Confirm that the TaskScheduler's shutdown behaviors use the same |
794 // underlying values as SequencedWorkerPool. | 792 // underlying values as SequencedWorkerPool. |
795 static_assert( | 793 static_assert( |
796 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == | 794 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
797 static_cast<int>(CONTINUE_ON_SHUTDOWN), | 795 static_cast<int>(CONTINUE_ON_SHUTDOWN), |
798 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | 796 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
799 "CONTINUE_ON_SHUTDOWN."); | 797 "CONTINUE_ON_SHUTDOWN."); |
(...skipping 13 matching lines...) Expand all Loading... |
813 .WithPriority(task_priority_) | 811 .WithPriority(task_priority_) |
814 .WithShutdownBehavior(task_shutdown_behavior); | 812 .WithShutdownBehavior(task_shutdown_behavior); |
815 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) | 813 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) |
816 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); | 814 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); |
817 } | 815 } |
818 | 816 |
819 scoped_refptr<TaskRunner> | 817 scoped_refptr<TaskRunner> |
820 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( | 818 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( |
821 int sequence_token_id, | 819 int sequence_token_id, |
822 const TaskTraits& traits) { | 820 const TaskTraits& traits) { |
823 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 821 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
824 subtle::NoBarrier_Load(&g_all_pools_state)); | |
825 | 822 |
826 lock_.AssertAcquired(); | 823 lock_.AssertAcquired(); |
827 | 824 |
828 static_assert( | 825 static_assert( |
829 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, | 826 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, |
830 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " | 827 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " |
831 "used as an index in |unsequenced_task_runners_|."); | 828 "used as an index in |unsequenced_task_runners_|."); |
832 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, | 829 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, |
833 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " | 830 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " |
834 "be used as an index in |unsequenced_task_runners_|."); | 831 "be used as an index in |unsequenced_task_runners_|."); |
(...skipping 16 matching lines...) Expand all Loading... |
851 task_runner = sequence_token_id | 848 task_runner = sequence_token_id |
852 ? CreateSequencedTaskRunnerWithTraits(traits) | 849 ? CreateSequencedTaskRunnerWithTraits(traits) |
853 : CreateTaskRunnerWithTraits(traits); | 850 : CreateTaskRunnerWithTraits(traits); |
854 } | 851 } |
855 | 852 |
856 return task_runner; | 853 return task_runner; |
857 } | 854 } |
858 | 855 |
859 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 856 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
860 AutoLock lock(lock_); | 857 AutoLock lock(lock_); |
861 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 858 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
862 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
863 if (!runs_tasks_on_verifier_) { | 859 if (!runs_tasks_on_verifier_) { |
864 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( | 860 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
865 TaskTraits().WithFileIO().WithPriority(task_priority_)); | 861 TaskTraits().WithFileIO().WithPriority(task_priority_)); |
866 } | 862 } |
867 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); | 863 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
868 } else { | 864 } else { |
869 return ContainsKey(threads_, PlatformThread::CurrentId()); | 865 return ContainsKey(threads_, PlatformThread::CurrentId()); |
870 } | 866 } |
871 } | 867 } |
872 | 868 |
873 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 869 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
874 SequenceToken sequence_token) const { | 870 SequenceToken sequence_token) const { |
875 DCHECK(sequence_token.IsValid()); | 871 DCHECK(sequence_token.IsValid()); |
876 | 872 |
877 AutoLock lock(lock_); | 873 AutoLock lock(lock_); |
878 | 874 |
879 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 875 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
880 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
881 const auto sequenced_task_runner_it = | 876 const auto sequenced_task_runner_it = |
882 sequenced_task_runner_map_.find(sequence_token.id_); | 877 sequenced_task_runner_map_.find(sequence_token.id_); |
883 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && | 878 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
884 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); | 879 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
885 } else { | 880 } else { |
886 ThreadMap::const_iterator found = | 881 ThreadMap::const_iterator found = |
887 threads_.find(PlatformThread::CurrentId()); | 882 threads_.find(PlatformThread::CurrentId()); |
888 return found != threads_.end() && found->second->is_processing_task() && | 883 return found != threads_.end() && found->second->is_processing_task() && |
889 sequence_token.Equals(found->second->task_sequence_token()); | 884 sequence_token.Equals(found->second->task_sequence_token()); |
890 } | 885 } |
891 } | 886 } |
892 | 887 |
893 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 888 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
894 void SequencedWorkerPool::Inner::CleanupForTesting() { | 889 void SequencedWorkerPool::Inner::CleanupForTesting() { |
895 DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state), | 890 DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
896 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); | |
897 AutoLock lock(lock_); | 891 AutoLock lock(lock_); |
898 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 892 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
899 if (shutdown_called_) | 893 if (shutdown_called_) |
900 return; | 894 return; |
901 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | 895 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
902 return; | 896 return; |
903 cleanup_state_ = CLEANUP_REQUESTED; | 897 cleanup_state_ = CLEANUP_REQUESTED; |
904 cleanup_idlers_ = 0; | 898 cleanup_idlers_ = 0; |
905 has_work_cv_.Signal(); | 899 has_work_cv_.Signal(); |
906 while (cleanup_state_ != CLEANUP_DONE) | 900 while (cleanup_state_ != CLEANUP_DONE) |
(...skipping 10 matching lines...) Expand all Loading... |
917 { | 911 { |
918 AutoLock lock(lock_); | 912 AutoLock lock(lock_); |
919 // Cleanup and Shutdown should not be called concurrently. | 913 // Cleanup and Shutdown should not be called concurrently. |
920 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 914 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
921 if (shutdown_called_) | 915 if (shutdown_called_) |
922 return; | 916 return; |
923 shutdown_called_ = true; | 917 shutdown_called_ = true; |
924 | 918 |
925 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 919 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
926 | 920 |
927 if (subtle::NoBarrier_Load(&g_all_pools_state) != | 921 if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL) |
928 AllPoolsState::WORKER_CREATED) { | |
929 return; | 922 return; |
930 } | |
931 | 923 |
932 // Tickle the threads. This will wake up a waiting one so it will know that | 924 // Tickle the threads. This will wake up a waiting one so it will know that |
933 // it can exit, which in turn will wake up any other waiting ones. | 925 // it can exit, which in turn will wake up any other waiting ones. |
934 SignalHasWork(); | 926 SignalHasWork(); |
935 | 927 |
936 // There are no pending or running tasks blocking shutdown, we're done. | 928 // There are no pending or running tasks blocking shutdown, we're done. |
937 if (CanShutdown()) | 929 if (CanShutdown()) |
938 return; | 930 return; |
939 } | 931 } |
940 | 932 |
(...skipping 18 matching lines...) Expand all Loading... |
959 TimeTicks::Now() - shutdown_wait_begin); | 951 TimeTicks::Now() - shutdown_wait_begin); |
960 #endif | 952 #endif |
961 } | 953 } |
962 | 954 |
963 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { | 955 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
964 AutoLock lock(lock_); | 956 AutoLock lock(lock_); |
965 return shutdown_called_; | 957 return shutdown_called_; |
966 } | 958 } |
967 | 959 |
968 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 960 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
969 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 961 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
970 subtle::NoBarrier_Load(&g_all_pools_state)); | |
971 { | 962 { |
972 AutoLock lock(lock_); | 963 AutoLock lock(lock_); |
973 DCHECK(thread_being_created_); | 964 DCHECK(thread_being_created_); |
974 thread_being_created_ = false; | 965 thread_being_created_ = false; |
975 auto result = threads_.insert( | 966 auto result = threads_.insert( |
976 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); | 967 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); |
977 DCHECK(result.second); | 968 DCHECK(result.second); |
978 | 969 |
979 while (true) { | 970 while (true) { |
980 #if defined(OS_MACOSX) | 971 #if defined(OS_MACOSX) |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1092 | 1083 |
1093 // We noticed we should exit. Wake up the next worker so it knows it should | 1084 // We noticed we should exit. Wake up the next worker so it knows it should |
1094 // exit as well (because the Shutdown() code only signals once). | 1085 // exit as well (because the Shutdown() code only signals once). |
1095 SignalHasWork(); | 1086 SignalHasWork(); |
1096 | 1087 |
1097 // Possibly unblock shutdown. | 1088 // Possibly unblock shutdown. |
1098 can_shutdown_cv_.Signal(); | 1089 can_shutdown_cv_.Signal(); |
1099 } | 1090 } |
1100 | 1091 |
1101 void SequencedWorkerPool::Inner::HandleCleanup() { | 1092 void SequencedWorkerPool::Inner::HandleCleanup() { |
1102 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1093 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1103 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1104 | 1094 |
1105 lock_.AssertAcquired(); | 1095 lock_.AssertAcquired(); |
1106 if (cleanup_state_ == CLEANUP_DONE) | 1096 if (cleanup_state_ == CLEANUP_DONE) |
1107 return; | 1097 return; |
1108 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1098 if (cleanup_state_ == CLEANUP_REQUESTED) { |
1109 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1099 // We win, we get to do the cleanup as soon as the others wise up and idle. |
1110 cleanup_state_ = CLEANUP_STARTING; | 1100 cleanup_state_ = CLEANUP_STARTING; |
1111 while (thread_being_created_ || | 1101 while (thread_being_created_ || |
1112 cleanup_idlers_ != threads_.size() - 1) { | 1102 cleanup_idlers_ != threads_.size() - 1) { |
1113 has_work_cv_.Signal(); | 1103 has_work_cv_.Signal(); |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1160 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1150 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
1161 lock_.AssertAcquired(); | 1151 lock_.AssertAcquired(); |
1162 // We assume that we never create enough tasks to wrap around. | 1152 // We assume that we never create enough tasks to wrap around. |
1163 return next_sequence_task_number_++; | 1153 return next_sequence_task_number_++; |
1164 } | 1154 } |
1165 | 1155 |
1166 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1156 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
1167 SequencedTask* task, | 1157 SequencedTask* task, |
1168 TimeDelta* wait_time, | 1158 TimeDelta* wait_time, |
1169 std::vector<Closure>* delete_these_outside_lock) { | 1159 std::vector<Closure>* delete_these_outside_lock) { |
1170 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1160 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1171 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1172 | 1161 |
1173 lock_.AssertAcquired(); | 1162 lock_.AssertAcquired(); |
1174 | 1163 |
1175 // Find the next task with a sequence token that's not currently in use. | 1164 // Find the next task with a sequence token that's not currently in use. |
1176 // If the token is in use, that means another thread is running something | 1165 // If the token is in use, that means another thread is running something |
1177 // in that sequence, and we can't run it without going out-of-order. | 1166 // in that sequence, and we can't run it without going out-of-order. |
1178 // | 1167 // |
1179 // This algorithm is simple and fair, but inefficient in some cases. For | 1168 // This algorithm is simple and fair, but inefficient in some cases. For |
1180 // example, say somebody schedules 1000 slow tasks with the same sequence | 1169 // example, say somebody schedules 1000 slow tasks with the same sequence |
1181 // number. We'll have to go through all those tasks each time we feel like | 1170 // number. We'll have to go through all those tasks each time we feel like |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1249 } | 1238 } |
1250 | 1239 |
1251 status = GET_WORK_FOUND; | 1240 status = GET_WORK_FOUND; |
1252 break; | 1241 break; |
1253 } | 1242 } |
1254 | 1243 |
1255 return status; | 1244 return status; |
1256 } | 1245 } |
1257 | 1246 |
1258 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { | 1247 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
1259 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1248 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1260 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1261 | 1249 |
1262 lock_.AssertAcquired(); | 1250 lock_.AssertAcquired(); |
1263 | 1251 |
1264 // Mark the task's sequence number as in use. | 1252 // Mark the task's sequence number as in use. |
1265 if (task.sequence_token_id) | 1253 if (task.sequence_token_id) |
1266 current_sequences_.insert(task.sequence_token_id); | 1254 current_sequences_.insert(task.sequence_token_id); |
1267 | 1255 |
1268 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN | 1256 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN |
1269 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread | 1257 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread |
1270 // completes. | 1258 // completes. |
(...skipping 12 matching lines...) Expand all Loading... |
1283 // if there is one waiting to pick up the next task. | 1271 // if there is one waiting to pick up the next task. |
1284 // | 1272 // |
1285 // Note that we really need to do this *before* running the task, not | 1273 // Note that we really need to do this *before* running the task, not |
1286 // after. Otherwise, if more than one task is posted, the creation of the | 1274 // after. Otherwise, if more than one task is posted, the creation of the |
1287 // second thread (since we only create one at a time) will be blocked by | 1275 // second thread (since we only create one at a time) will be blocked by |
1288 // the execution of the first task, which could be arbitrarily long. | 1276 // the execution of the first task, which could be arbitrarily long. |
1289 return PrepareToStartAdditionalThreadIfHelpful(); | 1277 return PrepareToStartAdditionalThreadIfHelpful(); |
1290 } | 1278 } |
1291 | 1279 |
1292 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 1280 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
1293 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1281 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1294 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1295 | 1282 |
1296 lock_.AssertAcquired(); | 1283 lock_.AssertAcquired(); |
1297 | 1284 |
1298 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { | 1285 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
1299 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 1286 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
1300 blocking_shutdown_thread_count_--; | 1287 blocking_shutdown_thread_count_--; |
1301 } | 1288 } |
1302 | 1289 |
1303 if (task.sequence_token_id) | 1290 if (task.sequence_token_id) |
1304 current_sequences_.erase(task.sequence_token_id); | 1291 current_sequences_.erase(task.sequence_token_id); |
1305 } | 1292 } |
1306 | 1293 |
1307 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 1294 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
1308 int sequence_token_id) const { | 1295 int sequence_token_id) const { |
1309 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1296 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
1310 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1311 | 1297 |
1312 lock_.AssertAcquired(); | 1298 lock_.AssertAcquired(); |
1313 return !sequence_token_id || | 1299 return !sequence_token_id || |
1314 current_sequences_.find(sequence_token_id) == | 1300 current_sequences_.find(sequence_token_id) == |
1315 current_sequences_.end(); | 1301 current_sequences_.end(); |
1316 } | 1302 } |
1317 | 1303 |
1318 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { | 1304 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
1319 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1305 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
1320 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1321 | 1306 |
1322 lock_.AssertAcquired(); | 1307 lock_.AssertAcquired(); |
1323 // How thread creation works: | 1308 // How thread creation works: |
1324 // | 1309 // |
1325 // We'de like to avoid creating threads with the lock held. However, we | 1310 // We'de like to avoid creating threads with the lock held. However, we |
1326 // need to be sure that we have an accurate accounting of the threads for | 1311 // need to be sure that we have an accurate accounting of the threads for |
1327 // proper Joining and deltion on shutdown. | 1312 // proper Joining and deltion on shutdown. |
1328 // | 1313 // |
1329 // We need to figure out if we need another thread with the lock held, which | 1314 // We need to figure out if we need another thread with the lock held, which |
1330 // is what this function does. It then marks us as in the process of creating | 1315 // is what this function does. It then marks us as in the process of creating |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1362 thread_being_created_ = true; | 1347 thread_being_created_ = true; |
1363 return static_cast<int>(threads_.size() + 1); | 1348 return static_cast<int>(threads_.size() + 1); |
1364 } | 1349 } |
1365 } | 1350 } |
1366 } | 1351 } |
1367 return 0; | 1352 return 0; |
1368 } | 1353 } |
1369 | 1354 |
1370 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 1355 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
1371 int thread_number) { | 1356 int thread_number) { |
1372 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1357 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1373 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1374 | 1358 |
1375 // Called outside of the lock. | 1359 // Called outside of the lock. |
1376 DCHECK_GT(thread_number, 0); | 1360 DCHECK_GT(thread_number, 0); |
1377 | 1361 |
1378 if (subtle::NoBarrier_Load(&g_all_pools_state) != | |
1379 AllPoolsState::WORKER_CREATED) { | |
1380 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, | |
1381 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1382 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED); | |
1383 } | |
1384 | |
1385 // The worker is assigned to the list when the thread actually starts, which | 1362 // The worker is assigned to the list when the thread actually starts, which |
1386 // will manage the memory of the pointer. | 1363 // will manage the memory of the pointer. |
1387 new Worker(worker_pool_, thread_number, thread_name_prefix_); | 1364 new Worker(worker_pool_, thread_number, thread_name_prefix_); |
1388 } | 1365 } |
1389 | 1366 |
1390 void SequencedWorkerPool::Inner::SignalHasWork() { | 1367 void SequencedWorkerPool::Inner::SignalHasWork() { |
1391 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1368 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
1392 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1393 | 1369 |
1394 has_work_cv_.Signal(); | 1370 has_work_cv_.Signal(); |
1395 if (testing_observer_) { | 1371 if (testing_observer_) { |
1396 testing_observer_->OnHasWork(); | 1372 testing_observer_->OnHasWork(); |
1397 } | 1373 } |
1398 } | 1374 } |
1399 | 1375 |
1400 bool SequencedWorkerPool::Inner::CanShutdown() const { | 1376 bool SequencedWorkerPool::Inner::CanShutdown() const { |
1401 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1377 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1402 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1403 lock_.AssertAcquired(); | 1378 lock_.AssertAcquired(); |
1404 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 1379 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
1405 return !thread_being_created_ && | 1380 return !thread_being_created_ && |
1406 blocking_shutdown_thread_count_ == 0 && | 1381 blocking_shutdown_thread_count_ == 0 && |
1407 blocking_shutdown_pending_task_count_ == 0; | 1382 blocking_shutdown_pending_task_count_ == 0; |
1408 } | 1383 } |
1409 | 1384 |
1410 base::StaticAtomicSequenceNumber | 1385 base::StaticAtomicSequenceNumber |
1411 SequencedWorkerPool::Inner::g_last_sequence_number_; | 1386 SequencedWorkerPool::Inner::g_last_sequence_number_; |
1412 | 1387 |
(...skipping 17 matching lines...) Expand all Loading... |
1430 scoped_refptr<SequencedWorkerPool> | 1405 scoped_refptr<SequencedWorkerPool> |
1431 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1406 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
1432 Worker* worker = Worker::GetForCurrentThread(); | 1407 Worker* worker = Worker::GetForCurrentThread(); |
1433 if (!worker) | 1408 if (!worker) |
1434 return nullptr; | 1409 return nullptr; |
1435 | 1410 |
1436 return worker->worker_pool(); | 1411 return worker->worker_pool(); |
1437 } | 1412 } |
1438 | 1413 |
1439 // static | 1414 // static |
1440 void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { | 1415 void SequencedWorkerPool::EnableForProcess() { |
1441 DCHECK(TaskScheduler::GetInstance()); | 1416 // TODO(fdoray): Uncomment this line. It is initially commented to avoid a |
1442 // Hitting this DCHECK indicates that a task was posted to a | 1417 // revert of the CL that adds debug::DumpWithoutCrashing() in case of |
1443 // SequencedWorkerPool before the TaskScheduler was initialized and | 1418 // waterfall failures. |
1444 // redirected, posting task to SequencedWorkerPools needs to at least be | 1419 // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
1445 // delayed until after that point. | 1420 g_all_pools_state = AllPoolsState::USE_WORKER_POOL; |
1446 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, | |
1447 subtle::NoBarrier_Load(&g_all_pools_state)); | |
1448 subtle::NoBarrier_Store(&g_all_pools_state, | |
1449 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); | |
1450 } | 1421 } |
1451 | 1422 |
1452 // static | 1423 // static |
1453 void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { | 1424 void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() { |
1454 // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER | 1425 // TODO(fdoray): Uncomment this line. It is initially commented to avoid a |
1455 // to stop redirecting tasks. It can also be called when the current state is | 1426 // revert of the CL that adds debug::DumpWithoutCrashing() in case of |
1456 // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called | 1427 // waterfall failures. |
1457 // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has | 1428 // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
1458 // been created if this isn't called). | 1429 DCHECK(TaskScheduler::GetInstance()); |
1459 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE); | 1430 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
| 1431 } |
| 1432 |
| 1433 // static |
| 1434 void SequencedWorkerPool::DisableForProcessForTesting() { |
| 1435 g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; |
| 1436 } |
| 1437 |
| 1438 // static |
| 1439 bool SequencedWorkerPool::IsEnabled() { |
| 1440 return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED; |
1460 } | 1441 } |
1461 | 1442 |
1462 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1443 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1463 const std::string& thread_name_prefix, | 1444 const std::string& thread_name_prefix, |
1464 base::TaskPriority task_priority) | 1445 base::TaskPriority task_priority) |
1465 : constructor_task_runner_(SequencedTaskRunnerHandle::Get()), | 1446 : constructor_task_runner_(SequencedTaskRunnerHandle::Get()), |
1466 inner_(new Inner(this, | 1447 inner_(new Inner(this, |
1467 max_threads, | 1448 max_threads, |
1468 thread_name_prefix, | 1449 thread_name_prefix, |
1469 task_priority, | 1450 task_priority, |
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1588 return PostDelayedWorkerTask(from_here, task, delay); | 1569 return PostDelayedWorkerTask(from_here, task, delay); |
1589 } | 1570 } |
1590 | 1571 |
1591 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1572 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
1592 return inner_->RunsTasksOnCurrentThread(); | 1573 return inner_->RunsTasksOnCurrentThread(); |
1593 } | 1574 } |
1594 | 1575 |
1595 void SequencedWorkerPool::FlushForTesting() { | 1576 void SequencedWorkerPool::FlushForTesting() { |
1596 DCHECK(!RunsTasksOnCurrentThread()); | 1577 DCHECK(!RunsTasksOnCurrentThread()); |
1597 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 1578 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
1598 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 1579 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
1599 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
1600 // TODO(gab): Remove this if http://crbug.com/622400 fails. | 1580 // TODO(gab): Remove this if http://crbug.com/622400 fails. |
1601 TaskScheduler::GetInstance()->FlushForTesting(); | 1581 TaskScheduler::GetInstance()->FlushForTesting(); |
1602 } else { | 1582 } else { |
1603 inner_->CleanupForTesting(); | 1583 inner_->CleanupForTesting(); |
1604 } | 1584 } |
1605 } | 1585 } |
1606 | 1586 |
1607 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1587 void SequencedWorkerPool::SignalHasWorkForTesting() { |
1608 inner_->SignalHasWorkForTesting(); | 1588 inner_->SignalHasWorkForTesting(); |
1609 } | 1589 } |
1610 | 1590 |
1611 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1591 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1612 DCHECK(constructor_task_runner_->RunsTasksOnCurrentThread()); | 1592 DCHECK(constructor_task_runner_->RunsTasksOnCurrentThread()); |
1613 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1593 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1614 } | 1594 } |
1615 | 1595 |
1616 bool SequencedWorkerPool::IsShutdownInProgress() { | 1596 bool SequencedWorkerPool::IsShutdownInProgress() { |
1617 return inner_->IsShutdownInProgress(); | 1597 return inner_->IsShutdownInProgress(); |
1618 } | 1598 } |
1619 | 1599 |
1620 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1600 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1621 SequenceToken sequence_token) const { | 1601 SequenceToken sequence_token) const { |
1622 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1602 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1623 } | 1603 } |
1624 | 1604 |
1625 } // namespace base | 1605 } // namespace base |
OLD | NEW |