Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3408)

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 2241703002: Add an experiment to redirect SequencedWorkerPool tasks to TaskScheduler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b3_delay_metrics_blocking
Patch Set: Split PostTask into helpers and add bug comments. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index d8d10f34e9f2998d79ba601a46fa6dfe0a4fdab4..680cec102ee080d68dbd376cd216772178d847bb 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -25,6 +25,8 @@
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
+#include "base/task_scheduler/post_task.h"
+#include "base/task_scheduler/task_scheduler.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
@@ -50,6 +52,20 @@ namespace base {
namespace {
+// An enum representing the state of all pools. Any given process should only
+// ever transition from NONE_ACTIVE to the active states, transitions between
+// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition
+// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
+// and the WORKER_CREATED transition occurs when a Worker needs to be created
+// because the first task was posted and the state is still NONE_ACTIVE.
+// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
+// will be phased out completely otherwise).
+enum class AllPoolsState {
+ NONE_ACTIVE,
+ WORKER_CREATED,
+ REDIRECTED_TO_TASK_SCHEDULER,
+} g_all_pools_state = AllPoolsState::NONE_ACTIVE;
+
struct SequencedTask : public TrackingInfo {
SequencedTask()
: sequence_token_id(0),
@@ -354,6 +370,11 @@ class SequencedWorkerPool::Inner {
CLEANUP_DONE,
};
+ // Helpers used by PostTask() to complete the work depending on whether
+ // redirection is on. Coalesce upon resolution of http://crbug.com/622400.
+ void PostTaskToTaskScheduler(const SequencedTask& sequenced);
+ void PostTaskToPool(const SequencedTask& sequenced);
+
// Called from within the lock, this converts the given token name into a
// token ID, creating a new one if necessary.
int LockedGetNamedTokenID(const std::string& name);
@@ -502,10 +523,23 @@ class SequencedWorkerPool::Inner {
TestingObserver* const testing_observer_;
+ // Members below are used for the experimental redirection to TaskScheduler.
+ // TODO(gab): Remove these if http://crbug.com/622400 fails
+ // (SequencedWorkerPool will be phased out completely otherwise).
+
// The TaskPriority to be used for SequencedWorkerPool tasks redirected to the
// TaskScheduler as an experiment (unused otherwise).
const base::TaskPriority task_priority_;
+ // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
+ // SequencedWorkerPool usage to the TaskScheduler.
+ std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
+
+ // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
+ // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
+ // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
+ mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
+
DISALLOW_COPY_AND_ASSIGN(Inner);
};
@@ -519,6 +553,7 @@ SequencedWorkerPool::Worker::Worker(
worker_pool_(std::move(worker_pool)),
task_shutdown_behavior_(BLOCK_SHUTDOWN),
is_processing_task_(false) {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
Start();
}
@@ -526,6 +561,8 @@ SequencedWorkerPool::Worker::~Worker() {
}
void SequencedWorkerPool::Worker::Run() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
danakj 2016/08/15 21:02:52 Run is happening before RedirectSequencedWorkerPoo
gab 2016/08/15 21:32:44 No, this documents that Run() should never happen
+
#if defined(OS_WIN)
win::ScopedCOMInitializer com_initializer;
#endif
@@ -631,74 +668,180 @@ bool SequencedWorkerPool::Inner::PostTask(
base::MakeCriticalClosure(task) : task;
sequenced.time_to_run = TimeTicks::Now() + delay;
- int create_thread_id = 0;
- {
- AutoLock lock(lock_);
- if (shutdown_called_) {
- // Don't allow a new task to be posted if it doesn't block shutdown.
- if (shutdown_behavior != BLOCK_SHUTDOWN)
- return false;
-
- // If the current thread is running a task, and that task doesn't block
- // shutdown, then it shouldn't be allowed to post any more tasks.
- ThreadMap::const_iterator found =
- threads_.find(PlatformThread::CurrentId());
- if (found != threads_.end() && found->second->is_processing_task() &&
- found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
- return false;
- }
+ AutoLock lock(lock_);
- if (max_blocking_tasks_after_shutdown_ <= 0) {
- DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
- return false;
- }
- max_blocking_tasks_after_shutdown_ -= 1;
+ if (shutdown_called_) {
+ // Don't allow a new task to be posted if it doesn't block shutdown.
+ if (shutdown_behavior != BLOCK_SHUTDOWN)
+ return false;
+
+ // If the current thread is running a task, and that task doesn't block
+ // shutdown, then it shouldn't be allowed to post any more tasks.
+ ThreadMap::const_iterator found =
+ threads_.find(PlatformThread::CurrentId());
+ if (found != threads_.end() && found->second->is_processing_task() &&
+ found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
+ return false;
}
- // The trace_id is used for identifying the task in about:tracing.
- sequenced.trace_id = trace_id_++;
+ if (max_blocking_tasks_after_shutdown_ <= 0) {
+ DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
+ return false;
+ }
+ max_blocking_tasks_after_shutdown_ -= 1;
+ }
- TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
- "SequencedWorkerPool::Inner::PostTask",
- TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
- TRACE_EVENT_FLAG_FLOW_OUT);
+ // The trace_id is used for identifying the task in about:tracing.
+ sequenced.trace_id = trace_id_++;
- sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
+ TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
+ "SequencedWorkerPool::Inner::PostTask",
+ TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
+ TRACE_EVENT_FLAG_FLOW_OUT);
- // Now that we have the lock, apply the named token rules.
- if (optional_token_name)
- sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
+ sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
- pending_tasks_.insert(sequenced);
- if (shutdown_behavior == BLOCK_SHUTDOWN)
- blocking_shutdown_pending_task_count_++;
+ // Now that we have the lock, apply the named token rules.
+ if (optional_token_name)
+ sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
- create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ PostTaskToTaskScheduler(sequenced);
+ } else {
+ PostTaskToPool(sequenced);
}
- // Actually start the additional thread or signal an existing one now that
- // we're outside the lock.
- if (create_thread_id)
- FinishStartingAdditionalThread(create_thread_id);
+ // Some variables are exposed in both modes for convenience but only really
+ // intended for one of them at runtime, confirm exclusive usage here.
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER)
+ DCHECK(pending_tasks_.empty());
else
- SignalHasWork();
+ DCHECK(sequenced_task_runner_map_.empty());
return true;
}
+void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
+ const SequencedTask& sequenced) {
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
+ lock_.AssertAcquired();
+
+ // Confirm that the TaskScheduler's shutdown behaviors use the same
+ // underlying values as SequencedWorkerPool.
+ static_assert(
+ static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
+ static_cast<int>(CONTINUE_ON_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "CONTINUE_ON_SHUTDOWN.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
+ static_cast<int>(SKIP_ON_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "SKIP_ON_SHUTDOWN.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
+ static_cast<int>(BLOCK_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "BLOCK_SHUTDOWN.");
+
+ const TaskShutdownBehavior task_shutdown_behavior =
+ static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
+ const TaskTraits pool_traits =
+ TaskTraits()
+ .WithFileIO()
+ .WithPriority(task_priority_)
+ .WithShutdownBehavior(task_shutdown_behavior);
+
+ // Find or create the TaskScheduler TaskRunner to redirect this task to if
+ // it is posted to a specific sequence.
+ scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr;
+ if (sequenced.sequence_token_id) {
+ sequenced_task_runner =
+ &sequenced_task_runner_map_[sequenced.sequence_token_id];
+ if (!*sequenced_task_runner) {
+ const ExecutionMode execution_mode =
+ max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
+ : ExecutionMode::SEQUENCED;
+ *sequenced_task_runner =
+ CreateTaskRunnerWithTraits(pool_traits, execution_mode);
+ }
+ }
+
+ if (sequenced_task_runner) {
+ (*sequenced_task_runner)
+ ->PostTask(sequenced.posted_from, sequenced.task);
+ } else {
+ // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
+ // however a few pools that use only one thread and therefore can
danakj 2016/08/15 21:02:52 nit: "few pools" extra space
gab 2016/08/15 21:32:44 Done.
+ // currently legitimatelly assuming thread affinity despite using
danakj 2016/08/15 21:02:52 "legitimately assume"
gab 2016/08/15 21:32:44 Done.
+ // SequencedWorkerPool. Such pools typically only give access to their
+ // TaskRunner which will be SINGLE_THREADED per nature of the pool
+ // having only one thread but this DCHECK ensures no such pools use
+ // SequencedWorkerPool::PostTask() directly.
+ DCHECK_GT(max_threads_, 1U);
+ base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
+ sequenced.task);
+ }
+}
+
+void SequencedWorkerPool::Inner::PostTaskToPool(
+ const SequencedTask& sequenced) {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
+ lock_.AssertAcquired();
+
+ pending_tasks_.insert(sequenced);
+
+ if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
+ blocking_shutdown_pending_task_count_++;
+
+ int create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+
+ {
+ AutoUnlock unlock(lock_);
brettw 2016/08/15 21:02:21 Sorry to add another pass here. I'm really relucta
danakj 2016/08/15 21:02:52 hm, this will relock the lock when you leave the b
gab 2016/08/15 21:32:44 Re-inlined PostTaskToPool() -- as without this sec
gab 2016/08/15 21:32:44 N/A after re-inlining PostTaskToPool per Brett's c
+
+ // Actually start the additional thread or signal an existing one outside
+ // the lock.
+ if (create_thread_id)
+ FinishStartingAdditionalThread(create_thread_id);
+ else
+ SignalHasWork();
+ }
+}
+
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
- AutoLock lock(lock_);
- return ContainsKey(threads_, PlatformThread::CurrentId());
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (!runs_tasks_on_verifier_) {
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
+ TaskTraits().WithFileIO().WithPriority(task_priority_),
+ ExecutionMode::PARALLEL);
+ }
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
+ } else {
+ AutoLock lock(lock_);
+ return ContainsKey(threads_, PlatformThread::CurrentId());
+ }
}
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
SequenceToken sequence_token) const {
- AutoLock lock(lock_);
- ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
- if (found == threads_.end())
- return false;
- return found->second->is_processing_task() &&
- sequence_token.Equals(found->second->task_sequence_token());
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ // TODO(gab): This currently only verifies that the current thread is a
+ // thread on which a task bound to |sequence_token| *could* run, but it
+ // doesn't verify that the current is *currently running* a task bound to
+ // |sequence_token|.
+ const auto sequenced_task_runner_it =
+ sequenced_task_runner_map_.find(sequence_token.id_);
+ return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
+ sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
+ } else {
+ AutoLock lock(lock_);
+ ThreadMap::const_iterator found =
+ threads_.find(PlatformThread::CurrentId());
+ if (found == threads_.end())
+ return false;
+ return found->second->is_processing_task() &&
+ sequence_token.Equals(found->second->task_sequence_token());
+ }
}
// See https://code.google.com/p/chromium/issues/detail?id=168415
@@ -732,6 +875,10 @@ void SequencedWorkerPool::Inner::Shutdown(
if (shutdown_called_)
return;
shutdown_called_ = true;
+
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
+ return;
+
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
// Tickle the threads. This will wake up a waiting one so it will know that
@@ -771,6 +918,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
danakj 2016/08/15 21:02:52 Why are some of these != REDIRECTED instead of ==
gab 2016/08/15 21:32:44 We never explicitly set POOL (a.k.a. WORKER_CREATE
danakj 2016/08/15 22:05:49 Hm ok. I would personally prefer == checks wheneve
gab 2016/08/20 15:21:31 Unless the first Worker is created while only the
{
AutoLock lock(lock_);
DCHECK(thread_being_created_);
@@ -905,6 +1053,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
}
void SequencedWorkerPool::Inner::HandleCleanup() {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
if (cleanup_state_ == CLEANUP_DONE)
return;
@@ -970,6 +1120,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
TimeDelta* wait_time,
std::vector<Closure>* delete_these_outside_lock) {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
// Find the next task with a sequence token that's not currently in use.
@@ -1056,6 +1208,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
}
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
// Mark the task's sequence number as in use.
@@ -1087,6 +1241,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
}
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
lock_.AssertAcquired();
if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
@@ -1100,6 +1256,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
int sequence_token_id) const {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
lock_.AssertAcquired();
return !sequence_token_id ||
current_sequences_.find(sequence_token_id) ==
@@ -1107,6 +1265,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
lock_.AssertAcquired();
// How thread creation works:
//
@@ -1157,15 +1317,24 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
int thread_number) {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
// Called outside of the lock.
DCHECK_GT(thread_number, 0);
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
+ g_all_pools_state = AllPoolsState::WORKER_CREATED;
+ }
+
// The worker is assigned to the list when the thread actually starts, which
// will manage the memory of the pointer.
new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
void SequencedWorkerPool::Inner::SignalHasWork() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
has_work_cv_.Signal();
if (testing_observer_) {
testing_observer_->OnHasWork();
@@ -1173,6 +1342,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() {
}
bool SequencedWorkerPool::Inner::CanShutdown() const {
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
return !thread_being_created_ &&
@@ -1209,6 +1379,18 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
return worker->worker_pool();
}
+// static
+void SequencedWorkerPool::
+ RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
+ DCHECK(TaskScheduler::GetInstance());
+ // Hitting this DCHECK indicates that a task was posted to a
+ // SequencedWorkerPool before the TaskScheduler was initialized and
+ // redirected, posting task to SequencedWorkerPools needs to at least be
+ // delayed until after that point.
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
+ g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
+}
+
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix,
base::TaskPriority task_priority)
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698