Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2976)

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 2241703002: Add an experiment to redirect SequencedWorkerPool tasks to TaskScheduler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b3_delay_metrics_blocking
Patch Set: self-review Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index ef6047ef478411350d64469fa62d2ca7e9295dc2..13323d776339183a68aa313fd67edbf28f67c189 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -25,6 +25,8 @@
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
+#include "base/task_scheduler/post_task.h"
+#include "base/task_scheduler/task_scheduler.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
@@ -50,6 +52,20 @@ namespace base {
namespace {
+// An enum representing the state of all pools. Any given process should only
+// ever transition from NONE_ACTIVE to the active states, transitions between
+// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition
+// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
+// and the WORKER_CREATED transition occurs when a Worker needs to be created
+// because the first task was posted and the state is still NONE_ACTIVE.
+// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
+// will be phased out completely otherwise).
+enum class AllPoolsState {
+ NONE_ACTIVE,
+ WORKER_CREATED,
+ REDIRECTED_TO_TASK_SCHEDULER,
+} g_all_pools_state = AllPoolsState::NONE_ACTIVE;
+
struct SequencedTask : public TrackingInfo {
SequencedTask()
: sequence_token_id(0),
@@ -354,6 +370,10 @@ class SequencedWorkerPool::Inner {
CLEANUP_DONE,
};
+ // Helper used by PostTask() to complete the work when redirection is on.
+ // Coalesce upon resolution of http://crbug.com/622400.
+ void PostTaskToTaskScheduler(const SequencedTask& sequenced);
+
// Called from within the lock, this converts the given token name into a
// token ID, creating a new one if necessary.
int LockedGetNamedTokenID(const std::string& name);
@@ -402,7 +422,7 @@ class SequencedWorkerPool::Inner {
// 0 or more. The caller should then call FinishStartingAdditionalThread to
// complete initialization once the lock is released.
//
- // If another thread is not necessary, returne 0;
+ // If another thread is not necessary, return 0;
//
// See the implementedion for more.
int PrepareToStartAdditionalThreadIfHelpful();
@@ -502,10 +522,23 @@ class SequencedWorkerPool::Inner {
TestingObserver* const testing_observer_;
+ // Members below are used for the experimental redirection to TaskScheduler.
+ // TODO(gab): Remove these if http://crbug.com/622400 fails
+ // (SequencedWorkerPool will be phased out completely otherwise).
+
// The TaskPriority to be used for SequencedWorkerPool tasks redirected to the
// TaskScheduler as an experiment (unused otherwise).
const base::TaskPriority task_priority_;
+ // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
+ // SequencedWorkerPool usage to the TaskScheduler.
+ std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
+
+ // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
+ // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
+ // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
+ mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
+
DISALLOW_COPY_AND_ASSIGN(Inner);
};
@@ -519,6 +552,7 @@ SequencedWorkerPool::Worker::Worker(
worker_pool_(std::move(worker_pool)),
task_shutdown_behavior_(BLOCK_SHUTDOWN),
is_processing_task_(false) {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
Start();
}
@@ -526,6 +560,8 @@ SequencedWorkerPool::Worker::~Worker() {
}
void SequencedWorkerPool::Worker::Run() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
#if defined(OS_WIN)
win::ScopedCOMInitializer com_initializer;
#endif
@@ -634,6 +670,7 @@ bool SequencedWorkerPool::Inner::PostTask(
int create_thread_id = 0;
{
AutoLock lock(lock_);
+
danakj 2016/08/15 22:05:49 (you added whitespace here as a result)
gab 2016/09/07 15:00:10 True, but I kind of think it's cleaner that way as
if (shutdown_called_) {
// Don't allow a new task to be posted if it doesn't block shutdown.
if (shutdown_behavior != BLOCK_SHUTDOWN)
@@ -669,36 +706,140 @@ bool SequencedWorkerPool::Inner::PostTask(
if (optional_token_name)
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
- pending_tasks_.insert(sequenced);
- if (shutdown_behavior == BLOCK_SHUTDOWN)
- blocking_shutdown_pending_task_count_++;
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ PostTaskToTaskScheduler(sequenced);
+ } else {
+ pending_tasks_.insert(sequenced);
+
+ if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
+ blocking_shutdown_pending_task_count_++;
+
+ create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+ }
+ }
- create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+ if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ // Actually start the additional thread or signal an existing one outside
+ // the lock.
+ if (create_thread_id)
+ FinishStartingAdditionalThread(create_thread_id);
+ else
+ SignalHasWork();
}
- // Actually start the additional thread or signal an existing one now that
- // we're outside the lock.
- if (create_thread_id)
- FinishStartingAdditionalThread(create_thread_id);
- else
- SignalHasWork();
+#if DCHECK_IS_ON()
+ {
+ AutoLock lock_for_dcheck(lock_);
+ // Some variables are exposed in both modes for convenience but only really
+ // intended for one of them at runtime, confirm exclusive usage here.
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ DCHECK(pending_tasks_.empty());
+ DCHECK_EQ(0, create_thread_id);
+ } else {
+ DCHECK(sequenced_task_runner_map_.empty());
+ }
+ }
+#endif // DCHECK_IS_ON()
return true;
}
+void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
+ const SequencedTask& sequenced) {
fdoray 2016/08/25 15:14:01 This method doesn't honor delays :(
gab 2016/09/07 15:00:10 Oops, looks like you're addressing that in https:/
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
+ lock_.AssertAcquired();
+
+ // Confirm that the TaskScheduler's shutdown behaviors use the same
+ // underlying values as SequencedWorkerPool.
+ static_assert(
+ static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
+ static_cast<int>(CONTINUE_ON_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "CONTINUE_ON_SHUTDOWN.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
+ static_cast<int>(SKIP_ON_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "SKIP_ON_SHUTDOWN.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
+ static_cast<int>(BLOCK_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "BLOCK_SHUTDOWN.");
+
+ const TaskShutdownBehavior task_shutdown_behavior =
+ static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
+ const TaskTraits pool_traits =
+ TaskTraits()
+ .WithFileIO()
+ .WithPriority(task_priority_)
+ .WithShutdownBehavior(task_shutdown_behavior);
fdoray 2016/08/25 15:14:01 SequencedWorkerPool allows tasks with different sh
gab 2016/09/07 15:00:10 Yes, let's do that (seems like that's what you're
+
+ // Find or create the TaskScheduler TaskRunner to redirect this task to if
+ // it is posted to a specific sequence.
+ scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr;
+ if (sequenced.sequence_token_id) {
+ sequenced_task_runner =
+ &sequenced_task_runner_map_[sequenced.sequence_token_id];
+ if (!*sequenced_task_runner) {
+ const ExecutionMode execution_mode =
+ max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
+ : ExecutionMode::SEQUENCED;
fdoray 2016/08/25 15:14:01 Should we DCHECK_LE(sequenced_task_runner_map_.siz
gab 2016/09/07 15:00:10 Good point, I think this is the case in practice s
+ *sequenced_task_runner =
+ CreateTaskRunnerWithTraits(pool_traits, execution_mode);
+ }
+ }
+
+ if (sequenced_task_runner) {
+ (*sequenced_task_runner)
+ ->PostTask(sequenced.posted_from, sequenced.task);
+ } else {
+ // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
+ // however a few pools that use only one thread and therefore can currently
+ // legitimatelly assume thread affinity despite using SequencedWorkerPool.
+ // Such pools typically only give access to their TaskRunner which will be
+ // SINGLE_THREADED per nature of the pool having only one thread but this
+ // DCHECK ensures no such pools use SequencedWorkerPool::PostTask()
+ // directly.
+ DCHECK_GT(max_threads_, 1U);
+ base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
+ sequenced.task);
+ }
+}
+
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
AutoLock lock(lock_);
- return ContainsKey(threads_, PlatformThread::CurrentId());
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (!runs_tasks_on_verifier_) {
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
+ TaskTraits().WithFileIO().WithPriority(task_priority_),
+ ExecutionMode::PARALLEL);
fdoray 2016/08/25 15:14:01 Should we have a special case for single-threaded
gab 2016/09/07 15:00:10 Yes, looks like you got this too :-)
+ }
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
+ } else {
+ return ContainsKey(threads_, PlatformThread::CurrentId());
+ }
}
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
SequenceToken sequence_token) const {
AutoLock lock(lock_);
- ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
- if (found == threads_.end())
- return false;
- return found->second->is_processing_task() &&
- sequence_token.Equals(found->second->task_sequence_token());
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ // TODO(gab): This currently only verifies that the current thread is a
+ // thread on which a task bound to |sequence_token| *could* run, but it
+ // doesn't verify that the current is *currently running* a task bound to
+ // |sequence_token|.
+ const auto sequenced_task_runner_it =
+ sequenced_task_runner_map_.find(sequence_token.id_);
+ return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
+ sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
+ } else {
+ ThreadMap::const_iterator found =
+ threads_.find(PlatformThread::CurrentId());
+ if (found == threads_.end())
+ return false;
+ return found->second->is_processing_task() &&
+ sequence_token.Equals(found->second->task_sequence_token());
+ }
}
// See https://code.google.com/p/chromium/issues/detail?id=168415
@@ -732,6 +873,10 @@ void SequencedWorkerPool::Inner::Shutdown(
if (shutdown_called_)
return;
shutdown_called_ = true;
+
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
+ return;
+
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
// Tickle the threads. This will wake up a waiting one so it will know that
@@ -771,6 +916,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
{
AutoLock lock(lock_);
DCHECK(thread_being_created_);
@@ -905,6 +1051,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
}
void SequencedWorkerPool::Inner::HandleCleanup() {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
if (cleanup_state_ == CLEANUP_DONE)
return;
@@ -970,6 +1118,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
TimeDelta* wait_time,
std::vector<Closure>* delete_these_outside_lock) {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
// Find the next task with a sequence token that's not currently in use.
@@ -1056,6 +1206,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
}
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
// Mark the task's sequence number as in use.
@@ -1087,6 +1239,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
}
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
@@ -1100,6 +1254,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
int sequence_token_id) const {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
lock_.AssertAcquired();
return !sequence_token_id ||
current_sequences_.find(sequence_token_id) ==
@@ -1107,6 +1263,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
lock_.AssertAcquired();
// How thread creation works:
//
@@ -1157,15 +1315,24 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
int thread_number) {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
// Called outside of the lock.
DCHECK_GT(thread_number, 0);
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
+ g_all_pools_state = AllPoolsState::WORKER_CREATED;
+ }
+
// The worker is assigned to the list when the thread actually starts, which
// will manage the memory of the pointer.
new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
void SequencedWorkerPool::Inner::SignalHasWork() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
has_work_cv_.Signal();
if (testing_observer_) {
testing_observer_->OnHasWork();
@@ -1173,6 +1340,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() {
}
bool SequencedWorkerPool::Inner::CanShutdown() const {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
return !thread_being_created_ &&
@@ -1209,6 +1377,18 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
return worker->worker_pool();
}
+// static
+void SequencedWorkerPool::
+ RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
+ DCHECK(TaskScheduler::GetInstance());
+ // Hitting this DCHECK indicates that a task was posted to a
+ // SequencedWorkerPool before the TaskScheduler was initialized and
+ // redirected, posting task to SequencedWorkerPools needs to at least be
+ // delayed until after that point.
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
+ g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
+}
+
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix,
base::TaskPriority task_priority)
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698