Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(246)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2295323004: Fix |g_all_pools_state| data race by using atomics. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <utility> 13 #include <utility>
14 #include <vector> 14 #include <vector>
15 15
16 #include "base/atomic_sequence_num.h" 16 #include "base/atomic_sequence_num.h"
17 #include "base/atomicops.h"
17 #include "base/callback.h" 18 #include "base/callback.h"
18 #include "base/compiler_specific.h" 19 #include "base/compiler_specific.h"
19 #include "base/critical_closure.h" 20 #include "base/critical_closure.h"
20 #include "base/lazy_instance.h" 21 #include "base/lazy_instance.h"
21 #include "base/logging.h" 22 #include "base/logging.h"
22 #include "base/macros.h" 23 #include "base/macros.h"
23 #include "base/memory/ptr_util.h" 24 #include "base/memory/ptr_util.h"
24 #include "base/stl_util.h" 25 #include "base/stl_util.h"
25 #include "base/strings/stringprintf.h" 26 #include "base/strings/stringprintf.h"
26 #include "base/synchronization/condition_variable.h" 27 #include "base/synchronization/condition_variable.h"
(...skipping 24 matching lines...) Expand all
51 namespace base { 52 namespace base {
52 53
53 namespace { 54 namespace {
54 55
55 // An enum representing the state of all pools. Any given process should only 56 // An enum representing the state of all pools. Any given process should only
56 // ever transition from NONE_ACTIVE to the active states, transitions between 57 // ever transition from NONE_ACTIVE to the active states, transitions between
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition 58 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called 59 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created 60 // and the WORKER_CREATED transition occurs when a Worker needs to be created
60 // because the first task was posted and the state is still NONE_ACTIVE. 61 // because the first task was posted and the state is still NONE_ACTIVE.
62 // |g_all_pools_state| uses relaxed atomic operations to ensure no data race
63 // between reads/writes, strict memory ordering isn't required per no other
64 // state being inferred from its value. Explicit synchronization (e.g. locks or
65 // events) would be overkill (it's fine for other threads to still see
66 // NONE_ACTIVE after the first Worker was created -- this is not possible for
67 // REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no
68 // other threads are active).
61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool 69 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
62 // will be phased out completely otherwise). 70 // will be phased out completely otherwise).
63 enum class AllPoolsState { 71 enum AllPoolsState : subtle::Atomic32 {
64 NONE_ACTIVE, 72 NONE_ACTIVE,
65 WORKER_CREATED, 73 WORKER_CREATED,
66 REDIRECTED_TO_TASK_SCHEDULER, 74 REDIRECTED_TO_TASK_SCHEDULER,
67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE; 75 };
76 subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
68 77
69 struct SequencedTask : public TrackingInfo { 78 struct SequencedTask : public TrackingInfo {
70 SequencedTask() 79 SequencedTask()
71 : sequence_token_id(0), 80 : sequence_token_id(0),
72 trace_id(0), 81 trace_id(0),
73 sequence_task_number(0), 82 sequence_task_number(0),
74 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 83 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
75 84
76 explicit SequencedTask(const tracked_objects::Location& from_here) 85 explicit SequencedTask(const tracked_objects::Location& from_here)
77 : base::TrackingInfo(from_here, TimeTicks()), 86 : base::TrackingInfo(from_here, TimeTicks()),
(...skipping 467 matching lines...) Expand 10 before | Expand all | Expand 10 after
545 // Worker definitions --------------------------------------------------------- 554 // Worker definitions ---------------------------------------------------------
546 555
547 SequencedWorkerPool::Worker::Worker( 556 SequencedWorkerPool::Worker::Worker(
548 scoped_refptr<SequencedWorkerPool> worker_pool, 557 scoped_refptr<SequencedWorkerPool> worker_pool,
549 int thread_number, 558 int thread_number,
550 const std::string& prefix) 559 const std::string& prefix)
551 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 560 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
552 worker_pool_(std::move(worker_pool)), 561 worker_pool_(std::move(worker_pool)),
553 task_shutdown_behavior_(BLOCK_SHUTDOWN), 562 task_shutdown_behavior_(BLOCK_SHUTDOWN),
554 is_processing_task_(false) { 563 is_processing_task_(false) {
555 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 564 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
565 subtle::NoBarrier_Load(&g_all_pools_state));
556 Start(); 566 Start();
557 } 567 }
558 568
559 SequencedWorkerPool::Worker::~Worker() { 569 SequencedWorkerPool::Worker::~Worker() {
560 } 570 }
561 571
562 void SequencedWorkerPool::Worker::Run() { 572 void SequencedWorkerPool::Worker::Run() {
563 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 573 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
574 subtle::NoBarrier_Load(&g_all_pools_state));
564 575
565 #if defined(OS_WIN) 576 #if defined(OS_WIN)
566 win::ScopedCOMInitializer com_initializer; 577 win::ScopedCOMInitializer com_initializer;
567 #endif 578 #endif
568 579
569 // Store a pointer to this worker in thread local storage for static function 580 // Store a pointer to this worker in thread local storage for static function
570 // access. 581 // access.
571 DCHECK(!lazy_tls_ptr_.Get().Get()); 582 DCHECK(!lazy_tls_ptr_.Get().Get());
572 lazy_tls_ptr_.Get().Set(this); 583 lazy_tls_ptr_.Get().Set(this);
573 584
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
699 "SequencedWorkerPool::Inner::PostTask", 710 "SequencedWorkerPool::Inner::PostTask",
700 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 711 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
701 TRACE_EVENT_FLAG_FLOW_OUT); 712 TRACE_EVENT_FLAG_FLOW_OUT);
702 713
703 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 714 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
704 715
705 // Now that we have the lock, apply the named token rules. 716 // Now that we have the lock, apply the named token rules.
706 if (optional_token_name) 717 if (optional_token_name)
707 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 718 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
708 719
709 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 720 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
721 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
710 PostTaskToTaskScheduler(sequenced); 722 PostTaskToTaskScheduler(sequenced);
711 } else { 723 } else {
712 pending_tasks_.insert(sequenced); 724 pending_tasks_.insert(sequenced);
713 725
714 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) 726 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
715 blocking_shutdown_pending_task_count_++; 727 blocking_shutdown_pending_task_count_++;
716 728
717 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 729 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
718 } 730 }
719 } 731 }
720 732
721 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 733 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
734 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
722 // Actually start the additional thread or signal an existing one outside 735 // Actually start the additional thread or signal an existing one outside
723 // the lock. 736 // the lock.
724 if (create_thread_id) 737 if (create_thread_id)
725 FinishStartingAdditionalThread(create_thread_id); 738 FinishStartingAdditionalThread(create_thread_id);
726 else 739 else
727 SignalHasWork(); 740 SignalHasWork();
728 } 741 }
729 742
730 #if DCHECK_IS_ON() 743 #if DCHECK_IS_ON()
731 { 744 {
732 AutoLock lock_for_dcheck(lock_); 745 AutoLock lock_for_dcheck(lock_);
733 // Some variables are exposed in both modes for convenience but only really 746 // Some variables are exposed in both modes for convenience but only really
734 // intended for one of them at runtime, confirm exclusive usage here. 747 // intended for one of them at runtime, confirm exclusive usage here.
735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 748 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
749 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
736 DCHECK(pending_tasks_.empty()); 750 DCHECK(pending_tasks_.empty());
737 DCHECK_EQ(0, create_thread_id); 751 DCHECK_EQ(0, create_thread_id);
738 } else { 752 } else {
739 DCHECK(sequenced_task_runner_map_.empty()); 753 DCHECK(sequenced_task_runner_map_.empty());
740 } 754 }
741 } 755 }
742 #endif // DCHECK_IS_ON() 756 #endif // DCHECK_IS_ON()
743 757
744 return true; 758 return true;
745 } 759 }
746 760
747 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( 761 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
748 const SequencedTask& sequenced) { 762 const SequencedTask& sequenced) {
749 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 763 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
764 subtle::NoBarrier_Load(&g_all_pools_state));
750 765
751 lock_.AssertAcquired(); 766 lock_.AssertAcquired();
752 767
753 // Confirm that the TaskScheduler's shutdown behaviors use the same 768 // Confirm that the TaskScheduler's shutdown behaviors use the same
754 // underlying values as SequencedWorkerPool. 769 // underlying values as SequencedWorkerPool.
755 static_assert( 770 static_assert(
756 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 771 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
757 static_cast<int>(CONTINUE_ON_SHUTDOWN), 772 static_cast<int>(CONTINUE_ON_SHUTDOWN),
758 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 773 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
759 "CONTINUE_ON_SHUTDOWN."); 774 "CONTINUE_ON_SHUTDOWN.");
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
801 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() 816 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask()
802 // directly. 817 // directly.
803 DCHECK_GT(max_threads_, 1U); 818 DCHECK_GT(max_threads_, 1U);
804 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, 819 base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
805 sequenced.task); 820 sequenced.task);
806 } 821 }
807 } 822 }
808 823
809 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 824 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
810 AutoLock lock(lock_); 825 AutoLock lock(lock_);
811 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 826 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
827 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
812 if (!runs_tasks_on_verifier_) { 828 if (!runs_tasks_on_verifier_) {
813 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( 829 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
814 TaskTraits().WithFileIO().WithPriority(task_priority_), 830 TaskTraits().WithFileIO().WithPriority(task_priority_),
815 ExecutionMode::PARALLEL); 831 ExecutionMode::PARALLEL);
816 } 832 }
817 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); 833 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
818 } else { 834 } else {
819 return ContainsKey(threads_, PlatformThread::CurrentId()); 835 return ContainsKey(threads_, PlatformThread::CurrentId());
820 } 836 }
821 } 837 }
822 838
823 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 839 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
824 SequenceToken sequence_token) const { 840 SequenceToken sequence_token) const {
825 AutoLock lock(lock_); 841 AutoLock lock(lock_);
826 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 842 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
843 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
827 // TODO(gab): This currently only verifies that the current thread is a 844 // TODO(gab): This currently only verifies that the current thread is a
828 // thread on which a task bound to |sequence_token| *could* run, but it 845 // thread on which a task bound to |sequence_token| *could* run, but it
829 // doesn't verify that the current is *currently running* a task bound to 846 // doesn't verify that the current is *currently running* a task bound to
830 // |sequence_token|. 847 // |sequence_token|.
831 const auto sequenced_task_runner_it = 848 const auto sequenced_task_runner_it =
832 sequenced_task_runner_map_.find(sequence_token.id_); 849 sequenced_task_runner_map_.find(sequence_token.id_);
833 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && 850 return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
834 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); 851 sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
835 } else { 852 } else {
836 ThreadMap::const_iterator found = 853 ThreadMap::const_iterator found =
(...skipping 30 matching lines...) Expand all
867 int max_new_blocking_tasks_after_shutdown) { 884 int max_new_blocking_tasks_after_shutdown) {
868 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 885 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
869 { 886 {
870 AutoLock lock(lock_); 887 AutoLock lock(lock_);
871 // Cleanup and Shutdown should not be called concurrently. 888 // Cleanup and Shutdown should not be called concurrently.
872 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 889 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
873 if (shutdown_called_) 890 if (shutdown_called_)
874 return; 891 return;
875 shutdown_called_ = true; 892 shutdown_called_ = true;
876 893
877 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) 894 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
895 AllPoolsState::WORKER_CREATED)
878 return; 896 return;
879 897
880 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 898 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
881 899
882 // Tickle the threads. This will wake up a waiting one so it will know that 900 // Tickle the threads. This will wake up a waiting one so it will know that
883 // it can exit, which in turn will wake up any other waiting ones. 901 // it can exit, which in turn will wake up any other waiting ones.
884 SignalHasWork(); 902 SignalHasWork();
885 903
886 // There are no pending or running tasks blocking shutdown, we're done. 904 // There are no pending or running tasks blocking shutdown, we're done.
887 if (CanShutdown()) 905 if (CanShutdown())
(...skipping 21 matching lines...) Expand all
909 TimeTicks::Now() - shutdown_wait_begin); 927 TimeTicks::Now() - shutdown_wait_begin);
910 #endif 928 #endif
911 } 929 }
912 930
913 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 931 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
914 AutoLock lock(lock_); 932 AutoLock lock(lock_);
915 return shutdown_called_; 933 return shutdown_called_;
916 } 934 }
917 935
918 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 936 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
919 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 937 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
938 subtle::NoBarrier_Load(&g_all_pools_state));
920 { 939 {
921 AutoLock lock(lock_); 940 AutoLock lock(lock_);
922 DCHECK(thread_being_created_); 941 DCHECK(thread_being_created_);
923 thread_being_created_ = false; 942 thread_being_created_ = false;
924 auto result = threads_.insert( 943 auto result = threads_.insert(
925 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); 944 std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
926 DCHECK(result.second); 945 DCHECK(result.second);
927 946
928 while (true) { 947 while (true) {
929 #if defined(OS_MACOSX) 948 #if defined(OS_MACOSX)
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
1044 1063
1045 // We noticed we should exit. Wake up the next worker so it knows it should 1064 // We noticed we should exit. Wake up the next worker so it knows it should
1046 // exit as well (because the Shutdown() code only signals once). 1065 // exit as well (because the Shutdown() code only signals once).
1047 SignalHasWork(); 1066 SignalHasWork();
1048 1067
1049 // Possibly unblock shutdown. 1068 // Possibly unblock shutdown.
1050 can_shutdown_cv_.Signal(); 1069 can_shutdown_cv_.Signal();
1051 } 1070 }
1052 1071
1053 void SequencedWorkerPool::Inner::HandleCleanup() { 1072 void SequencedWorkerPool::Inner::HandleCleanup() {
1054 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); 1073 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1074 subtle::NoBarrier_Load(&g_all_pools_state));
1055 1075
1056 lock_.AssertAcquired(); 1076 lock_.AssertAcquired();
1057 if (cleanup_state_ == CLEANUP_DONE) 1077 if (cleanup_state_ == CLEANUP_DONE)
1058 return; 1078 return;
1059 if (cleanup_state_ == CLEANUP_REQUESTED) { 1079 if (cleanup_state_ == CLEANUP_REQUESTED) {
1060 // We win, we get to do the cleanup as soon as the others wise up and idle. 1080 // We win, we get to do the cleanup as soon as the others wise up and idle.
1061 cleanup_state_ = CLEANUP_STARTING; 1081 cleanup_state_ = CLEANUP_STARTING;
1062 while (thread_being_created_ || 1082 while (thread_being_created_ ||
1063 cleanup_idlers_ != threads_.size() - 1) { 1083 cleanup_idlers_ != threads_.size() - 1) {
1064 has_work_cv_.Signal(); 1084 has_work_cv_.Signal();
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
1111 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1131 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
1112 lock_.AssertAcquired(); 1132 lock_.AssertAcquired();
1113 // We assume that we never create enough tasks to wrap around. 1133 // We assume that we never create enough tasks to wrap around.
1114 return next_sequence_task_number_++; 1134 return next_sequence_task_number_++;
1115 } 1135 }
1116 1136
1117 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1137 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
1118 SequencedTask* task, 1138 SequencedTask* task,
1119 TimeDelta* wait_time, 1139 TimeDelta* wait_time,
1120 std::vector<Closure>* delete_these_outside_lock) { 1140 std::vector<Closure>* delete_these_outside_lock) {
1121 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); 1141 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1142 subtle::NoBarrier_Load(&g_all_pools_state));
1122 1143
1123 lock_.AssertAcquired(); 1144 lock_.AssertAcquired();
1124 1145
1125 // Find the next task with a sequence token that's not currently in use. 1146 // Find the next task with a sequence token that's not currently in use.
1126 // If the token is in use, that means another thread is running something 1147 // If the token is in use, that means another thread is running something
1127 // in that sequence, and we can't run it without going out-of-order. 1148 // in that sequence, and we can't run it without going out-of-order.
1128 // 1149 //
1129 // This algorithm is simple and fair, but inefficient in some cases. For 1150 // This algorithm is simple and fair, but inefficient in some cases. For
1130 // example, say somebody schedules 1000 slow tasks with the same sequence 1151 // example, say somebody schedules 1000 slow tasks with the same sequence
1131 // number. We'll have to go through all those tasks each time we feel like 1152 // number. We'll have to go through all those tasks each time we feel like
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
1199 } 1220 }
1200 1221
1201 status = GET_WORK_FOUND; 1222 status = GET_WORK_FOUND;
1202 break; 1223 break;
1203 } 1224 }
1204 1225
1205 return status; 1226 return status;
1206 } 1227 }
1207 1228
1208 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1229 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1209 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); 1230 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1231 subtle::NoBarrier_Load(&g_all_pools_state));
1210 1232
1211 lock_.AssertAcquired(); 1233 lock_.AssertAcquired();
1212 1234
1213 // Mark the task's sequence number as in use. 1235 // Mark the task's sequence number as in use.
1214 if (task.sequence_token_id) 1236 if (task.sequence_token_id)
1215 current_sequences_.insert(task.sequence_token_id); 1237 current_sequences_.insert(task.sequence_token_id);
1216 1238
1217 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1239 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1218 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1240 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1219 // completes. 1241 // completes.
(...skipping 12 matching lines...) Expand all
1232 // if there is one waiting to pick up the next task. 1254 // if there is one waiting to pick up the next task.
1233 // 1255 //
1234 // Note that we really need to do this *before* running the task, not 1256 // Note that we really need to do this *before* running the task, not
1235 // after. Otherwise, if more than one task is posted, the creation of the 1257 // after. Otherwise, if more than one task is posted, the creation of the
1236 // second thread (since we only create one at a time) will be blocked by 1258 // second thread (since we only create one at a time) will be blocked by
1237 // the execution of the first task, which could be arbitrarily long. 1259 // the execution of the first task, which could be arbitrarily long.
1238 return PrepareToStartAdditionalThreadIfHelpful(); 1260 return PrepareToStartAdditionalThreadIfHelpful();
1239 } 1261 }
1240 1262
1241 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1263 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1242 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); 1264 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1265 subtle::NoBarrier_Load(&g_all_pools_state));
1243 1266
1244 lock_.AssertAcquired(); 1267 lock_.AssertAcquired();
1245 1268
1246 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1269 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1247 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1270 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1248 blocking_shutdown_thread_count_--; 1271 blocking_shutdown_thread_count_--;
1249 } 1272 }
1250 1273
1251 if (task.sequence_token_id) 1274 if (task.sequence_token_id)
1252 current_sequences_.erase(task.sequence_token_id); 1275 current_sequences_.erase(task.sequence_token_id);
1253 } 1276 }
1254 1277
1255 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1278 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1256 int sequence_token_id) const { 1279 int sequence_token_id) const {
1257 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1280 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1281 subtle::NoBarrier_Load(&g_all_pools_state));
1258 1282
1259 lock_.AssertAcquired(); 1283 lock_.AssertAcquired();
1260 return !sequence_token_id || 1284 return !sequence_token_id ||
1261 current_sequences_.find(sequence_token_id) == 1285 current_sequences_.find(sequence_token_id) ==
1262 current_sequences_.end(); 1286 current_sequences_.end();
1263 } 1287 }
1264 1288
1265 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1289 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1266 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1290 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1291 subtle::NoBarrier_Load(&g_all_pools_state));
1267 1292
1268 lock_.AssertAcquired(); 1293 lock_.AssertAcquired();
1269 // How thread creation works: 1294 // How thread creation works:
1270 // 1295 //
1271 // We'de like to avoid creating threads with the lock held. However, we 1296 // We'de like to avoid creating threads with the lock held. However, we
1272 // need to be sure that we have an accurate accounting of the threads for 1297 // need to be sure that we have an accurate accounting of the threads for
1273 // proper Joining and deltion on shutdown. 1298 // proper Joining and deltion on shutdown.
1274 // 1299 //
1275 // We need to figure out if we need another thread with the lock held, which 1300 // We need to figure out if we need another thread with the lock held, which
1276 // is what this function does. It then marks us as in the process of creating 1301 // is what this function does. It then marks us as in the process of creating
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1308 thread_being_created_ = true; 1333 thread_being_created_ = true;
1309 return static_cast<int>(threads_.size() + 1); 1334 return static_cast<int>(threads_.size() + 1);
1310 } 1335 }
1311 } 1336 }
1312 } 1337 }
1313 return 0; 1338 return 0;
1314 } 1339 }
1315 1340
1316 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1341 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1317 int thread_number) { 1342 int thread_number) {
1318 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1343 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1344 subtle::NoBarrier_Load(&g_all_pools_state));
1319 1345
1320 // Called outside of the lock. 1346 // Called outside of the lock.
1321 DCHECK_GT(thread_number, 0); 1347 DCHECK_GT(thread_number, 0);
1322 1348
1323 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { 1349 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
1324 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); 1350 AllPoolsState::WORKER_CREATED) {
1325 g_all_pools_state = AllPoolsState::WORKER_CREATED; 1351 DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
1352 subtle::NoBarrier_Load(&g_all_pools_state));
1353 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
1326 } 1354 }
1327 1355
1328 // The worker is assigned to the list when the thread actually starts, which 1356 // The worker is assigned to the list when the thread actually starts, which
1329 // will manage the memory of the pointer. 1357 // will manage the memory of the pointer.
1330 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1358 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1331 } 1359 }
1332 1360
1333 void SequencedWorkerPool::Inner::SignalHasWork() { 1361 void SequencedWorkerPool::Inner::SignalHasWork() {
1334 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1362 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1363 subtle::NoBarrier_Load(&g_all_pools_state));
1335 1364
1336 has_work_cv_.Signal(); 1365 has_work_cv_.Signal();
1337 if (testing_observer_) { 1366 if (testing_observer_) {
1338 testing_observer_->OnHasWork(); 1367 testing_observer_->OnHasWork();
1339 } 1368 }
1340 } 1369 }
1341 1370
1342 bool SequencedWorkerPool::Inner::CanShutdown() const { 1371 bool SequencedWorkerPool::Inner::CanShutdown() const {
1343 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); 1372 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1373 subtle::NoBarrier_Load(&g_all_pools_state));
1344 lock_.AssertAcquired(); 1374 lock_.AssertAcquired();
1345 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1375 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1346 return !thread_being_created_ && 1376 return !thread_being_created_ &&
1347 blocking_shutdown_thread_count_ == 0 && 1377 blocking_shutdown_thread_count_ == 0 &&
1348 blocking_shutdown_pending_task_count_ == 0; 1378 blocking_shutdown_pending_task_count_ == 0;
1349 } 1379 }
1350 1380
1351 base::StaticAtomicSequenceNumber 1381 base::StaticAtomicSequenceNumber
1352 SequencedWorkerPool::Inner::g_last_sequence_number_; 1382 SequencedWorkerPool::Inner::g_last_sequence_number_;
1353 1383
(...skipping 24 matching lines...) Expand all
1378 } 1408 }
1379 1409
1380 // static 1410 // static
1381 void SequencedWorkerPool:: 1411 void SequencedWorkerPool::
1382 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { 1412 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
1383 DCHECK(TaskScheduler::GetInstance()); 1413 DCHECK(TaskScheduler::GetInstance());
1384 // Hitting this DCHECK indicates that a task was posted to a 1414 // Hitting this DCHECK indicates that a task was posted to a
1385 // SequencedWorkerPool before the TaskScheduler was initialized and 1415 // SequencedWorkerPool before the TaskScheduler was initialized and
1386 // redirected, posting task to SequencedWorkerPools needs to at least be 1416 // redirected, posting task to SequencedWorkerPools needs to at least be
1387 // delayed until after that point. 1417 // delayed until after that point.
1388 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); 1418 DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
1389 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; 1419 subtle::NoBarrier_Load(&g_all_pools_state));
1420 subtle::NoBarrier_Store(&g_all_pools_state,
1421 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
1390 } 1422 }
1391 1423
1392 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1424 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1393 const std::string& thread_name_prefix, 1425 const std::string& thread_name_prefix,
1394 base::TaskPriority task_priority) 1426 base::TaskPriority task_priority)
1395 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1427 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1396 inner_(new Inner(this, 1428 inner_(new Inner(this,
1397 max_threads, 1429 max_threads,
1398 thread_name_prefix, 1430 thread_name_prefix,
1399 task_priority, 1431 task_priority,
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after
1538 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1570 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1539 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1571 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1540 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1572 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1541 } 1573 }
1542 1574
1543 bool SequencedWorkerPool::IsShutdownInProgress() { 1575 bool SequencedWorkerPool::IsShutdownInProgress() {
1544 return inner_->IsShutdownInProgress(); 1576 return inner_->IsShutdownInProgress();
1545 } 1577 }
1546 1578
1547 } // namespace base 1579 } // namespace base
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698