Chromium Code Reviews
| Index: base/threading/sequenced_worker_pool.cc |
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
| index d8d10f34e9f2998d79ba601a46fa6dfe0a4fdab4..680cec102ee080d68dbd376cd216772178d847bb 100644 |
| --- a/base/threading/sequenced_worker_pool.cc |
| +++ b/base/threading/sequenced_worker_pool.cc |
| @@ -25,6 +25,8 @@ |
| #include "base/strings/stringprintf.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/synchronization/lock.h" |
| +#include "base/task_scheduler/post_task.h" |
| +#include "base/task_scheduler/task_scheduler.h" |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/simple_thread.h" |
| #include "base/threading/thread_local.h" |
| @@ -50,6 +52,20 @@ namespace base { |
| namespace { |
| +// An enum representing the state of all pools. Any given process should only |
| +// ever transition from NONE_ACTIVE to the active states; transitions between |
| +// active states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
| +// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
| +// and the WORKER_CREATED transition occurs when a Worker needs to be created |
| +// because the first task was posted and the state is still NONE_ACTIVE. |
| +// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
| +// will be phased out completely otherwise). |
| +enum class AllPoolsState { |
| + NONE_ACTIVE, |
| + WORKER_CREATED, |
| + REDIRECTED_TO_TASK_SCHEDULER, |
| +} g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
| + |
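Note: the only two legal transitions of g_all_pools_state are sketched below for reference; both snippets simply restate code added later in this CL (RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() and FinishStartingAdditionalThread()), shown side by side as alternative paths, not as sequential steps in one process.

    // Path A: the process opts into redirection before any task is posted.
    DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
    g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;

    // Path B (the alternative): a classic Worker is created because the first
    // task arrived while the state was still NONE_ACTIVE.
    if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
      DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
      g_all_pools_state = AllPoolsState::WORKER_CREATED;
    }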
| struct SequencedTask : public TrackingInfo { |
| SequencedTask() |
| : sequence_token_id(0), |
| @@ -354,6 +370,11 @@ class SequencedWorkerPool::Inner { |
| CLEANUP_DONE, |
| }; |
| + // Helpers used by PostTask() to complete the work depending on whether |
| + // redirection is on. Coalesce upon resolution of http://crbug.com/622400. |
| + void PostTaskToTaskScheduler(const SequencedTask& sequenced); |
| + void PostTaskToPool(const SequencedTask& sequenced); |
| + |
| // Called from within the lock, this converts the given token name into a |
| // token ID, creating a new one if necessary. |
| int LockedGetNamedTokenID(const std::string& name); |
| @@ -502,10 +523,23 @@ class SequencedWorkerPool::Inner { |
| TestingObserver* const testing_observer_; |
| + // Members below are used for the experimental redirection to TaskScheduler. |
| + // TODO(gab): Remove these if http://crbug.com/622400 fails |
| + // (SequencedWorkerPool will be phased out completely otherwise). |
| + |
| // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the |
| // TaskScheduler as an experiment (unused otherwise). |
| const base::TaskPriority task_priority_; |
| + // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
| + // SequencedWorkerPool usage to the TaskScheduler. |
| + std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; |
| + |
| + // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as |
| + // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). |
| + // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). |
| + mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(Inner); |
| }; |
| @@ -519,6 +553,7 @@ SequencedWorkerPool::Worker::Worker( |
| worker_pool_(std::move(worker_pool)), |
| task_shutdown_behavior_(BLOCK_SHUTDOWN), |
| is_processing_task_(false) { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| Start(); |
| } |
| @@ -526,6 +561,8 @@ SequencedWorkerPool::Worker::~Worker() { |
| } |
| void SequencedWorkerPool::Worker::Run() { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
|
danakj (2016/08/15 21:02:52): Run is happening before RedirectSequencedWorkerPoo
gab (2016/08/15 21:32:44): No, this documents that Run() should never happen
|
| + |
| #if defined(OS_WIN) |
| win::ScopedCOMInitializer com_initializer; |
| #endif |
| @@ -631,74 +668,180 @@ bool SequencedWorkerPool::Inner::PostTask( |
| base::MakeCriticalClosure(task) : task; |
| sequenced.time_to_run = TimeTicks::Now() + delay; |
| - int create_thread_id = 0; |
| - { |
| - AutoLock lock(lock_); |
| - if (shutdown_called_) { |
| - // Don't allow a new task to be posted if it doesn't block shutdown. |
| - if (shutdown_behavior != BLOCK_SHUTDOWN) |
| - return false; |
| - |
| - // If the current thread is running a task, and that task doesn't block |
| - // shutdown, then it shouldn't be allowed to post any more tasks. |
| - ThreadMap::const_iterator found = |
| - threads_.find(PlatformThread::CurrentId()); |
| - if (found != threads_.end() && found->second->is_processing_task() && |
| - found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { |
| - return false; |
| - } |
| + AutoLock lock(lock_); |
| - if (max_blocking_tasks_after_shutdown_ <= 0) { |
| - DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; |
| - return false; |
| - } |
| - max_blocking_tasks_after_shutdown_ -= 1; |
| + if (shutdown_called_) { |
| + // Don't allow a new task to be posted if it doesn't block shutdown. |
| + if (shutdown_behavior != BLOCK_SHUTDOWN) |
| + return false; |
| + |
| + // If the current thread is running a task, and that task doesn't block |
| + // shutdown, then it shouldn't be allowed to post any more tasks. |
| + ThreadMap::const_iterator found = |
| + threads_.find(PlatformThread::CurrentId()); |
| + if (found != threads_.end() && found->second->is_processing_task() && |
| + found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { |
| + return false; |
| } |
| - // The trace_id is used for identifying the task in about:tracing. |
| - sequenced.trace_id = trace_id_++; |
| + if (max_blocking_tasks_after_shutdown_ <= 0) { |
| + DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; |
| + return false; |
| + } |
| + max_blocking_tasks_after_shutdown_ -= 1; |
| + } |
| - TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), |
| - "SequencedWorkerPool::Inner::PostTask", |
| - TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
| - TRACE_EVENT_FLAG_FLOW_OUT); |
| + // The trace_id is used for identifying the task in about:tracing. |
| + sequenced.trace_id = trace_id_++; |
| - sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
| + TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), |
| + "SequencedWorkerPool::Inner::PostTask", |
| + TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
| + TRACE_EVENT_FLAG_FLOW_OUT); |
| - // Now that we have the lock, apply the named token rules. |
| - if (optional_token_name) |
| - sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| + sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
| - pending_tasks_.insert(sequenced); |
| - if (shutdown_behavior == BLOCK_SHUTDOWN) |
| - blocking_shutdown_pending_task_count_++; |
| + // Now that we have the lock, apply the named token rules. |
| + if (optional_token_name) |
| + sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| - create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + PostTaskToTaskScheduler(sequenced); |
| + } else { |
| + PostTaskToPool(sequenced); |
| } |
| - // Actually start the additional thread or signal an existing one now that |
| - // we're outside the lock. |
| - if (create_thread_id) |
| - FinishStartingAdditionalThread(create_thread_id); |
| + // Some variables are exposed in both modes for convenience but are only |
| + // really intended for one of them at runtime; confirm exclusive usage here. |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) |
| + DCHECK(pending_tasks_.empty()); |
| else |
| - SignalHasWork(); |
| + DCHECK(sequenced_task_runner_map_.empty()); |
| return true; |
| } |
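Note: a caller-side sketch of the two posting paths the branch above distinguishes. DoSequencedWork() and DoUnsequencedWork() are hypothetical placeholders; the pool methods themselves are the existing SequencedWorkerPool API.

    #include "base/bind.h"
    #include "base/threading/sequenced_worker_pool.h"

    void DoSequencedWork();    // Hypothetical work function.
    void DoUnsequencedWork();  // Hypothetical work function.

    void PostExamples(scoped_refptr<base::SequencedWorkerPool> pool) {
      // With redirection enabled this reaches PostTaskToTaskScheduler() and
      // finds or creates the per-token TaskRunner in
      // |sequenced_task_runner_map_|.
      base::SequencedWorkerPool::SequenceToken token = pool->GetSequenceToken();
      pool->PostSequencedWorkerTask(token, FROM_HERE,
                                    base::Bind(&DoSequencedWork));

      // No sequence token: forwarded to base::PostTaskWithTraits() with
      // PARALLEL semantics (hence the DCHECK_GT(max_threads_, 1U) below).
      pool->PostWorkerTask(FROM_HERE, base::Bind(&DoUnsequencedWork));
    }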
| +void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| + const SequencedTask& sequenced) { |
| + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| + lock_.AssertAcquired(); |
| + |
| + // Confirm that the TaskScheduler's shutdown behaviors use the same |
| + // underlying values as SequencedWorkerPool. |
| + static_assert( |
| + static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
| + static_cast<int>(CONTINUE_ON_SHUTDOWN), |
| + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| + "CONTINUE_ON_SHUTDOWN."); |
| + static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == |
| + static_cast<int>(SKIP_ON_SHUTDOWN), |
| + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| + "SKIP_ON_SHUTDOWN."); |
| + static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == |
| + static_cast<int>(BLOCK_SHUTDOWN), |
| + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| + "BLOCK_SHUTDOWN."); |
| + |
| + const TaskShutdownBehavior task_shutdown_behavior = |
| + static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); |
| + const TaskTraits pool_traits = |
| + TaskTraits() |
| + .WithFileIO() |
| + .WithPriority(task_priority_) |
| + .WithShutdownBehavior(task_shutdown_behavior); |
| + |
| + // Find or create the TaskScheduler TaskRunner to redirect this task to if |
| + // it is posted to a specific sequence. |
| + scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; |
| + if (sequenced.sequence_token_id) { |
| + sequenced_task_runner = |
| + &sequenced_task_runner_map_[sequenced.sequence_token_id]; |
| + if (!*sequenced_task_runner) { |
| + const ExecutionMode execution_mode = |
| + max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED |
| + : ExecutionMode::SEQUENCED; |
| + *sequenced_task_runner = |
| + CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
| + } |
| + } |
| + |
| + if (sequenced_task_runner) { |
| + (*sequenced_task_runner) |
| + ->PostTask(sequenced.posted_from, sequenced.task); |
| + } else { |
| + // PostTaskWithTraits() posts a task with PARALLEL semantics. There are |
| + // however a few pools that use only one thread and therefore can |
|
danakj (2016/08/15 21:02:52): nit: "few pools" extra space
gab (2016/08/15 21:32:44): Done.
|
| + // currently legitimatelly assuming thread affinity despite using |
|
danakj (2016/08/15 21:02:52): "legitimately assume"
gab (2016/08/15 21:32:44): Done.
|
| + // SequencedWorkerPool. Such pools typically only give access to their |
| + // TaskRunner, which will be SINGLE_THREADED by nature of the pool |
| + // having only one thread, but this DCHECK ensures no such pools use |
| + // SequencedWorkerPool::PostTask() directly. |
| + DCHECK_GT(max_threads_, 1U); |
| + base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
| + sequenced.task); |
| + } |
| +} |
| + |
| +void SequencedWorkerPool::Inner::PostTaskToPool( |
| + const SequencedTask& sequenced) { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| + lock_.AssertAcquired(); |
| + |
| + pending_tasks_.insert(sequenced); |
| + |
| + if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
| + blocking_shutdown_pending_task_count_++; |
| + |
| + int create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| + |
| + { |
| + AutoUnlock unlock(lock_); |
|
brettw (2016/08/15 21:02:21): Sorry to add another pass here. I'm really relucta
danakj (2016/08/15 21:02:52): hm, this will relock the lock when you leave the b
gab (2016/08/15 21:32:44): Re-inlined PostTaskToPool() -- as without this sec
gab (2016/08/15 21:32:44): N/A after re-inlining PostTaskToPool per Brett's c
|
| + |
| + // Actually start the additional thread or signal an existing one outside |
| + // the lock. |
| + if (create_thread_id) |
| + FinishStartingAdditionalThread(create_thread_id); |
| + else |
| + SignalHasWork(); |
| + } |
| +} |
| + |
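Note on the AutoUnlock discussion above: base::AutoUnlock re-acquires the lock when its scope ends, so PostTaskToPool() returns to Inner::PostTask() holding |lock_| again. A minimal sketch of that behavior (DoWorkOutsideLock() is a hypothetical stand-in):

    {
      base::AutoLock auto_lock(lock_);        // |lock_| acquired.
      {
        base::AutoUnlock auto_unlock(lock_);  // |lock_| released here...
        DoWorkOutsideLock();
      }                                       // ...and re-acquired here.
    }                                         // Released again at end of scope.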
| bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| - AutoLock lock(lock_); |
| - return ContainsKey(threads_, PlatformThread::CurrentId()); |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (!runs_tasks_on_verifier_) { |
| + runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
| + TaskTraits().WithFileIO().WithPriority(task_priority_), |
| + ExecutionMode::PARALLEL); |
| + } |
| + return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
| + } else { |
| + AutoLock lock(lock_); |
| + return ContainsKey(threads_, PlatformThread::CurrentId()); |
| + } |
| } |
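Note: caller code does not change; a sketch of the common assertion pattern (AssertOnPool() is an illustrative name) and what it now answers under redirection:

    void AssertOnPool(base::SequencedWorkerPool* pool) {
      // With redirection enabled this asks the lazily created
      // |runs_tasks_on_verifier_| whether a task with the same traits could
      // run on the current TaskScheduler thread, instead of looking the
      // current thread up in |threads_|.
      DCHECK(pool->RunsTasksOnCurrentThread());
    }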
| bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| SequenceToken sequence_token) const { |
| - AutoLock lock(lock_); |
| - ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| - if (found == threads_.end()) |
| - return false; |
| - return found->second->is_processing_task() && |
| - sequence_token.Equals(found->second->task_sequence_token()); |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + // TODO(gab): This currently only verifies that the current thread is a |
| + // thread on which a task bound to |sequence_token| *could* run, but it |
| + // doesn't verify that the current thread is *currently running* a task |
| + // bound to |sequence_token|. |
| + const auto sequenced_task_runner_it = |
| + sequenced_task_runner_map_.find(sequence_token.id_); |
| + return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
| + sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
| + } else { |
| + AutoLock lock(lock_); |
| + ThreadMap::const_iterator found = |
| + threads_.find(PlatformThread::CurrentId()); |
| + if (found == threads_.end()) |
| + return false; |
| + return found->second->is_processing_task() && |
| + sequence_token.Equals(found->second->task_sequence_token()); |
| + } |
| } |
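Note: to make the TODO's caveat concrete, a sketch (MightBeOnSequence() is an illustrative name, not part of this CL) of what the redirected check actually answers:

    bool MightBeOnSequence(base::SequencedWorkerPool* pool,
                           base::SequencedWorkerPool::SequenceToken token) {
      // Under redirection this can return true on any TaskScheduler worker
      // that the token's TaskRunner may use, even if no task posted with
      // |token| is currently executing on this thread.
      return pool->IsRunningSequenceOnCurrentThread(token);
    }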
| // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| @@ -732,6 +875,10 @@ void SequencedWorkerPool::Inner::Shutdown( |
| if (shutdown_called_) |
| return; |
| shutdown_called_ = true; |
| + |
| + if (g_all_pools_state != AllPoolsState::WORKER_CREATED) |
| + return; |
| + |
| max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
| // Tickle the threads. This will wake up a waiting one so it will know that |
| @@ -771,6 +918,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
| } |
| void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
|
danakj (2016/08/15 21:02:52): Why are some of these != REDIRECTED instead of ==
gab (2016/08/15 21:32:44): We never explicitly set POOL (a.k.a. WORKER_CREATE
danakj (2016/08/15 22:05:49): Hm ok. I would personally prefer == checks wheneve
gab (2016/08/20 15:21:31): Unless the first Worker is created while only the
|
| { |
| AutoLock lock(lock_); |
| DCHECK(thread_being_created_); |
| @@ -905,6 +1053,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| } |
| void SequencedWorkerPool::Inner::HandleCleanup() { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| if (cleanup_state_ == CLEANUP_DONE) |
| return; |
| @@ -970,6 +1120,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| SequencedTask* task, |
| TimeDelta* wait_time, |
| std::vector<Closure>* delete_these_outside_lock) { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| // Find the next task with a sequence token that's not currently in use. |
| @@ -1056,6 +1208,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| } |
| int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| // Mark the task's sequence number as in use. |
| @@ -1087,6 +1241,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| } |
| void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
| @@ -1100,6 +1256,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| int sequence_token_id) const { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| return !sequence_token_id || |
| current_sequences_.find(sequence_token_id) == |
| @@ -1107,6 +1265,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| } |
| int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| lock_.AssertAcquired(); |
| // How thread creation works: |
| // |
| @@ -1157,15 +1317,24 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| int thread_number) { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| // Called outside of the lock. |
| DCHECK_GT(thread_number, 0); |
| + if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { |
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
| + g_all_pools_state = AllPoolsState::WORKER_CREATED; |
| + } |
| + |
| // The worker is assigned to the list when the thread actually starts, which |
| // will manage the memory of the pointer. |
| new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| } |
| void SequencedWorkerPool::Inner::SignalHasWork() { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| + |
| has_work_cv_.Signal(); |
| if (testing_observer_) { |
| testing_observer_->OnHasWork(); |
| @@ -1173,6 +1342,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() { |
| } |
| bool SequencedWorkerPool::Inner::CanShutdown() const { |
| + DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
| lock_.AssertAcquired(); |
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| return !thread_being_created_ && |
| @@ -1209,6 +1379,18 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
| return worker->worker_pool(); |
| } |
| +// static |
| +void SequencedWorkerPool:: |
| + RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
| + DCHECK(TaskScheduler::GetInstance()); |
| + // Hitting this DCHECK indicates that a task was posted to a |
| + // SequencedWorkerPool before the TaskScheduler was initialized and |
| + // redirected; posting tasks to SequencedWorkerPools needs to at least be |
| + // delayed until after that point. |
| + DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
| + g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
| +} |
| + |
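Note: the startup ordering implied by the two DCHECKs above, as a sketch (EnableRedirectionAtStartup() is an illustrative wrapper; the TaskScheduler creation call is intentionally elided because it is outside this CL):

    #include "base/logging.h"
    #include "base/task_scheduler/task_scheduler.h"
    #include "base/threading/sequenced_worker_pool.h"

    void EnableRedirectionAtStartup() {
      // 1. The process-wide TaskScheduler must already exist (creation elided).
      DCHECK(base::TaskScheduler::GetInstance());

      // 2. Enable redirection before anything posts to a SequencedWorkerPool,
      //    otherwise the NONE_ACTIVE DCHECK above fires.
      base::SequencedWorkerPool::
          RedirectSequencedWorkerPoolsToTaskSchedulerForProcess();

      // 3. Only after this point should tasks be posted to any
      //    SequencedWorkerPool.
    }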
| SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| const std::string& thread_name_prefix, |
| base::TaskPriority task_priority) |