Chromium Code Reviews| Index: base/threading/sequenced_worker_pool.cc |
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
| index 2d473bf0fca602917115580e032a56c4b40e718d..51c8244abfdc82736a36365618fdbfa3f87f3807 100644 |
| --- a/base/threading/sequenced_worker_pool.cc |
| +++ b/base/threading/sequenced_worker_pool.cc |
| @@ -15,10 +15,10 @@ |
| #include <vector> |
| #include "base/atomic_sequence_num.h" |
| -#include "base/atomicops.h" |
| #include "base/callback.h" |
| #include "base/compiler_specific.h" |
| #include "base/critical_closure.h" |
| +#include "base/debug/dump_without_crashing.h" |
| #include "base/lazy_instance.h" |
| #include "base/logging.h" |
| #include "base/macros.h" |
| @@ -54,32 +54,22 @@ namespace base { |
| namespace { |
| -// An enum representing the state of all pools. Any given non-test process |
| -// should only ever transition from NONE_ACTIVE to one of the active states. |
| -// Transitions between actives states are unexpected. The |
| -// REDIRECTED_TO_TASK_SCHEDULER transition occurs when |
| -// RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition |
| -// occurs when a Worker needs to be created because the first task was posted |
| -// and the state is still NONE_ACTIVE. In a test process, a transition to |
| -// NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is |
| -// called. |
| +// An enum representing the state of all pools. A non-test process should only |
| +// ever transition from POST_TASK_DISABLED to one of the active states. A test |
| +// process may transition from one of the active states to POST_TASK_DISABLED |
| +// when DisableForProcessForTesting() is called. |
| // |
| -// |g_all_pools_state| uses relaxed atomic operations to ensure no data race |
| -// between reads/writes, strict memory ordering isn't required per no other |
| -// state being inferred from its value. Explicit synchronization (e.g. locks or |
| -// events) would be overkill (it's fine for other threads to still see |
| -// NONE_ACTIVE after the first Worker was created -- this is not possible for |
| -// REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no |
| -// other threads are active). |
| +// External memory synchronization is required to call a method that reads |
| +// |g_all_pools_state| after calling a method that modifies it. |
| // |
| // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
| // will be phased out completely otherwise). |
| -enum AllPoolsState : subtle::Atomic32 { |
| - NONE_ACTIVE, |
| - WORKER_CREATED, |
| +enum class AllPoolsState { |
| + POST_TASK_DISABLED, |
| + USE_WORKER_POOL, |
| REDIRECTED_TO_TASK_SCHEDULER, |
| }; |
| -subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
| +AllPoolsState g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; |
| struct SequencedTask : public TrackingInfo { |
| SequencedTask() |
| @@ -582,8 +572,7 @@ SequencedWorkerPool::Worker::Worker( |
| worker_pool_(std::move(worker_pool)), |
| task_shutdown_behavior_(BLOCK_SHUTDOWN), |
| is_processing_task_(false) { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| Start(); |
| } |
| @@ -591,8 +580,7 @@ SequencedWorkerPool::Worker::~Worker() { |
| } |
| void SequencedWorkerPool::Worker::Run() { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| #if defined(OS_WIN) |
| win::ScopedCOMInitializer com_initializer; |
| @@ -691,6 +679,10 @@ bool SequencedWorkerPool::Inner::PostTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) { |
| + DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
| + if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) |
| + debug::DumpWithoutCrashing(); |
| + |
| DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
| SequencedTask sequenced(from_here); |
| sequenced.sequence_token_id = sequence_token.id_; |
| @@ -740,8 +732,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
| if (optional_token_name) |
| sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| if (!PostTaskToTaskScheduler(sequenced, delay)) |
| return false; |
| } else { |
| @@ -754,8 +745,10 @@ bool SequencedWorkerPool::Inner::PostTask( |
| } |
| } |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure |
| + // correct behavior if a task is posted to a SequencedWorkerPool before |
| + // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build. |
| + if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| // Actually start the additional thread or signal an existing one outside |
| // the lock. |
| if (create_thread_id) |
| @@ -769,8 +762,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
| AutoLock lock_for_dcheck(lock_); |
| // Some variables are exposed in both modes for convenience but only really |
| // intended for one of them at runtime, confirm exclusive usage here. |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| DCHECK(pending_tasks_.empty()); |
| DCHECK_EQ(0, create_thread_id); |
| } else { |
| @@ -785,8 +777,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
| bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| const SequencedTask& sequenced, |
| const TimeDelta& delay) { |
| - DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| lock_.AssertAcquired(); |
| @@ -820,8 +811,7 @@ scoped_refptr<TaskRunner> |
| SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( |
| int sequence_token_id, |
| const TaskTraits& traits) { |
| - DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| lock_.AssertAcquired(); |
| @@ -858,8 +848,7 @@ SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( |
| bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| AutoLock lock(lock_); |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| if (!runs_tasks_on_verifier_) { |
| runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
| TaskTraits().WithFileIO().WithPriority(task_priority_), |
| @@ -877,8 +866,7 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| AutoLock lock(lock_); |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| const auto sequenced_task_runner_it = |
| sequenced_task_runner_map_.find(sequence_token.id_); |
| return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
| @@ -893,8 +881,7 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| void SequencedWorkerPool::Inner::CleanupForTesting() { |
| - DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state), |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
| + DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
| AutoLock lock(lock_); |
| CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| if (shutdown_called_) |
| @@ -925,10 +912,8 @@ void SequencedWorkerPool::Inner::Shutdown( |
| max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| - AllPoolsState::WORKER_CREATED) { |
| + if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL) |
| return; |
| - } |
| // Tickle the threads. This will wake up a waiting one so it will know that |
| // it can exit, which in turn will wake up any other waiting ones. |
| @@ -967,8 +952,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
| } |
| void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| { |
| AutoLock lock(lock_); |
| DCHECK(thread_being_created_); |
| @@ -1100,8 +1084,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| } |
| void SequencedWorkerPool::Inner::HandleCleanup() { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| lock_.AssertAcquired(); |
| if (cleanup_state_ == CLEANUP_DONE) |
| @@ -1168,8 +1151,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| SequencedTask* task, |
| TimeDelta* wait_time, |
| std::vector<Closure>* delete_these_outside_lock) { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| lock_.AssertAcquired(); |
| @@ -1257,8 +1239,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| } |
| int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| lock_.AssertAcquired(); |
| @@ -1291,8 +1272,7 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| } |
| void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| lock_.AssertAcquired(); |
| @@ -1307,8 +1287,7 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| int sequence_token_id) const { |
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| lock_.AssertAcquired(); |
| return !sequence_token_id || |
| @@ -1317,8 +1296,7 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| } |
| int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| lock_.AssertAcquired(); |
| // How thread creation works: |
| @@ -1370,27 +1348,18 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| int thread_number) { |
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| // Called outside of the lock. |
| DCHECK_GT(thread_number, 0); |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| - AllPoolsState::WORKER_CREATED) { |
| - DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| - subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED); |
| - } |
| - |
| // The worker is assigned to the list when the thread actually starts, which |
| // will manage the memory of the pointer. |
| new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| } |
| void SequencedWorkerPool::Inner::SignalHasWork() { |
| - DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| has_work_cv_.Signal(); |
| if (testing_observer_) { |
| @@ -1399,8 +1368,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() { |
| } |
| bool SequencedWorkerPool::Inner::CanShutdown() const { |
| - DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| lock_.AssertAcquired(); |
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| return !thread_being_created_ && |
| @@ -1438,26 +1406,20 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
| } |
| // static |
| -void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { |
| - DCHECK(TaskScheduler::GetInstance()); |
| - // Hitting this DCHECK indicates that a task was posted to a |
| - // SequencedWorkerPool before the TaskScheduler was initialized and |
| - // redirected, posting task to SequencedWorkerPools needs to at least be |
| - // delayed until after that point. |
| - DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
| - subtle::NoBarrier_Load(&g_all_pools_state)); |
| - subtle::NoBarrier_Store(&g_all_pools_state, |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
| +void SequencedWorkerPool::EnableForProcess() { |
|
gab
2016/11/01 16:09:47
DCHECK_EQ(POST_TASK_DISABLED, ...)?
fdoray
2016/11/01 20:40:28
Done.
|
| + if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) |
| + g_all_pools_state = AllPoolsState::USE_WORKER_POOL; |
| +} |
| + |
| +// static |
| +void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() { |
| + DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
|
gab
2016/11/01 16:09:47
Also
DCHECK(TaskScheduler::GetInstance());
as pe
fdoray
2016/11/01 20:40:28
Done.
|
| + g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
| } |
| // static |
| -void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { |
| - // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER |
| - // to stop redirecting tasks. It can also be called when the current state is |
| - // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called |
| - // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has |
| - // been created if this isn't called). |
| - subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE); |
| +void SequencedWorkerPool::DisableForProcessForTesting() { |
| + g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; |
| } |
| SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| @@ -1596,8 +1558,7 @@ bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| void SequencedWorkerPool::FlushForTesting() { |
| DCHECK(!RunsTasksOnCurrentThread()); |
| base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| - if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| - AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| // TODO(gab): Remove this if http://crbug.com/622400 fails. |
| TaskScheduler::GetInstance()->FlushForTesting(); |
| } else { |