Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2422)

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 2295323004: Fix |g_all_pools_state| data race by using atomics. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 376c54f3dd4bcd9892e955d45c5042b0a6b980bd..fb71efeaea89c2f901a2e87ecebca02e25f45c39 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -14,6 +14,7 @@
#include <vector>
#include "base/atomic_sequence_num.h"
+#include "base/atomicops.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
@@ -58,13 +59,21 @@ namespace {
// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
// and the WORKER_CREATED transition occurs when a Worker needs to be created
// because the first task was posted and the state is still NONE_ACTIVE.
+// |g_all_pools_state| uses relaxed atomic operations to ensure no data race
+// between reads/writes, strict memory ordering isn't required per no other
+// state being inferred from its value. Explicit synchronization (e.g. locks or
+// events) would be overkill (it's fine for other threads to still see
+// NONE_ACTIVE after the first Worker was created -- this is not possible for
+// REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no
+// other threads are active).
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
// will be phased out completely otherwise).
-enum class AllPoolsState {
+enum AllPoolsState : subtle::Atomic32 {
NONE_ACTIVE,
WORKER_CREATED,
REDIRECTED_TO_TASK_SCHEDULER,
-} g_all_pools_state = AllPoolsState::NONE_ACTIVE;
+};
+subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
struct SequencedTask : public TrackingInfo {
SequencedTask()
@@ -552,7 +561,8 @@ SequencedWorkerPool::Worker::Worker(
worker_pool_(std::move(worker_pool)),
task_shutdown_behavior_(BLOCK_SHUTDOWN),
is_processing_task_(false) {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
Start();
}
@@ -560,7 +570,8 @@ SequencedWorkerPool::Worker::~Worker() {
}
void SequencedWorkerPool::Worker::Run() {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
#if defined(OS_WIN)
win::ScopedCOMInitializer com_initializer;
@@ -706,7 +717,8 @@ bool SequencedWorkerPool::Inner::PostTask(
if (optional_token_name)
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
PostTaskToTaskScheduler(sequenced);
} else {
pending_tasks_.insert(sequenced);
@@ -718,7 +730,8 @@ bool SequencedWorkerPool::Inner::PostTask(
}
}
- if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) !=
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
// Actually start the additional thread or signal an existing one outside
// the lock.
if (create_thread_id)
@@ -732,7 +745,8 @@ bool SequencedWorkerPool::Inner::PostTask(
AutoLock lock_for_dcheck(lock_);
// Some variables are exposed in both modes for convenience but only really
// intended for one of them at runtime, confirm exclusive usage here.
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
DCHECK(pending_tasks_.empty());
DCHECK_EQ(0, create_thread_id);
} else {
@@ -746,7 +760,8 @@ bool SequencedWorkerPool::Inner::PostTask(
void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
const SequencedTask& sequenced) {
- DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -808,7 +823,8 @@ void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
AutoLock lock(lock_);
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
if (!runs_tasks_on_verifier_) {
runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
TaskTraits().WithFileIO().WithPriority(task_priority_),
@@ -823,7 +839,8 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
SequenceToken sequence_token) const {
AutoLock lock(lock_);
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (subtle::NoBarrier_Load(&g_all_pools_state) ==
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
// TODO(gab): This currently only verifies that the current thread is a
// thread on which a task bound to |sequence_token| *could* run, but it
// doesn't verify that the current is *currently running* a task bound to
@@ -874,7 +891,8 @@ void SequencedWorkerPool::Inner::Shutdown(
return;
shutdown_called_ = true;
- if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
+ if (subtle::NoBarrier_Load(&g_all_pools_state) !=
+ AllPoolsState::WORKER_CREATED)
return;
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
@@ -916,7 +934,8 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
{
AutoLock lock(lock_);
DCHECK(thread_being_created_);
@@ -1051,7 +1070,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
}
void SequencedWorkerPool::Inner::HandleCleanup() {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
if (cleanup_state_ == CLEANUP_DONE)
@@ -1118,7 +1138,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
TimeDelta* wait_time,
std::vector<Closure>* delete_these_outside_lock) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -1206,7 +1227,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
}
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -1239,7 +1261,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
}
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
@@ -1254,7 +1277,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
int sequence_token_id) const {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
return !sequence_token_id ||
@@ -1263,7 +1287,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
// How thread creation works:
@@ -1315,14 +1340,17 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
int thread_number) {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
// Called outside of the lock.
DCHECK_GT(thread_number, 0);
- if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
- g_all_pools_state = AllPoolsState::WORKER_CREATED;
+ if (subtle::NoBarrier_Load(&g_all_pools_state) !=
+ AllPoolsState::WORKER_CREATED) {
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
+ subtle::NoBarrier_Load(&g_all_pools_state));
+ subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
}
// The worker is assigned to the list when the thread actually starts, which
@@ -1331,7 +1359,8 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
}
void SequencedWorkerPool::Inner::SignalHasWork() {
- DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
+ subtle::NoBarrier_Load(&g_all_pools_state));
has_work_cv_.Signal();
if (testing_observer_) {
@@ -1340,7 +1369,8 @@ void SequencedWorkerPool::Inner::SignalHasWork() {
}
bool SequencedWorkerPool::Inner::CanShutdown() const {
- DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED,
+ subtle::NoBarrier_Load(&g_all_pools_state));
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
return !thread_being_created_ &&
@@ -1385,8 +1415,10 @@ void SequencedWorkerPool::
// SequencedWorkerPool before the TaskScheduler was initialized and
// redirected, posting task to SequencedWorkerPools needs to at least be
// delayed until after that point.
- DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
- g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
+ subtle::NoBarrier_Load(&g_all_pools_state));
+ subtle::NoBarrier_Store(&g_all_pools_state,
+ AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
}
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698