| Index: base/threading/sequenced_worker_pool.cc
|
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
|
| index 376c54f3dd4bcd9892e955d45c5042b0a6b980bd..fb71efeaea89c2f901a2e87ecebca02e25f45c39 100644
|
| --- a/base/threading/sequenced_worker_pool.cc
|
| +++ b/base/threading/sequenced_worker_pool.cc
|
| @@ -14,6 +14,7 @@
|
| #include <vector>
|
|
|
| #include "base/atomic_sequence_num.h"
|
| +#include "base/atomicops.h"
|
| #include "base/callback.h"
|
| #include "base/compiler_specific.h"
|
| #include "base/critical_closure.h"
|
| @@ -58,13 +59,21 @@ namespace {
|
| // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
|
| // and the WORKER_CREATED transition occurs when a Worker needs to be created
|
| // because the first task was posted and the state is still NONE_ACTIVE.
|
| +// |g_all_pools_state| is accessed through relaxed atomic operations: they
|
| +// make individual reads and writes race-free, and strict memory ordering
|
| +// isn't required because no other state is inferred from its value. Explicit
|
| +// synchronization (e.g. locks or events) would be overkill (it's fine for
|
| +// other threads to still see NONE_ACTIVE after the first Worker was created
|
| +// -- this cannot happen for REDIRECTED_TO_TASK_SCHEDULER because its API
|
| +// requires being invoked while no other threads are active).
|
| // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
|
| // will be phased out completely otherwise).
|
| -enum class AllPoolsState {
|
| +enum AllPoolsState : subtle::Atomic32 {
|
| NONE_ACTIVE,
|
| WORKER_CREATED,
|
| REDIRECTED_TO_TASK_SCHEDULER,
|
| -} g_all_pools_state = AllPoolsState::NONE_ACTIVE;
|
| +};
|
| +subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
|
|
|
| struct SequencedTask : public TrackingInfo {
|
| SequencedTask()
|
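| The access pattern this patch applies throughout the file is, in isolation: a
| word-sized flag held in a subtle::Atomic32 and touched only through relaxed
| (NoBarrier) operations from base/atomicops.h. A minimal standalone sketch of
| that pattern follows; ExampleState, STATE_A/STATE_B, ReaderSide() and
| WriterSide() are illustrative names, not part of the patch.
|
|     #include "base/atomicops.h"
|
|     namespace {
|
|     // Word-sized flag; relaxed atomics make each individual read and write
|     // race-free without ordering any surrounding memory accesses.
|     enum ExampleState : base::subtle::Atomic32 {
|       STATE_A,
|       STATE_B,
|     };
|     base::subtle::Atomic32 g_example_state = STATE_A;
|
|     }  // namespace
|
|     void ReaderSide() {
|       // NoBarrier_Load() never returns a torn value, but a stale STATE_A
|       // may still be observed shortly after the writer's store.
|       if (base::subtle::NoBarrier_Load(&g_example_state) == STATE_B) {
|         // React to the new state.
|       }
|     }
|
|     void WriterSide() {
|       // NoBarrier_Store() publishes the new value without a fence, which
|       // suffices when no other state is inferred from the flag.
|       base::subtle::NoBarrier_Store(&g_example_state, STATE_B);
|     }
|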
| @@ -552,7 +561,8 @@ SequencedWorkerPool::Worker::Worker(
|
| worker_pool_(std::move(worker_pool)),
|
| task_shutdown_behavior_(BLOCK_SHUTDOWN),
|
| is_processing_task_(false) {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| Start();
|
| }
|
|
|
| @@ -560,7 +570,8 @@ SequencedWorkerPool::Worker::~Worker() {
|
| }
|
|
|
| void SequencedWorkerPool::Worker::Run() {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| #if defined(OS_WIN)
|
| win::ScopedCOMInitializer com_initializer;
|
| @@ -706,7 +717,8 @@ bool SequencedWorkerPool::Inner::PostTask(
|
| if (optional_token_name)
|
| sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
|
|
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| PostTaskToTaskScheduler(sequenced);
|
| } else {
|
| pending_tasks_.insert(sequenced);
|
| @@ -718,7 +730,8 @@ bool SequencedWorkerPool::Inner::PostTask(
|
| }
|
| }
|
|
|
| - if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) !=
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| // Actually start the additional thread or signal an existing one outside
|
| // the lock.
|
| if (create_thread_id)
|
| @@ -732,7 +745,8 @@ bool SequencedWorkerPool::Inner::PostTask(
|
| AutoLock lock_for_dcheck(lock_);
|
| // Some variables are exposed in both modes for convenience but only really
|
| // intended for one of them at runtime, confirm exclusive usage here.
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| DCHECK(pending_tasks_.empty());
|
| DCHECK_EQ(0, create_thread_id);
|
| } else {
|
| @@ -746,7 +760,8 @@ bool SequencedWorkerPool::Inner::PostTask(
|
|
|
| void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
|
| const SequencedTask& sequenced) {
|
| - DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -808,7 +823,8 @@ void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
|
|
|
| bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
|
| AutoLock lock(lock_);
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| if (!runs_tasks_on_verifier_) {
|
| runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
|
| TaskTraits().WithFileIO().WithPriority(task_priority_),
|
| @@ -823,7 +839,8 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
|
| bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
|
| SequenceToken sequence_token) const {
|
| AutoLock lock(lock_);
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| // TODO(gab): This currently only verifies that the current thread is a
|
| // thread on which a task bound to |sequence_token| *could* run, but it
|
| // doesn't verify that the current is *currently running* a task bound to
|
| @@ -874,7 +891,8 @@ void SequencedWorkerPool::Inner::Shutdown(
|
| return;
|
| shutdown_called_ = true;
|
|
|
| - if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) !=
|
| + AllPoolsState::WORKER_CREATED)
|
| return;
|
|
|
| max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
|
| @@ -916,7 +934,8 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
|
| }
|
|
|
| void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| {
|
| AutoLock lock(lock_);
|
| DCHECK(thread_being_created_);
|
| @@ -1051,7 +1070,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| }
|
|
|
| void SequencedWorkerPool::Inner::HandleCleanup() {
|
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
| if (cleanup_state_ == CLEANUP_DONE)
|
| @@ -1118,7 +1138,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
|
| SequencedTask* task,
|
| TimeDelta* wait_time,
|
| std::vector<Closure>* delete_these_outside_lock) {
|
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -1206,7 +1227,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
|
| }
|
|
|
| int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
|
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -1239,7 +1261,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
|
| }
|
|
|
| void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
|
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -1254,7 +1277,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
|
|
|
| bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
|
| int sequence_token_id) const {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
| return !sequence_token_id ||
|
| @@ -1263,7 +1287,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
|
| }
|
|
|
| int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
| // How thread creation works:
|
| @@ -1315,14 +1340,17 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
|
|
|
| void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
|
| int thread_number) {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| // Called outside of the lock.
|
| DCHECK_GT(thread_number, 0);
|
|
|
| - if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
|
| - DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
|
| - g_all_pools_state = AllPoolsState::WORKER_CREATED;
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) !=
|
| + AllPoolsState::WORKER_CREATED) {
|
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| + subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
|
| }
|
|
|
| // The worker is assigned to the list when the thread actually starts, which
|
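| Note that the NONE_ACTIVE -> WORKER_CREATED transition above is a separate
| load, DCHECK and store rather than an atomic read-modify-write. That is only
| safe because worker-thread creation is serialized by the pool's lock (see
| |thread_being_created_|). For contrast, a lock-free transition without that
| external guarantee would need a compare-and-swap, as in this hypothetical
| sketch (not part of the patch, reusing the AllPoolsState values declared
| earlier in it):
|
|     #include "base/atomicops.h"
|
|     // Atomically flips NONE_ACTIVE to WORKER_CREATED; a thread that loses
|     // the race simply leaves in place whatever another thread installed.
|     void TransitionToWorkerCreated(base::subtle::Atomic32* state) {
|       base::subtle::Atomic32 prev = base::subtle::NoBarrier_CompareAndSwap(
|           state, AllPoolsState::NONE_ACTIVE, AllPoolsState::WORKER_CREATED);
|       // |prev| is NONE_ACTIVE for the winning thread, and WORKER_CREATED
|       // (or REDIRECTED_TO_TASK_SCHEDULER) for everyone else.
|     }
|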
| @@ -1331,7 +1359,8 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
|
| }
|
|
|
| void SequencedWorkerPool::Inner::SignalHasWork() {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| has_work_cv_.Signal();
|
| if (testing_observer_) {
|
| @@ -1340,7 +1369,8 @@ void SequencedWorkerPool::Inner::SignalHasWork() {
|
| }
|
|
|
| bool SequencedWorkerPool::Inner::CanShutdown() const {
|
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| lock_.AssertAcquired();
|
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
|
| return !thread_being_created_ &&
|
| @@ -1385,8 +1415,10 @@ void SequencedWorkerPool::
|
| // SequencedWorkerPool before the TaskScheduler was initialized and
|
| // redirected, posting task to SequencedWorkerPools needs to at least be
|
| // delayed until after that point.
|
| - DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
|
| - g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
|
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| + subtle::NoBarrier_Store(&g_all_pools_state,
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
|
| }
|
|
|
| SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
|
|
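| The relaxed store in this last hunk also shows why no fence is needed for the
| REDIRECTED_TO_TASK_SCHEDULER transition: per the comment above it, the call
| must complete before any other thread is active, so whatever mechanism later
| starts those threads supplies the happens-before edge that publishes the
| store. A minimal usage sketch, where InitializeTaskSchedulerForProcess() is a
| hypothetical placeholder for TaskScheduler setup and only the Redirect call
| comes from this patch:
|
|     void EarlyProcessStartup() {
|       // Single-threaded phase: no Worker exists yet, so no thread can race
|       // on |g_all_pools_state|.
|       InitializeTaskSchedulerForProcess();  // Hypothetical setup call.
|       base::SequencedWorkerPool::
|           RedirectSequencedWorkerPoolsToTaskSchedulerForProcess();
|       // Only now may other threads be started; starting them synchronizes
|       // with, and thus publishes, the relaxed store to |g_all_pools_state|.
|     }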
|