Chromium Code Reviews

Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index d8d10f34e9f2998d79ba601a46fa6dfe0a4fdab4..e0bd2b7f3836af8e0c334a096857bec5254f1adb 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -25,6 +25,8 @@
 #include "base/strings/stringprintf.h"
 #include "base/synchronization/condition_variable.h"
 #include "base/synchronization/lock.h"
+#include "base/task_scheduler/post_task.h"
+#include "base/task_scheduler/task_scheduler.h"
 #include "base/threading/platform_thread.h"
 #include "base/threading/simple_thread.h"
 #include "base/threading/thread_local.h"
@@ -50,6 +52,18 @@ namespace base {
 namespace {
+// An enum representing the state of all pools. Any given process should only
+// ever transition from NONE_ACTIVE to one of the active states; transitions
+// between active states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER
+// transition occurs when
+// RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called and the
+// WORKER_CREATED transition occurs when a Worker needs to be created because
+// the first task was posted while the state is still NONE_ACTIVE.
brettw 2016/08/12 19:30:00
  This should have a reference to a bug to remove it
gab 2016/08/12 23:27:45
  Done.
+enum class AllPoolsState {
+  NONE_ACTIVE,
+  WORKER_CREATED,
+  REDIRECTED_TO_TASK_SCHEDULER,
+} g_all_pools_state = AllPoolsState::NONE_ACTIVE;
+
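For orientation, here is a reduced sketch of the transition rules the comment above describes; the helper name is hypothetical and not part of this CL:

    // Each process may leave NONE_ACTIVE exactly once, for one of the two
    // active states; any transition between active states is a bug.
    bool IsExpectedTransition(AllPoolsState from, AllPoolsState to) {
      return from == AllPoolsState::NONE_ACTIVE &&
             (to == AllPoolsState::WORKER_CREATED ||
              to == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
    }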
 struct SequencedTask : public TrackingInfo {
   SequencedTask()
       : sequence_token_id(0),
@@ -502,10 +516,21 @@ class SequencedWorkerPool::Inner {
   TestingObserver* const testing_observer_;
+  /* Members used for the experimental redirection to TaskScheduler. */
brettw 2016/08/12 17:44:39
  Can this be a normal // comment?
gab 2016/08/12 23:27:46
  Sure, a /* */ comment felt more like a split from
+
+  // The TaskPriority to be used for SequencedWorkerPool tasks redirected to
+  // the TaskScheduler as an experiment (unused otherwise).
+  const base::TaskPriority task_priority_;
+  // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
brettw 2016/08/12 19:30:00
  This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46
  Done.
+  // SequencedWorkerPool usage to the TaskScheduler.
+  std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
+
+  // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
brettw 2016/08/12 19:30:00
  This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46
  Done.
+  // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
+  // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
+  mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
+
   DISALLOW_COPY_AND_ASSIGN(Inner);
 };
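The mutable qualifier above enables the lazy-initialization-in-a-const-method idiom: a const query may create a cached helper on first use. A self-contained sketch of the idiom in plain C++ (the class and members here are hypothetical, not from this CL):

    #include <memory>
    #include <string>

    class Config {
     public:
      // Const accessor that builds its result on first use; "mutable" makes
      // the cached write legal from a const member function.
      const std::string& DebugLabel() const {
        if (!debug_label_)
          debug_label_ = std::make_unique<std::string>("config:" + name_);
        return *debug_label_;
      }

     private:
      std::string name_ = "default";
      mutable std::unique_ptr<std::string> debug_label_;  // lazily cached
    };

Note that unsynchronized lazy initialization like this is only safe when callers are single-threaded or otherwise synchronized.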
@@ -519,6 +544,7 @@ SequencedWorkerPool::Worker::Worker(
       worker_pool_(std::move(worker_pool)),
       task_shutdown_behavior_(BLOCK_SHUTDOWN),
       is_processing_task_(false) {
+  DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
   Start();
 }
@@ -526,6 +552,8 @@ SequencedWorkerPool::Worker::~Worker() {
 }
 void SequencedWorkerPool::Worker::Run() {
+  DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
 #if defined(OS_WIN)
   win::ScopedCOMInitializer com_initializer;
 #endif
@@ -669,36 +697,126 @@ bool SequencedWorkerPool::Inner::PostTask(
     if (optional_token_name)
       sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
-    pending_tasks_.insert(sequenced);
-    if (shutdown_behavior == BLOCK_SHUTDOWN)
-      blocking_shutdown_pending_task_count_++;
+    if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+      // Confirm that the TaskScheduler's shutdown behaviors use the same
brettw 2016/08/12 19:30:00
  This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46
  Doesn't |g_all_pools_state| cover this?
+      // underlying values as SequencedWorkerPool.
+      static_assert(
+          static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
+              static_cast<int>(CONTINUE_ON_SHUTDOWN),
+          "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+          "CONTINUE_ON_SHUTDOWN.");
+      static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
+                        static_cast<int>(SKIP_ON_SHUTDOWN),
+                    "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+                    "SKIP_ON_SHUTDOWN.");
+      static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
+                        static_cast<int>(BLOCK_SHUTDOWN),
+                    "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+                    "BLOCK_SHUTDOWN.");
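These asserts are what make the raw static_cast just below lossless; a reduced illustration with hypothetical enums, not from this CL:

    enum LegacyShutdown { LEGACY_CONTINUE = 0, LEGACY_BLOCK = 1 };
    enum class NewShutdown { CONTINUE = 0, BLOCK = 1 };
    static_assert(static_cast<int>(NewShutdown::BLOCK) == LEGACY_BLOCK,
                  "LegacyShutdown and NewShutdown enum mismatch for BLOCK.");
    // Safe only because the assert pins the underlying values:
    NewShutdown Convert(LegacyShutdown s) {
      return static_cast<NewShutdown>(s);
    }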
+
+      const TaskShutdownBehavior task_shutdown_behavior =
+          static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
+      const TaskTraits pool_traits =
+          TaskTraits()
+              .WithFileIO()
+              .WithPriority(task_priority_)
+              .WithShutdownBehavior(task_shutdown_behavior);
+
+      // Find or create the TaskScheduler TaskRunner to redirect this task to
+      // if it is posted to a specific sequence.
+      scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr;
+      if (sequenced.sequence_token_id) {
+        sequenced_task_runner =
+            &sequenced_task_runner_map_[sequenced.sequence_token_id];
+        if (!*sequenced_task_runner) {
+          const ExecutionMode execution_mode =
+              max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
+                                 : ExecutionMode::SEQUENCED;
+          *sequenced_task_runner =
+              CreateTaskRunnerWithTraits(pool_traits, execution_mode);
+        }
+      }
+
+      if (sequenced_task_runner) {
+        (*sequenced_task_runner)
+            ->PostTask(sequenced.posted_from, sequenced.task);
+      } else {
+        // PostTaskWithTraits() posts a task with PARALLEL semantics. There
+        // are, however, a few pools that use only one thread and can
+        // therefore currently legitimately assume thread affinity despite
+        // using SequencedWorkerPool. Such pools typically only give access
+        // to their TaskRunner, which will be SINGLE_THREADED by nature of
+        // the pool having only one thread, but this DCHECK ensures no such
+        // pools use SequencedWorkerPool::PostTask() directly.
+        DCHECK_GT(max_threads_, 1U);
+        base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
+                                 sequenced.task);
+      }
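A caller-side usage sketch of the sequenced path above (StepOne/StepTwo are hypothetical task functions): tasks posted with the same SequenceToken funnel through the single TaskRunner cached in the map, so their relative order survives redirection.

    void PostOrderedWork(SequencedWorkerPool* pool) {
      SequencedWorkerPool::SequenceToken token =
          SequencedWorkerPool::GetSequenceToken();
      pool->PostSequencedWorkerTask(token, FROM_HERE, base::Bind(&StepOne));
      pool->PostSequencedWorkerTask(token, FROM_HERE, base::Bind(&StepTwo));
      // Redirected or not, StepTwo never starts before StepOne completes.
    }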
+    } else {  // !REDIRECTED_TO_TASK_SCHEDULER
+      pending_tasks_.insert(sequenced);
+
+      if (shutdown_behavior == BLOCK_SHUTDOWN)
+        blocking_shutdown_pending_task_count_++;
-    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+      create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+    }
   }
-  // Actually start the additional thread or signal an existing one now that
-  // we're outside the lock.
-  if (create_thread_id)
-    FinishStartingAdditionalThread(create_thread_id);
-  else
-    SignalHasWork();
+  // Some variables are exposed in both modes for convenience but are only
+  // really intended for one of them at runtime; confirm exclusive usage here.
+  if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+    DCHECK(pending_tasks_.empty());
+    DCHECK(!create_thread_id);
+  } else {
+    DCHECK(sequenced_task_runner_map_.empty());
+  }
+
+  if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+    // Actually start the additional thread or signal an existing one now that
+    // we're outside the lock.
+    if (create_thread_id)
+      FinishStartingAdditionalThread(create_thread_id);
+    else
+      SignalHasWork();
+  }
   return true;
 }
 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
-  AutoLock lock(lock_);
-  return ContainsKey(threads_, PlatformThread::CurrentId());
+  if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+    if (!runs_tasks_on_verifier_) {
+      runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
+          TaskTraits().WithFileIO().WithPriority(task_priority_),
+          ExecutionMode::PARALLEL);
+    }
+    return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
+  } else {
+    AutoLock lock(lock_);
+    return ContainsKey(threads_, PlatformThread::CurrentId());
+  }
 }
 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
     SequenceToken sequence_token) const {
-  AutoLock lock(lock_);
-  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
-  if (found == threads_.end())
-    return false;
-  return found->second->is_processing_task() &&
-         sequence_token.Equals(found->second->task_sequence_token());
+  if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+    // TODO(gab): This currently only verifies that the current thread is a
+    // thread on which a task bound to |sequence_token| *could* run, but it
+    // doesn't verify that the current thread is *currently running* a task
+    // bound to |sequence_token|.
+    const auto sequenced_task_runner_it =
+        sequenced_task_runner_map_.find(sequence_token.id_);
+    return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
+           sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
+  } else {
+    AutoLock lock(lock_);
+    ThreadMap::const_iterator found =
+        threads_.find(PlatformThread::CurrentId());
+    if (found == threads_.end())
+      return false;
+    return found->second->is_processing_task() &&
+           sequence_token.Equals(found->second->task_sequence_token());
+  }
 }
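To make the TODO concrete, a hypothetical helper showing the weaker guarantee of the redirected branch:

    // Returns whether tasks bound to |token| *could* run on the current
    // thread. Under redirection this is true for any thread the token's
    // TaskRunner may use, even while that thread is running a task from a
    // different sequence, so it must not be read as "a |token| task is
    // running right now".
    bool CouldRunTokenHere(SequencedWorkerPool* pool,
                           SequencedWorkerPool::SequenceToken token) {
      return pool->IsRunningSequenceOnCurrentThread(token);
    }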
 // See https://code.google.com/p/chromium/issues/detail?id=168415
@@ -732,6 +850,10 @@ void SequencedWorkerPool::Inner::Shutdown(
   if (shutdown_called_)
     return;
   shutdown_called_ = true;
+
+  if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
brettw 2016/08/12 19:30:00
  This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46
  Doesn't |g_all_pools_state| cover this?
+    return;
+
   max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
   // Tickle the threads. This will wake up a waiting one so it will know that
@@ -771,6 +893,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
 }
 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
+  DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
   {
     AutoLock lock(lock_);
     DCHECK(thread_being_created_);
@@ -905,6 +1028,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
 }
 void SequencedWorkerPool::Inner::HandleCleanup() {
+  DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
   lock_.AssertAcquired();
   if (cleanup_state_ == CLEANUP_DONE)
     return;
@@ -970,6 +1095,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
     SequencedTask* task,
     TimeDelta* wait_time,
     std::vector<Closure>* delete_these_outside_lock) {
+  DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
   lock_.AssertAcquired();
   // Find the next task with a sequence token that's not currently in use.
@@ -1056,6 +1183,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
 }
 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
+  DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
   lock_.AssertAcquired();
   // Mark the task's sequence number as in use.
@@ -1087,6 +1216,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
 }
 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
+  DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
+
   lock_.AssertAcquired();
   if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
@@ -1100,6 +1231,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
     int sequence_token_id) const {
+  DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
   lock_.AssertAcquired();
   return !sequence_token_id ||
          current_sequences_.find(sequence_token_id) ==
@@ -1107,6 +1240,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
 }
 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
+  DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
   lock_.AssertAcquired();
   // How thread creation works:
   //
@@ -1157,15 +1292,24 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
     int thread_number) {
+  DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
   // Called outside of the lock.
   DCHECK_GT(thread_number, 0);
+  if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
brettw 2016/08/12 19:30:00
  This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46
  Doesn't |g_all_pools_state| cover this?
+    DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
+    g_all_pools_state = AllPoolsState::WORKER_CREATED;
+  }
+
   // The worker is assigned to the list when the thread actually starts, which
   // will manage the memory of the pointer.
   new Worker(worker_pool_, thread_number, thread_name_prefix_);
 }
 void SequencedWorkerPool::Inner::SignalHasWork() {
+  DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
   has_work_cv_.Signal();
   if (testing_observer_) {
     testing_observer_->OnHasWork();
@@ -1173,6 +1317,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() {
 }
 bool SequencedWorkerPool::Inner::CanShutdown() const {
+  DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
   lock_.AssertAcquired();
   // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
   return !thread_being_created_ &&
@@ -1209,6 +1354,18 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
   return worker->worker_pool();
 }
+// static
+void SequencedWorkerPool::
+    RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
+  DCHECK(TaskScheduler::GetInstance());
+  // Hitting this DCHECK indicates that a task was posted to a
+  // SequencedWorkerPool before the TaskScheduler was initialized and
+  // redirected; posting tasks to SequencedWorkerPools needs to at least be
+  // delayed until after that point.
+  DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
+  g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
+}
+
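For context, a minimal call-site sketch; the initialization helper used here is assumed and the exact startup sequence is not part of this CL. Redirection must happen after the TaskScheduler exists and before any SequencedWorkerPool task is posted:

    void InitThreadingForProcess() {
      // 1) Bring up the scheduler first; the redirect DCHECKs its existence.
      //    (CreateAndSetSimpleTaskScheduler is an assumed setup helper.)
      base::TaskScheduler::CreateAndSetSimpleTaskScheduler(4 /* max threads */);
      // 2) Flip all SequencedWorkerPools in this process to the scheduler.
      //    Any earlier PostTask() would trip the NONE_ACTIVE DCHECK above.
      base::SequencedWorkerPool::
          RedirectSequencedWorkerPoolsToTaskSchedulerForProcess();
    }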
 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
                                          const std::string& thread_name_prefix,
                                          base::TaskPriority task_priority)