Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index 376c54f3dd4bcd9892e955d45c5042b0a6b980bd..fb71efeaea89c2f901a2e87ecebca02e25f45c39 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -14,6 +14,7 @@ |
#include <vector> |
#include "base/atomic_sequence_num.h" |
+#include "base/atomicops.h" |
#include "base/callback.h" |
#include "base/compiler_specific.h" |
#include "base/critical_closure.h" |
@@ -58,13 +59,21 @@ namespace { |
// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
// and the WORKER_CREATED transition occurs when a Worker needs to be created |
// because the first task was posted and the state is still NONE_ACTIVE. |
+// |g_all_pools_state| uses relaxed atomic operations to ensure no data race |
+// between reads/writes, strict memory ordering isn't required per no other |
+// state being inferred from its value. Explicit synchronization (e.g. locks or |
+// events) would be overkill (it's fine for other threads to still see |
+// NONE_ACTIVE after the first Worker was created -- this is not possible for |
+// REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no |
+// other threads are active). |
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
// will be phased out completely otherwise). |
-enum class AllPoolsState { |
+enum AllPoolsState : subtle::Atomic32 { |
NONE_ACTIVE, |
WORKER_CREATED, |
REDIRECTED_TO_TASK_SCHEDULER, |
-} g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
+}; |
+subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
struct SequencedTask : public TrackingInfo { |
SequencedTask() |
@@ -552,7 +561,8 @@ SequencedWorkerPool::Worker::Worker( |
worker_pool_(std::move(worker_pool)), |
task_shutdown_behavior_(BLOCK_SHUTDOWN), |
is_processing_task_(false) { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
Start(); |
} |
@@ -560,7 +570,8 @@ SequencedWorkerPool::Worker::~Worker() { |
} |
void SequencedWorkerPool::Worker::Run() { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
#if defined(OS_WIN) |
win::ScopedCOMInitializer com_initializer; |
@@ -706,7 +717,8 @@ bool SequencedWorkerPool::Inner::PostTask( |
if (optional_token_name) |
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (subtle::NoBarrier_Load(&g_all_pools_state) == |
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
PostTaskToTaskScheduler(sequenced); |
} else { |
pending_tasks_.insert(sequenced); |
@@ -718,7 +730,8 @@ bool SequencedWorkerPool::Inner::PostTask( |
} |
} |
- if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (subtle::NoBarrier_Load(&g_all_pools_state) != |
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
// Actually start the additional thread or signal an existing one outside |
// the lock. |
if (create_thread_id) |
@@ -732,7 +745,8 @@ bool SequencedWorkerPool::Inner::PostTask( |
AutoLock lock_for_dcheck(lock_); |
// Some variables are exposed in both modes for convenience but only really |
// intended for one of them at runtime, confirm exclusive usage here. |
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (subtle::NoBarrier_Load(&g_all_pools_state) == |
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
DCHECK(pending_tasks_.empty()); |
DCHECK_EQ(0, create_thread_id); |
} else { |
@@ -746,7 +760,8 @@ bool SequencedWorkerPool::Inner::PostTask( |
void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
const SequencedTask& sequenced) { |
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
@@ -808,7 +823,8 @@ void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
AutoLock lock(lock_); |
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (subtle::NoBarrier_Load(&g_all_pools_state) == |
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
if (!runs_tasks_on_verifier_) { |
runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
TaskTraits().WithFileIO().WithPriority(task_priority_), |
@@ -823,7 +839,8 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
SequenceToken sequence_token) const { |
AutoLock lock(lock_); |
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (subtle::NoBarrier_Load(&g_all_pools_state) == |
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
// TODO(gab): This currently only verifies that the current thread is a |
// thread on which a task bound to |sequence_token| *could* run, but it |
// doesn't verify that the current is *currently running* a task bound to |
@@ -874,7 +891,8 @@ void SequencedWorkerPool::Inner::Shutdown( |
return; |
shutdown_called_ = true; |
- if (g_all_pools_state != AllPoolsState::WORKER_CREATED) |
+ if (subtle::NoBarrier_Load(&g_all_pools_state) != |
+ AllPoolsState::WORKER_CREATED) |
return; |
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
@@ -916,7 +934,8 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
} |
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
{ |
AutoLock lock(lock_); |
DCHECK(thread_being_created_); |
@@ -1051,7 +1070,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
} |
void SequencedWorkerPool::Inner::HandleCleanup() { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
if (cleanup_state_ == CLEANUP_DONE) |
@@ -1118,7 +1138,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
SequencedTask* task, |
TimeDelta* wait_time, |
std::vector<Closure>* delete_these_outside_lock) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
@@ -1206,7 +1227,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
} |
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
@@ -1239,7 +1261,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
} |
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
@@ -1254,7 +1277,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
int sequence_token_id) const { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
return !sequence_token_id || |
@@ -1263,7 +1287,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
} |
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
// How thread creation works: |
@@ -1315,14 +1340,17 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
int thread_number) { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
// Called outside of the lock. |
DCHECK_GT(thread_number, 0); |
- if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { |
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
- g_all_pools_state = AllPoolsState::WORKER_CREATED; |
+ if (subtle::NoBarrier_Load(&g_all_pools_state) != |
+ AllPoolsState::WORKER_CREATED) { |
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
+ subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED); |
} |
// The worker is assigned to the list when the thread actually starts, which |
@@ -1331,7 +1359,8 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
} |
void SequencedWorkerPool::Inner::SignalHasWork() { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
has_work_cv_.Signal(); |
if (testing_observer_) { |
@@ -1340,7 +1369,8 @@ void SequencedWorkerPool::Inner::SignalHasWork() { |
} |
bool SequencedWorkerPool::Inner::CanShutdown() const { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
lock_.AssertAcquired(); |
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
return !thread_being_created_ && |
@@ -1385,8 +1415,10 @@ void SequencedWorkerPool:: |
// SequencedWorkerPool before the TaskScheduler was initialized and |
// redirected, posting task to SequencedWorkerPools needs to at least be |
// delayed until after that point. |
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
- g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
+ subtle::NoBarrier_Load(&g_all_pools_state)); |
+ subtle::NoBarrier_Store(&g_all_pools_state, |
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
} |
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |