Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2476)

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 2517443002: Revert of Disallow posting tasks to SequencedWorkerPools by default. (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index e318d719d4b726c44b8bf650426dc3d95d495542..360fb4a537f2d27eb53aa904d83ecd3f33728d2d 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -15,10 +15,10 @@
#include <vector>
#include "base/atomic_sequence_num.h"
+#include "base/atomicops.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
-#include "base/debug/dump_without_crashing.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
@@ -54,22 +54,32 @@
namespace {
-// An enum representing the state of all pools. A non-test process should only
-// ever transition from POST_TASK_DISABLED to one of the active states. A test
-// process may transition from one of the active states to POST_TASK_DISABLED
-// when DisableForProcessForTesting() is called.
+// An enum representing the state of all pools. Any given non-test process
+// should only ever transition from NONE_ACTIVE to one of the active states.
+// Transitions between actives states are unexpected. The
+// REDIRECTED_TO_TASK_SCHEDULER transition occurs when
+// RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition
+// occurs when a Worker needs to be created because the first task was posted
+// and the state is still NONE_ACTIVE. In a test process, a transition to
+// NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is
+// called.
//
-// External memory synchronization is required to call a method that reads
-// |g_all_pools_state| after calling a method that modifies it.
+// |g_all_pools_state| uses relaxed atomic operations to ensure no data race
+// between reads/writes, strict memory ordering isn't required per no other
+// state being inferred from its value. Explicit synchronization (e.g. locks or
+// events) would be overkill (it's fine for other threads to still see
+// NONE_ACTIVE after the first Worker was created -- this is not possible for
+// REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no
+// other threads are active).
//
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
// will be phased out completely otherwise).
-enum class AllPoolsState {
- POST_TASK_DISABLED,
- USE_WORKER_POOL,
+enum AllPoolsState : subtle::Atomic32 {
+ NONE_ACTIVE,
+ WORKER_CREATED,
REDIRECTED_TO_TASK_SCHEDULER,
};
-AllPoolsState g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
+subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
struct SequencedTask : public TrackingInfo {
SequencedTask()
@@ -572,7 +582,8 @@
worker_pool_(std::move(worker_pool)),
task_shutdown_behavior_(BLOCK_SHUTDOWN),
is_processing_task_(false) {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
Start();
}
@@ -580,7 +591,8 @@
}
void SequencedWorkerPool::Worker::Run() {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
#if defined(OS_WIN)
win::ScopedCOMInitializer com_initializer;
@@ -679,10 +691,6 @@
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) {
- DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
- if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
- debug::DumpWithoutCrashing();
-
DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
SequencedTask sequenced(from_here);
sequenced.sequence_token_id = sequence_token.id_;
@@ -732,7 +740,8 @@
if (optional_token_name)
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
if (!PostTaskToTaskScheduler(sequenced, delay))
return false;
} else {
@@ -745,10 +754,8 @@
}
}
- // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure
- // correct behavior if a task is posted to a SequencedWorkerPool before
- // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build.
- if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) !=
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
// Actually start the additional thread or signal an existing one outside
// the lock.
if (create_thread_id)
@@ -762,7 +769,8 @@
AutoLock lock_for_dcheck(lock_);
// Some variables are exposed in both modes for convenience but only really
// intended for one of them at runtime, confirm exclusive usage here.
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
DCHECK(pending_tasks_.empty());
DCHECK_EQ(0, create_thread_id);
} else {
@@ -777,7 +785,8 @@
bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
const SequencedTask& sequenced,
const TimeDelta& delay) {
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -811,7 +820,8 @@
SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
int sequence_token_id,
const TaskTraits& traits) {
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -848,7 +858,8 @@
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
AutoLock lock(lock_);
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
if (!runs_tasks_on_verifier_) {
runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
TaskTraits().WithFileIO().WithPriority(task_priority_));
@@ -865,7 +876,8 @@
AutoLock lock(lock_);
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
const auto sequenced_task_runner_it =
sequenced_task_runner_map_.find(sequence_token.id_);
return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
@@ -880,7 +892,8 @@
// See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() {
- DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
+ DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state),
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
AutoLock lock(lock_);
CHECK_EQ(CLEANUP_DONE, cleanup_state_);
if (shutdown_called_)
@@ -911,8 +924,10 @@
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
- if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL)
+ if (subtle::NoBarrier_Load(&g_all_pools_state) !=
+ AllPoolsState::WORKER_CREATED) {
return;
+ }
// Tickle the threads. This will wake up a waiting one so it will know that
// it can exit, which in turn will wake up any other waiting ones.
@@ -951,7 +966,8 @@
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
{
AutoLock lock(lock_);
DCHECK(thread_being_created_);
@@ -1083,7 +1099,8 @@
}
void SequencedWorkerPool::Inner::HandleCleanup() {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
if (cleanup_state_ == CLEANUP_DONE)
@@ -1150,7 +1167,8 @@
SequencedTask* task,
TimeDelta* wait_time,
std::vector<Closure>* delete_these_outside_lock) {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -1238,7 +1256,8 @@
}
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -1271,7 +1290,8 @@
}
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -1286,7 +1306,8 @@
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
int sequence_token_id) const {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
return !sequence_token_id ||
@@ -1295,7 +1316,8 @@
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
// How thread creation works:
@@ -1347,10 +1369,18 @@
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
int thread_number) {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
// Called outside of the lock.
DCHECK_GT(thread_number, 0);
+
+ if (subtle::NoBarrier_Load(&g_all_pools_state) !=
+ AllPoolsState::WORKER_CREATED) {
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
+ subtle::NoBarrier_Load(&g_all_pools_state));
+ subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
+ }
// The worker is assigned to the list when the thread actually starts, which
// will manage the memory of the pointer.
@@ -1358,7 +1388,8 @@
}
void SequencedWorkerPool::Inner::SignalHasWork() {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
has_work_cv_.Signal();
if (testing_observer_) {
@@ -1367,7 +1398,8 @@
}
bool SequencedWorkerPool::Inner::CanShutdown() const {
- DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
return !thread_being_created_ &&
@@ -1405,26 +1437,26 @@
}
// static
-void SequencedWorkerPool::EnableForProcess() {
- DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
- g_all_pools_state = AllPoolsState::USE_WORKER_POOL;
+void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() {
+ DCHECK(TaskScheduler::GetInstance());
+ // Hitting this DCHECK indicates that a task was posted to a
+ // SequencedWorkerPool before the TaskScheduler was initialized and
+ // redirected, posting task to SequencedWorkerPools needs to at least be
+ // delayed until after that point.
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
+ subtle::NoBarrier_Load(&g_all_pools_state));
+ subtle::NoBarrier_Store(&g_all_pools_state,
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
}
// static
-void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() {
- DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
- DCHECK(TaskScheduler::GetInstance());
- g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
-}
-
-// static
-void SequencedWorkerPool::DisableForProcessForTesting() {
- g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
-}
-
-// static
-bool SequencedWorkerPool::IsEnabled() {
- return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED;
+void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() {
+ // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER
+ // to stop redirecting tasks. It can also be called when the current state is
+ // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called
+ // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has
+ // been created if this isn't called).
+ subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE);
}
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
@@ -1563,7 +1595,8 @@
void SequencedWorkerPool::FlushForTesting() {
DCHECK(!RunsTasksOnCurrentThread());
base::ThreadRestrictions::ScopedAllowWait allow_wait;
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
// TODO(gab): Remove this if http://crbug.com/622400 fails.
TaskScheduler::GetInstance()->FlushForTesting();
} else {
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698