Chromium Code Reviews

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 2445763002: Disallow posting tasks to SequencedWorkerPools by default. (Closed)
Patch Set: self-review (created 4 years, 1 month ago)
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 360fb4a537f2d27eb53aa904d83ecd3f33728d2d..61cef6adf6b01c360299b2940f7c473ef3684ac8 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -15,10 +15,10 @@
#include <vector>
#include "base/atomic_sequence_num.h"
-#include "base/atomicops.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
+#include "base/debug/dump_without_crashing.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
@@ -54,32 +54,26 @@ namespace base {
namespace {
-// An enum representing the state of all pools. Any given non-test process
-// should only ever transition from NONE_ACTIVE to one of the active states.
-// Transitions between actives states are unexpected. The
-// REDIRECTED_TO_TASK_SCHEDULER transition occurs when
-// RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition
-// occurs when a Worker needs to be created because the first task was posted
-// and the state is still NONE_ACTIVE. In a test process, a transition to
-// NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is
-// called.
+// An enum representing the state of all pools. A non-test process should only
+// ever transition from POST_TASK_DISABLED to one of the active states. A test
+// process may transition from one of the active states to POST_TASK_DISABLED
+// when DisableForProcessForTesting() is called.
//
-// |g_all_pools_state| uses relaxed atomic operations to ensure no data race
-// between reads/writes, strict memory ordering isn't required per no other
-// state being inferred from its value. Explicit synchronization (e.g. locks or
-// events) would be overkill (it's fine for other threads to still see
-// NONE_ACTIVE after the first Worker was created -- this is not possible for
-// REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no
-// other threads are active).
+// External memory synchronization is required to call a method that reads
+// |g_all_pools_state| after calling a method that modifies it.
//
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
// will be phased out completely otherwise).
-enum AllPoolsState : subtle::Atomic32 {
- NONE_ACTIVE,
- WORKER_CREATED,
+enum class AllPoolsState {
+ POST_TASK_DISABLED,
+ USE_WORKER_POOL,
REDIRECTED_TO_TASK_SCHEDULER,
};
-subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
+
+// TODO(fdoray): Change the initial state to POST_TASK_DISABLED. It is initially
+// USE_WORKER_POOL to avoid a revert of the CL that adds
+// debug::DumpWithoutCrashing() in case of waterfall failures.
+AllPoolsState g_all_pools_state = AllPoolsState::USE_WORKER_POOL;
struct SequencedTask : public TrackingInfo {
SequencedTask()
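
Because |g_all_pools_state| is now a plain (non-atomic) global, the synchronization note above amounts to: flip the state on one thread before any other thread that touches a pool is running. A minimal sketch of that ordering, assuming a Chromium checkout; InitOnMainThread() is a hypothetical embedder hook, and only the SequencedWorkerPool statics come from this patch:

#include "base/threading/sequenced_worker_pool.h"

// Hypothetical embedder startup hook. It runs on the main thread before any
// other thread that could post to a SequencedWorkerPool exists, so the plain
// write to |g_all_pools_state| needs no extra synchronization.
void InitOnMainThread() {
  // POST_TASK_DISABLED -> USE_WORKER_POOL (once the TODO above lands and the
  // initial state becomes POST_TASK_DISABLED).
  base::SequencedWorkerPool::EnableForProcess();

  // Only now start threads / create pools that post tasks.
}
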
@@ -582,8 +576,7 @@ SequencedWorkerPool::Worker::Worker(
worker_pool_(std::move(worker_pool)),
task_shutdown_behavior_(BLOCK_SHUTDOWN),
is_processing_task_(false) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
Start();
}
@@ -591,8 +584,7 @@ SequencedWorkerPool::Worker::~Worker() {
}
void SequencedWorkerPool::Worker::Run() {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
#if defined(OS_WIN)
win::ScopedCOMInitializer com_initializer;
@@ -691,6 +683,13 @@ bool SequencedWorkerPool::Inner::PostTask(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) {
+ // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a
+ // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the
+ // waterfall. https://crbug.com/622400
+ // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
+ if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
+ debug::DumpWithoutCrashing();
+
DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
SequencedTask sequenced(from_here);
sequenced.sequence_token_id = sequence_token.id_;
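
To make the new guard in Inner::PostTask() concrete, here is a hedged sketch of a caller that posts before the process is enabled; DoNothing() and PostBeforeEnable() are hypothetical, only PostTask() and the guard above are from this patch, and a Chromium checkout is assumed:

#include "base/bind.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/threading/sequenced_worker_pool.h"

void DoNothing() {}

// With the DCHECK above still commented out, posting while the state is
// POST_TASK_DISABLED reaches debug::DumpWithoutCrashing(): a dump is reported,
// execution continues, and in a non-DCHECK build the task still goes down the
// worker-pool path (see the comment below about != REDIRECTED_TO_TASK_SCHEDULER).
void PostBeforeEnable(scoped_refptr<base::SequencedWorkerPool> pool) {
  pool->PostTask(FROM_HERE, base::Bind(&DoNothing));
}
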
@@ -740,8 +739,7 @@ bool SequencedWorkerPool::Inner::PostTask(
if (optional_token_name)
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
- if (subtle::NoBarrier_Load(&g_all_pools_state) ==
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
if (!PostTaskToTaskScheduler(sequenced, delay))
return false;
} else {
@@ -754,8 +752,10 @@ bool SequencedWorkerPool::Inner::PostTask(
}
}
- if (subtle::NoBarrier_Load(&g_all_pools_state) !=
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure
+ // correct behavior if a task is posted to a SequencedWorkerPool before
+ // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build.
+ if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
// Actually start the additional thread or signal an existing one outside
// the lock.
if (create_thread_id)
@@ -769,8 +769,7 @@ bool SequencedWorkerPool::Inner::PostTask(
AutoLock lock_for_dcheck(lock_);
// Some variables are exposed in both modes for convenience but only really
// intended for one of them at runtime, confirm exclusive usage here.
- if (subtle::NoBarrier_Load(&g_all_pools_state) ==
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
DCHECK(pending_tasks_.empty());
DCHECK_EQ(0, create_thread_id);
} else {
@@ -785,8 +784,7 @@ bool SequencedWorkerPool::Inner::PostTask(
bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
const SequencedTask& sequenced,
const TimeDelta& delay) {
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
lock_.AssertAcquired();
@@ -820,8 +818,7 @@ scoped_refptr<TaskRunner>
SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
int sequence_token_id,
const TaskTraits& traits) {
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
lock_.AssertAcquired();
@@ -858,8 +855,7 @@ SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
AutoLock lock(lock_);
- if (subtle::NoBarrier_Load(&g_all_pools_state) ==
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
if (!runs_tasks_on_verifier_) {
runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
TaskTraits().WithFileIO().WithPriority(task_priority_));
@@ -876,8 +872,7 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
AutoLock lock(lock_);
- if (subtle::NoBarrier_Load(&g_all_pools_state) ==
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
const auto sequenced_task_runner_it =
sequenced_task_runner_map_.find(sequence_token.id_);
return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
@@ -892,8 +887,7 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
// See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() {
- DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state),
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
+ DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
AutoLock lock(lock_);
CHECK_EQ(CLEANUP_DONE, cleanup_state_);
if (shutdown_called_)
@@ -924,10 +918,8 @@ void SequencedWorkerPool::Inner::Shutdown(
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
- if (subtle::NoBarrier_Load(&g_all_pools_state) !=
- AllPoolsState::WORKER_CREATED) {
+ if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL)
return;
- }
// Tickle the threads. This will wake up a waiting one so it will know that
// it can exit, which in turn will wake up any other waiting ones.
@@ -966,8 +958,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
{
AutoLock lock(lock_);
DCHECK(thread_being_created_);
@@ -1099,8 +1090,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
}
void SequencedWorkerPool::Inner::HandleCleanup() {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
lock_.AssertAcquired();
if (cleanup_state_ == CLEANUP_DONE)
@@ -1167,8 +1157,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
TimeDelta* wait_time,
std::vector<Closure>* delete_these_outside_lock) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
lock_.AssertAcquired();
@@ -1256,8 +1245,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
}
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
lock_.AssertAcquired();
@@ -1290,8 +1278,7 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
}
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
lock_.AssertAcquired();
@@ -1306,8 +1293,7 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
int sequence_token_id) const {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
lock_.AssertAcquired();
return !sequence_token_id ||
@@ -1316,8 +1302,7 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
lock_.AssertAcquired();
// How thread creation works:
@@ -1369,27 +1354,18 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
int thread_number) {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
// Called outside of the lock.
DCHECK_GT(thread_number, 0);
- if (subtle::NoBarrier_Load(&g_all_pools_state) !=
- AllPoolsState::WORKER_CREATED) {
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
- subtle::NoBarrier_Load(&g_all_pools_state));
- subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
- }
-
// The worker is assigned to the list when the thread actually starts, which
// will manage the memory of the pointer.
new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
void SequencedWorkerPool::Inner::SignalHasWork() {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
has_work_cv_.Signal();
if (testing_observer_) {
@@ -1398,8 +1374,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() {
}
bool SequencedWorkerPool::Inner::CanShutdown() const {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED,
- subtle::NoBarrier_Load(&g_all_pools_state));
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
return !thread_being_created_ &&
@@ -1437,26 +1412,32 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
}
// static
-void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() {
+void SequencedWorkerPool::EnableForProcess() {
+ // TODO(fdoray): Uncomment this line. It is initially commented to avoid a
+ // revert of the CL that adds debug::DumpWithoutCrashing() in case of
+ // waterfall failures.
+ // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
+ g_all_pools_state = AllPoolsState::USE_WORKER_POOL;
+}
+
+// static
+void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() {
+ // TODO(fdoray): Uncomment this line. It is initially commented to avoid a
+ // revert of the CL that adds debug::DumpWithoutCrashing() in case of
+ // waterfall failures.
+ // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
DCHECK(TaskScheduler::GetInstance());
- // Hitting this DCHECK indicates that a task was posted to a
- // SequencedWorkerPool before the TaskScheduler was initialized and
- // redirected, posting task to SequencedWorkerPools needs to at least be
- // delayed until after that point.
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
- subtle::NoBarrier_Load(&g_all_pools_state));
- subtle::NoBarrier_Store(&g_all_pools_state,
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
+ g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
+}
+
+// static
+void SequencedWorkerPool::DisableForProcessForTesting() {
+ g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
}
// static
-void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() {
- // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER
- // to stop redirecting tasks. It can also be called when the current state is
- // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called
- // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has
- // been created if this isn't called).
- subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE);
+bool SequencedWorkerPool::IsEnabled() {
+ return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED;
}
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
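
For reference, a hypothetical gtest sketch (not part of this patch) exercising the new static API added above; it assumes a Chromium checkout with gtest available:

#include "base/threading/sequenced_worker_pool.h"
#include "testing/gtest/include/gtest/gtest.h"

// Hypothetical test: a test process may move back to POST_TASK_DISABLED and
// then re-enable posting, which IsEnabled() reports.
TEST(SequencedWorkerPoolEnableTest, DisableThenReenable) {
  base::SequencedWorkerPool::DisableForProcessForTesting();
  EXPECT_FALSE(base::SequencedWorkerPool::IsEnabled());

  base::SequencedWorkerPool::EnableForProcess();
  EXPECT_TRUE(base::SequencedWorkerPool::IsEnabled());
}
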
@@ -1595,8 +1576,7 @@ bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
void SequencedWorkerPool::FlushForTesting() {
DCHECK(!RunsTasksOnCurrentThread());
base::ThreadRestrictions::ScopedAllowWait allow_wait;
- if (subtle::NoBarrier_Load(&g_all_pools_state) ==
- AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
// TODO(gab): Remove this if http://crbug.com/622400 fails.
TaskScheduler::GetInstance()->FlushForTesting();
} else {