Chromium Code Reviews| Index: base/threading/sequenced_worker_pool.cc |
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
| index ef6047ef478411350d64469fa62d2ca7e9295dc2..13323d776339183a68aa313fd67edbf28f67c189 100644 |
| --- a/base/threading/sequenced_worker_pool.cc |
| +++ b/base/threading/sequenced_worker_pool.cc |
| @@ -25,6 +25,8 @@ |
| #include "base/strings/stringprintf.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/synchronization/lock.h" |
| +#include "base/task_scheduler/post_task.h" |
| +#include "base/task_scheduler/task_scheduler.h" |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/simple_thread.h" |
| #include "base/threading/thread_local.h" |
| @@ -50,6 +52,20 @@ namespace base { |
| namespace { |
| +// An enum representing the state of all pools. Any given process should only |
| +// ever transition from NONE_ACTIVE to the active states, transitions between |
| +// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
| +// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
| +// and the WORKER_CREATED transition occurs when a Worker needs to be created |
| +// because the first task was posted and the state is still NONE_ACTIVE. |
| +// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
| +// will be phased out completely otherwise). |
| +enum class AllPoolsState { |
| + NONE_ACTIVE, |
| + WORKER_CREATED, |
| + REDIRECTED_TO_TASK_SCHEDULER, |
| +} g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
| + |
| struct SequencedTask : public TrackingInfo { |
| SequencedTask() |
| : sequence_token_id(0), |
| @@ -354,6 +370,10 @@ class SequencedWorkerPool::Inner { |
| CLEANUP_DONE, |
| }; |
| + // Helper used by PostTask() to complete the work when redirection is on. |
| + // Coalesce upon resolution of http://crbug.com/622400. |
| + void PostTaskToTaskScheduler(const SequencedTask& sequenced); |
| + |
| // Called from within the lock, this converts the given token name into a |
| // token ID, creating a new one if necessary. |
| int LockedGetNamedTokenID(const std::string& name); |
| @@ -402,7 +422,7 @@ class SequencedWorkerPool::Inner { |
| // 0 or more. The caller should then call FinishStartingAdditionalThread to |
| // complete initialization once the lock is released. |
| // |
| - // If another thread is not necessary, returne 0; |
| + // If another thread is not necessary, return 0; |
| // |
| // See the implementedion for more. |
| int PrepareToStartAdditionalThreadIfHelpful(); |
| @@ -502,10 +522,23 @@ class SequencedWorkerPool::Inner { |
| TestingObserver* const testing_observer_; |
| + // Members below are used for the experimental redirection to TaskScheduler. |
| + // TODO(gab): Remove these if http://crbug.com/622400 fails |
| + // (SequencedWorkerPool will be phased out completely otherwise). |
| + |
| // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the |
| // TaskScheduler as an experiment (unused otherwise). |
| const base::TaskPriority task_priority_; |
| + // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
| + // SequencedWorkerPool usage to the TaskScheduler. |
| + std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; |
| + |
| + // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as |
| + // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). |
| + // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). |
| + mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(Inner); |
| }; |
| @@ -519,6 +552,7 @@ SequencedWorkerPool::Worker::Worker( |
| worker_pool_(std::move(worker_pool)), |
| task_shutdown_behavior_(BLOCK_SHUTDOWN), |
| is_processing_task_(false) { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| Start(); |
| } |
| @@ -526,6 +560,8 @@ SequencedWorkerPool::Worker::~Worker() { |
| } |
| void SequencedWorkerPool::Worker::Run() { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| #if defined(OS_WIN) |
| win::ScopedCOMInitializer com_initializer; |
| #endif |
| @@ -634,6 +670,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
| int create_thread_id = 0; |
| { |
| AutoLock lock(lock_); |
| + |
|
danakj
2016/08/15 22:05:49
(you added whitespace here as a result)
gab
2016/09/07 15:00:10
True, but I kind of think it's cleaner that way as
|
| if (shutdown_called_) { |
| // Don't allow a new task to be posted if it doesn't block shutdown. |
| if (shutdown_behavior != BLOCK_SHUTDOWN) |
| @@ -669,36 +706,140 @@ bool SequencedWorkerPool::Inner::PostTask( |
| if (optional_token_name) |
| sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| - pending_tasks_.insert(sequenced); |
| - if (shutdown_behavior == BLOCK_SHUTDOWN) |
| - blocking_shutdown_pending_task_count_++; |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + PostTaskToTaskScheduler(sequenced); |
| + } else { |
| + pending_tasks_.insert(sequenced); |
| + |
| + if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
| + blocking_shutdown_pending_task_count_++; |
| + |
| + create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| + } |
| + } |
| - create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| + if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + // Actually start the additional thread or signal an existing one outside |
| + // the lock. |
| + if (create_thread_id) |
| + FinishStartingAdditionalThread(create_thread_id); |
| + else |
| + SignalHasWork(); |
| } |
| - // Actually start the additional thread or signal an existing one now that |
| - // we're outside the lock. |
| - if (create_thread_id) |
| - FinishStartingAdditionalThread(create_thread_id); |
| - else |
| - SignalHasWork(); |
| +#if DCHECK_IS_ON() |
| + { |
| + AutoLock lock_for_dcheck(lock_); |
| + // Some variables are exposed in both modes for convenience but only really |
| + // intended for one of them at runtime, confirm exclusive usage here. |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + DCHECK(pending_tasks_.empty()); |
| + DCHECK_EQ(0, create_thread_id); |
| + } else { |
| + DCHECK(sequenced_task_runner_map_.empty()); |
| + } |
| + } |
| +#endif // DCHECK_IS_ON() |
| return true; |
| } |
| +void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| + const SequencedTask& sequenced) { |
|
fdoray
2016/08/25 15:14:01
This method doesn't honor delays :(
gab
2016/09/07 15:00:10
Oops, looks like you're addressing that in https:/
|
| + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| + lock_.AssertAcquired(); |
| + |
| + // Confirm that the TaskScheduler's shutdown behaviors use the same |
| + // underlying values as SequencedWorkerPool. |
| + static_assert( |
| + static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
| + static_cast<int>(CONTINUE_ON_SHUTDOWN), |
| + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| + "CONTINUE_ON_SHUTDOWN."); |
| + static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == |
| + static_cast<int>(SKIP_ON_SHUTDOWN), |
| + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| + "SKIP_ON_SHUTDOWN."); |
| + static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == |
| + static_cast<int>(BLOCK_SHUTDOWN), |
| + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| + "BLOCK_SHUTDOWN."); |
| + |
| + const TaskShutdownBehavior task_shutdown_behavior = |
| + static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); |
| + const TaskTraits pool_traits = |
| + TaskTraits() |
| + .WithFileIO() |
| + .WithPriority(task_priority_) |
| + .WithShutdownBehavior(task_shutdown_behavior); |
|
fdoray
2016/08/25 15:14:01
SequencedWorkerPool allows tasks with different sh
gab
2016/09/07 15:00:10
Yes, let's do that (seems like that's what you're
|
| + |
| + // Find or create the TaskScheduler TaskRunner to redirect this task to if |
| + // it is posted to a specific sequence. |
| + scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; |
| + if (sequenced.sequence_token_id) { |
| + sequenced_task_runner = |
| + &sequenced_task_runner_map_[sequenced.sequence_token_id]; |
| + if (!*sequenced_task_runner) { |
| + const ExecutionMode execution_mode = |
| + max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED |
| + : ExecutionMode::SEQUENCED; |
|
fdoray
2016/08/25 15:14:01
Should we DCHECK_LE(sequenced_task_runner_map_.siz
gab
2016/09/07 15:00:10
Good point, I think this is the case in practice s
|
| + *sequenced_task_runner = |
| + CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
| + } |
| + } |
| + |
| + if (sequenced_task_runner) { |
| + (*sequenced_task_runner) |
| + ->PostTask(sequenced.posted_from, sequenced.task); |
| + } else { |
| + // PostTaskWithTraits() posts a task with PARALLEL semantics. There are |
| + // however a few pools that use only one thread and therefore can currently |
| + // legitimatelly assume thread affinity despite using SequencedWorkerPool. |
| + // Such pools typically only give access to their TaskRunner which will be |
| + // SINGLE_THREADED per nature of the pool having only one thread but this |
| + // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() |
| + // directly. |
| + DCHECK_GT(max_threads_, 1U); |
| + base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
| + sequenced.task); |
| + } |
| +} |
| + |
| bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| AutoLock lock(lock_); |
| - return ContainsKey(threads_, PlatformThread::CurrentId()); |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (!runs_tasks_on_verifier_) { |
| + runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
| + TaskTraits().WithFileIO().WithPriority(task_priority_), |
| + ExecutionMode::PARALLEL); |
|
fdoray
2016/08/25 15:14:01
Should we have a special case for single-threaded
gab
2016/09/07 15:00:10
Yes, looks like you got this too :-)
|
| + } |
| + return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
| + } else { |
| + return ContainsKey(threads_, PlatformThread::CurrentId()); |
| + } |
| } |
| bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| SequenceToken sequence_token) const { |
| AutoLock lock(lock_); |
| - ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| - if (found == threads_.end()) |
| - return false; |
| - return found->second->is_processing_task() && |
| - sequence_token.Equals(found->second->task_sequence_token()); |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + // TODO(gab): This currently only verifies that the current thread is a |
| + // thread on which a task bound to |sequence_token| *could* run, but it |
| + // doesn't verify that the current is *currently running* a task bound to |
| + // |sequence_token|. |
| + const auto sequenced_task_runner_it = |
| + sequenced_task_runner_map_.find(sequence_token.id_); |
| + return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
| + sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
| + } else { |
| + ThreadMap::const_iterator found = |
| + threads_.find(PlatformThread::CurrentId()); |
| + if (found == threads_.end()) |
| + return false; |
| + return found->second->is_processing_task() && |
| + sequence_token.Equals(found->second->task_sequence_token()); |
| + } |
| } |
| // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| @@ -732,6 +873,10 @@ void SequencedWorkerPool::Inner::Shutdown( |
| if (shutdown_called_) |
| return; |
| shutdown_called_ = true; |
| + |
| + if (g_all_pools_state != AllPoolsState::WORKER_CREATED) |
| + return; |
| + |
| max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
| // Tickle the threads. This will wake up a waiting one so it will know that |
| @@ -771,6 +916,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
| } |
| void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| { |
| AutoLock lock(lock_); |
| DCHECK(thread_being_created_); |
| @@ -905,6 +1051,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| } |
| void SequencedWorkerPool::Inner::HandleCleanup() { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| if (cleanup_state_ == CLEANUP_DONE) |
| return; |
| @@ -970,6 +1118,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| SequencedTask* task, |
| TimeDelta* wait_time, |
| std::vector<Closure>* delete_these_outside_lock) { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| // Find the next task with a sequence token that's not currently in use. |
| @@ -1056,6 +1206,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| } |
| int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| // Mark the task's sequence number as in use. |
| @@ -1087,6 +1239,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| } |
| void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
| @@ -1100,6 +1254,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| int sequence_token_id) const { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| return !sequence_token_id || |
| current_sequences_.find(sequence_token_id) == |
| @@ -1107,6 +1263,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| } |
| int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| // How thread creation works: |
| // |
| @@ -1157,15 +1315,24 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| int thread_number) { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| // Called outside of the lock. |
| DCHECK_GT(thread_number, 0); |
| + if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { |
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
| + g_all_pools_state = AllPoolsState::WORKER_CREATED; |
| + } |
| + |
| // The worker is assigned to the list when the thread actually starts, which |
| // will manage the memory of the pointer. |
| new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| } |
| void SequencedWorkerPool::Inner::SignalHasWork() { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| has_work_cv_.Signal(); |
| if (testing_observer_) { |
| testing_observer_->OnHasWork(); |
| @@ -1173,6 +1340,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() { |
| } |
| bool SequencedWorkerPool::Inner::CanShutdown() const { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| lock_.AssertAcquired(); |
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| return !thread_being_created_ && |
| @@ -1209,6 +1377,18 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
| return worker->worker_pool(); |
| } |
| +// static |
| +void SequencedWorkerPool:: |
| + RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
| + DCHECK(TaskScheduler::GetInstance()); |
| + // Hitting this DCHECK indicates that a task was posted to a |
| + // SequencedWorkerPool before the TaskScheduler was initialized and |
| + // redirected, posting task to SequencedWorkerPools needs to at least be |
| + // delayed until after that point. |
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
| + g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
| +} |
| + |
| SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| const std::string& thread_name_prefix, |
| base::TaskPriority task_priority) |