| Index: base/threading/sequenced_worker_pool.cc
|
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
|
| index 6d4339869a10a80451fa3246a5503c4afbec9e29..360fb4a537f2d27eb53aa904d83ecd3f33728d2d 100644
|
| --- a/base/threading/sequenced_worker_pool.cc
|
| +++ b/base/threading/sequenced_worker_pool.cc
|
| @@ -15,10 +15,10 @@
|
| #include <vector>
|
|
|
| #include "base/atomic_sequence_num.h"
|
| +#include "base/atomicops.h"
|
| #include "base/callback.h"
|
| #include "base/compiler_specific.h"
|
| #include "base/critical_closure.h"
|
| -#include "base/debug/dump_without_crashing.h"
|
| #include "base/lazy_instance.h"
|
| #include "base/logging.h"
|
| #include "base/macros.h"
|
| @@ -54,22 +54,32 @@
|
|
|
| namespace {
|
|
|
| -// An enum representing the state of all pools. A non-test process should only
|
| -// ever transition from POST_TASK_DISABLED to one of the active states. A test
|
| -// process may transition from one of the active states to POST_TASK_DISABLED
|
| -// when DisableForProcessForTesting() is called.
|
| +// An enum representing the state of all pools. Any given non-test process
|
| +// should only ever transition from NONE_ACTIVE to one of the active states.
|
| +// Transitions between active states are unexpected. The
|
| +// REDIRECTED_TO_TASK_SCHEDULER transition occurs when
|
| +// RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition
|
| +// occurs when a Worker needs to be created because the first task was posted
|
| +// and the state is still NONE_ACTIVE. In a test process, a transition to
|
| +// NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is
|
| +// called.
|
| //
|
| -// External memory synchronization is required to call a method that reads
|
| -// |g_all_pools_state| after calling a method that modifies it.
|
| +// |g_all_pools_state| uses relaxed atomic operations to ensure no data race
|
| +// between reads/writes; strict memory ordering isn't required since no other
|
| +// state being inferred from its value. Explicit synchronization (e.g. locks or
|
| +// events) would be overkill (it's fine for other threads to still see
|
| +// NONE_ACTIVE after the first Worker was created -- this is not possible for
|
| +// REDIRECTED_TO_TASK_SCHEDULER since its API requires being invoked while no
|
| +// other threads are active).
|
| //
|
| // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
|
| // will be phased out completely otherwise).
|
| -enum class AllPoolsState {
|
| - POST_TASK_DISABLED,
|
| - USE_WORKER_POOL,
|
| +enum AllPoolsState : subtle::Atomic32 {
|
| + NONE_ACTIVE,
|
| + WORKER_CREATED,
|
| REDIRECTED_TO_TASK_SCHEDULER,
|
| };
|
| -AllPoolsState g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
|
| +subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
|
|
|
| struct SequencedTask : public TrackingInfo {
|
| SequencedTask()
|
| @@ -572,7 +582,8 @@
|
| worker_pool_(std::move(worker_pool)),
|
| task_shutdown_behavior_(BLOCK_SHUTDOWN),
|
| is_processing_task_(false) {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| Start();
|
| }
|
|
|
| @@ -580,7 +591,8 @@
|
| }
|
|
|
| void SequencedWorkerPool::Worker::Run() {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| #if defined(OS_WIN)
|
| win::ScopedCOMInitializer com_initializer;
|
| @@ -679,14 +691,6 @@
|
| const tracked_objects::Location& from_here,
|
| const Closure& task,
|
| TimeDelta delay) {
|
| - // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a
|
| - // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the
|
| - // waterfall. https://crbug.com/622400
|
| - // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
|
| -
|
| - if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
|
| - debug::DumpWithoutCrashing();
|
| -
|
| DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
|
| SequencedTask sequenced(from_here);
|
| sequenced.sequence_token_id = sequence_token.id_;
|
| @@ -736,7 +740,8 @@
|
| if (optional_token_name)
|
| sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
|
|
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| if (!PostTaskToTaskScheduler(sequenced, delay))
|
| return false;
|
| } else {
|
| @@ -749,10 +754,8 @@
|
| }
|
| }
|
|
|
| - // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure
|
| - // correct behavior if a task is posted to a SequencedWorkerPool before
|
| - // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build.
|
| - if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) !=
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| // Actually start the additional thread or signal an existing one outside
|
| // the lock.
|
| if (create_thread_id)
|
| @@ -766,7 +769,8 @@
|
| AutoLock lock_for_dcheck(lock_);
|
| // Some variables are exposed in both modes for convenience but only really
|
| // intended for one of them at runtime, confirm exclusive usage here.
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| DCHECK(pending_tasks_.empty());
|
| DCHECK_EQ(0, create_thread_id);
|
| } else {
|
| @@ -781,7 +785,8 @@
|
| bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
|
| const SequencedTask& sequenced,
|
| const TimeDelta& delay) {
|
| - DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -815,7 +820,8 @@
|
| SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
|
| int sequence_token_id,
|
| const TaskTraits& traits) {
|
| - DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -852,7 +858,8 @@
|
|
|
| bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
|
| AutoLock lock(lock_);
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| if (!runs_tasks_on_verifier_) {
|
| runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
|
| TaskTraits().WithFileIO().WithPriority(task_priority_));
|
| @@ -869,7 +876,8 @@
|
|
|
| AutoLock lock(lock_);
|
|
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| const auto sequenced_task_runner_it =
|
| sequenced_task_runner_map_.find(sequence_token.id_);
|
| return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
|
| @@ -884,7 +892,8 @@
|
|
|
| // See https://code.google.com/p/chromium/issues/detail?id=168415
|
| void SequencedWorkerPool::Inner::CleanupForTesting() {
|
| - DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
|
| + DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state),
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
|
| AutoLock lock(lock_);
|
| CHECK_EQ(CLEANUP_DONE, cleanup_state_);
|
| if (shutdown_called_)
|
| @@ -915,8 +924,10 @@
|
|
|
| max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
|
|
|
| - if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL)
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) !=
|
| + AllPoolsState::WORKER_CREATED) {
|
| return;
|
| + }
|
|
|
| // Tickle the threads. This will wake up a waiting one so it will know that
|
| // it can exit, which in turn will wake up any other waiting ones.
|
| @@ -955,7 +966,8 @@
|
| }
|
|
|
| void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| {
|
| AutoLock lock(lock_);
|
| DCHECK(thread_being_created_);
|
| @@ -1087,7 +1099,8 @@
|
| }
|
|
|
| void SequencedWorkerPool::Inner::HandleCleanup() {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
| if (cleanup_state_ == CLEANUP_DONE)
|
| @@ -1154,7 +1167,8 @@
|
| SequencedTask* task,
|
| TimeDelta* wait_time,
|
| std::vector<Closure>* delete_these_outside_lock) {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -1242,7 +1256,8 @@
|
| }
|
|
|
| int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -1275,7 +1290,8 @@
|
| }
|
|
|
| void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
|
|
| @@ -1290,7 +1306,8 @@
|
|
|
| bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
|
| int sequence_token_id) const {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
| return !sequence_token_id ||
|
| @@ -1299,7 +1316,8 @@
|
| }
|
|
|
| int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| lock_.AssertAcquired();
|
| // How thread creation works:
|
| @@ -1351,10 +1369,18 @@
|
|
|
| void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
|
| int thread_number) {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| // Called outside of the lock.
|
| DCHECK_GT(thread_number, 0);
|
| +
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) !=
|
| + AllPoolsState::WORKER_CREATED) {
|
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| + subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
|
| + }
|
|
|
| // The worker is assigned to the list when the thread actually starts, which
|
| // will manage the memory of the pointer.
|
| @@ -1362,7 +1388,8 @@
|
| }
|
|
|
| void SequencedWorkerPool::Inner::SignalHasWork() {
|
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
|
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
|
|
| has_work_cv_.Signal();
|
| if (testing_observer_) {
|
| @@ -1371,7 +1398,8 @@
|
| }
|
|
|
| bool SequencedWorkerPool::Inner::CanShutdown() const {
|
| - DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
|
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| lock_.AssertAcquired();
|
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
|
| return !thread_being_created_ &&
|
| @@ -1409,26 +1437,26 @@
|
| }
|
|
|
| // static
|
| -void SequencedWorkerPool::EnableForProcess() {
|
| - DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
|
| - g_all_pools_state = AllPoolsState::USE_WORKER_POOL;
|
| +void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() {
|
| + DCHECK(TaskScheduler::GetInstance());
|
| + // Hitting this DCHECK indicates that a task was posted to a
|
| + // SequencedWorkerPool before the TaskScheduler was initialized and
|
| + // redirected; posting tasks to SequencedWorkerPools needs to at least be
|
| + // delayed until after that point.
|
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
|
| + subtle::NoBarrier_Load(&g_all_pools_state));
|
| + subtle::NoBarrier_Store(&g_all_pools_state,
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
|
| }
|
|
|
| // static
|
| -void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() {
|
| - DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
|
| - DCHECK(TaskScheduler::GetInstance());
|
| - g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
|
| -}
|
| -
|
| -// static
|
| -void SequencedWorkerPool::DisableForProcessForTesting() {
|
| - g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
|
| -}
|
| -
|
| -// static
|
| -bool SequencedWorkerPool::IsEnabled() {
|
| - return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED;
|
| +void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() {
|
| + // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER
|
| + // to stop redirecting tasks. It can also be called when the current state is
|
| + // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called
|
| + // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has
|
| + // been created if this isn't called).
|
| + subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE);
|
| }
|
|
|
| SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
|
| @@ -1567,7 +1595,8 @@
|
| void SequencedWorkerPool::FlushForTesting() {
|
| DCHECK(!RunsTasksOnCurrentThread());
|
| base::ThreadRestrictions::ScopedAllowWait allow_wait;
|
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| + if (subtle::NoBarrier_Load(&g_all_pools_state) ==
|
| + AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
|
| // TODO(gab): Remove this if http://crbug.com/622400 fails.
|
| TaskScheduler::GetInstance()->FlushForTesting();
|
| } else {
|
|
|