Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index 360fb4a537f2d27eb53aa904d83ecd3f33728d2d..61cef6adf6b01c360299b2940f7c473ef3684ac8 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -15,10 +15,10 @@ |
#include <vector> |
#include "base/atomic_sequence_num.h" |
-#include "base/atomicops.h" |
#include "base/callback.h" |
#include "base/compiler_specific.h" |
#include "base/critical_closure.h" |
+#include "base/debug/dump_without_crashing.h" |
#include "base/lazy_instance.h" |
#include "base/logging.h" |
#include "base/macros.h" |
@@ -54,32 +54,26 @@ namespace base { |
namespace { |
-// An enum representing the state of all pools. Any given non-test process |
-// should only ever transition from NONE_ACTIVE to one of the active states. |
-// Transitions between actives states are unexpected. The |
-// REDIRECTED_TO_TASK_SCHEDULER transition occurs when |
-// RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition |
-// occurs when a Worker needs to be created because the first task was posted |
-// and the state is still NONE_ACTIVE. In a test process, a transition to |
-// NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is |
-// called. |
+// An enum representing the state of all pools. A non-test process should only |
+// ever transition from POST_TASK_DISABLED to one of the active states. A test |
+// process may transition from one of the active states to POST_TASK_DISABLED |
+// when DisableForProcessForTesting() is called. |
// |
-// |g_all_pools_state| uses relaxed atomic operations to ensure no data race |
-// between reads/writes, strict memory ordering isn't required per no other |
-// state being inferred from its value. Explicit synchronization (e.g. locks or |
-// events) would be overkill (it's fine for other threads to still see |
-// NONE_ACTIVE after the first Worker was created -- this is not possible for |
-// REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no |
-// other threads are active). |
+// External memory synchronization is required to call a method that reads |
+// |g_all_pools_state| after calling a method that modifies it. |
// |
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
// will be phased out completely otherwise). |
-enum AllPoolsState : subtle::Atomic32 { |
- NONE_ACTIVE, |
- WORKER_CREATED, |
+enum class AllPoolsState { |
+ POST_TASK_DISABLED, |
+ USE_WORKER_POOL, |
REDIRECTED_TO_TASK_SCHEDULER, |
}; |
-subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
+ |
+// TODO(fdoray): Change the initial state to POST_TASK_DISABLED. It is initially |
+// USE_WORKER_POOL to avoid a revert of the CL that adds |
+// debug::DumpWithoutCrashing() in case of waterfall failures. |
+AllPoolsState g_all_pools_state = AllPoolsState::USE_WORKER_POOL; |
struct SequencedTask : public TrackingInfo { |
SequencedTask() |
@@ -582,8 +576,7 @@ SequencedWorkerPool::Worker::Worker( |
worker_pool_(std::move(worker_pool)), |
task_shutdown_behavior_(BLOCK_SHUTDOWN), |
is_processing_task_(false) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
Start(); |
} |
@@ -591,8 +584,7 @@ SequencedWorkerPool::Worker::~Worker() { |
} |
void SequencedWorkerPool::Worker::Run() { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
#if defined(OS_WIN) |
win::ScopedCOMInitializer com_initializer; |
@@ -691,6 +683,13 @@ bool SequencedWorkerPool::Inner::PostTask( |
const tracked_objects::Location& from_here, |
const Closure& task, |
TimeDelta delay) { |
+ // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a |
+ // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the |
+ // waterfall. https://crbug.com/622400 |
+ // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
+ if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) |
+ debug::DumpWithoutCrashing(); |
+ |
DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
SequencedTask sequenced(from_here); |
sequenced.sequence_token_id = sequence_token.id_; |
@@ -740,8 +739,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
if (optional_token_name) |
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
- if (subtle::NoBarrier_Load(&g_all_pools_state) == |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
if (!PostTaskToTaskScheduler(sequenced, delay)) |
return false; |
} else { |
@@ -754,8 +752,10 @@ bool SequencedWorkerPool::Inner::PostTask( |
} |
} |
- if (subtle::NoBarrier_Load(&g_all_pools_state) != |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure |
+ // correct behavior if a task is posted to a SequencedWorkerPool before |
+ // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build. |
+ if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
// Actually start the additional thread or signal an existing one outside |
// the lock. |
if (create_thread_id) |
@@ -769,8 +769,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
AutoLock lock_for_dcheck(lock_); |
// Some variables are exposed in both modes for convenience but only really |
// intended for one of them at runtime, confirm exclusive usage here. |
- if (subtle::NoBarrier_Load(&g_all_pools_state) == |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
DCHECK(pending_tasks_.empty()); |
DCHECK_EQ(0, create_thread_id); |
} else { |
@@ -785,8 +784,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
const SequencedTask& sequenced, |
const TimeDelta& delay) { |
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
lock_.AssertAcquired(); |
@@ -820,8 +818,7 @@ scoped_refptr<TaskRunner> |
SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( |
int sequence_token_id, |
const TaskTraits& traits) { |
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
lock_.AssertAcquired(); |
@@ -858,8 +855,7 @@ SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( |
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
AutoLock lock(lock_); |
- if (subtle::NoBarrier_Load(&g_all_pools_state) == |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
if (!runs_tasks_on_verifier_) { |
runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
TaskTraits().WithFileIO().WithPriority(task_priority_)); |
@@ -876,8 +872,7 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
AutoLock lock(lock_); |
- if (subtle::NoBarrier_Load(&g_all_pools_state) == |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
const auto sequenced_task_runner_it = |
sequenced_task_runner_map_.find(sequence_token.id_); |
return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
@@ -892,8 +887,7 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
// See https://code.google.com/p/chromium/issues/detail?id=168415 |
void SequencedWorkerPool::Inner::CleanupForTesting() { |
- DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state), |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
+ DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
AutoLock lock(lock_); |
CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
if (shutdown_called_) |
@@ -924,10 +918,8 @@ void SequencedWorkerPool::Inner::Shutdown( |
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
- if (subtle::NoBarrier_Load(&g_all_pools_state) != |
- AllPoolsState::WORKER_CREATED) { |
+ if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL) |
return; |
- } |
// Tickle the threads. This will wake up a waiting one so it will know that |
// it can exit, which in turn will wake up any other waiting ones. |
@@ -966,8 +958,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
} |
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
{ |
AutoLock lock(lock_); |
DCHECK(thread_being_created_); |
@@ -1099,8 +1090,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
} |
void SequencedWorkerPool::Inner::HandleCleanup() { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
lock_.AssertAcquired(); |
if (cleanup_state_ == CLEANUP_DONE) |
@@ -1167,8 +1157,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
SequencedTask* task, |
TimeDelta* wait_time, |
std::vector<Closure>* delete_these_outside_lock) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
lock_.AssertAcquired(); |
@@ -1256,8 +1245,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
} |
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
lock_.AssertAcquired(); |
@@ -1290,8 +1278,7 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
} |
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
lock_.AssertAcquired(); |
@@ -1306,8 +1293,7 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
int sequence_token_id) const { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
lock_.AssertAcquired(); |
return !sequence_token_id || |
@@ -1316,8 +1302,7 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
} |
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
lock_.AssertAcquired(); |
// How thread creation works: |
@@ -1369,27 +1354,18 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
int thread_number) { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
// Called outside of the lock. |
DCHECK_GT(thread_number, 0); |
- if (subtle::NoBarrier_Load(&g_all_pools_state) != |
- AllPoolsState::WORKER_CREATED) { |
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
- subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED); |
- } |
- |
// The worker is assigned to the list when the thread actually starts, which |
// will manage the memory of the pointer. |
new Worker(worker_pool_, thread_number, thread_name_prefix_); |
} |
void SequencedWorkerPool::Inner::SignalHasWork() { |
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
has_work_cv_.Signal(); |
if (testing_observer_) { |
@@ -1398,8 +1374,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() { |
} |
bool SequencedWorkerPool::Inner::CanShutdown() const { |
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
lock_.AssertAcquired(); |
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
return !thread_being_created_ && |
@@ -1437,26 +1412,32 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
} |
// static |
-void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { |
+void SequencedWorkerPool::EnableForProcess() { |
+ // TODO(fdoray): Uncomment this line. It is initially commented to avoid a |
+ // revert of the CL that adds debug::DumpWithoutCrashing() in case of |
+ // waterfall failures. |
+ // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
+ g_all_pools_state = AllPoolsState::USE_WORKER_POOL; |
+} |
+ |
+// static |
+void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() { |
+ // TODO(fdoray): Uncomment this line. It is initially commented to avoid a |
+ // revert of the CL that adds debug::DumpWithoutCrashing() in case of |
+ // waterfall failures. |
+ // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
DCHECK(TaskScheduler::GetInstance()); |
- // Hitting this DCHECK indicates that a task was posted to a |
- // SequencedWorkerPool before the TaskScheduler was initialized and |
- // redirected, posting task to SequencedWorkerPools needs to at least be |
- // delayed until after that point. |
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
- subtle::NoBarrier_Load(&g_all_pools_state)); |
- subtle::NoBarrier_Store(&g_all_pools_state, |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
+ g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
+} |
+ |
+// static |
+void SequencedWorkerPool::DisableForProcessForTesting() { |
+ g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; |
} |
// static |
-void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { |
- // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER |
- // to stop redirecting tasks. It can also be called when the current state is |
- // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called |
- // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has |
- // been created if this isn't called). |
- subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE); |
+bool SequencedWorkerPool::IsEnabled() { |
+ return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED; |
} |
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
@@ -1595,8 +1576,7 @@ bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
void SequencedWorkerPool::FlushForTesting() { |
DCHECK(!RunsTasksOnCurrentThread()); |
base::ThreadRestrictions::ScopedAllowWait allow_wait; |
- if (subtle::NoBarrier_Load(&g_all_pools_state) == |
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
// TODO(gab): Remove this if http://crbug.com/622400 fails. |
TaskScheduler::GetInstance()->FlushForTesting(); |
} else { |