Chromium Code Reviews| Index: base/threading/sequenced_worker_pool.cc |
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
| index 13323d776339183a68aa313fd67edbf28f67c189..5d6efe167d43bfbe647bb60c74edb13ffb625eeb 100644 |
| --- a/base/threading/sequenced_worker_pool.cc |
| +++ b/base/threading/sequenced_worker_pool.cc |
| @@ -10,6 +10,7 @@ |
| #include <map> |
| #include <memory> |
| #include <set> |
| +#include <unordered_map> |
| #include <utility> |
| #include <vector> |
| @@ -52,12 +53,15 @@ namespace base { |
| namespace { |
| -// An enum representing the state of all pools. Any given process should only |
| -// ever transition from NONE_ACTIVE to the active states, transitions between |
| -// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
| -// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
| -// and the WORKER_CREATED transition occurs when a Worker needs to be created |
| -// because the first task was posted and the state is still NONE_ACTIVE. |
| +// An enum representing the state of all pools. Any given non-test process |
| +// should only ever transition from NONE_ACTIVE to the active states, |
| +// transitions between actives states are unexpected. The |
| +// REDIRECTED_TO_TASK_SCHEDULER transition occurs when |
| +// RedirectToTaskSchedulerForProcess() is called and the WORKER_CREATED |
| +// transition occurs when a Worker needs to be created because the first task |
| +// was posted and the state is still NONE_ACTIVE. In a test process, |
| +// ResetRedirectToTaskSchedulerForProcessForTesting() causes a transition to |
| +// the NONE_ACTIVE state. |
| // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
| // will be phased out completely otherwise). |
| enum class AllPoolsState { |
| @@ -370,9 +374,17 @@ class SequencedWorkerPool::Inner { |
| CLEANUP_DONE, |
| }; |
| + struct TaskRunnerAndShutdownBehavior { |
| + scoped_refptr<TaskRunner> task_runner; |
| + TaskShutdownBehavior shutdown_behavior; |
| + }; |
| + |
| + bool RunsTasksOnCurrentThreadAssertSynchronized() const; |
| + |
| // Helper used by PostTask() to complete the work when redirection is on. |
| // Coalesce upon resolution of http://crbug.com/622400. |
| - void PostTaskToTaskScheduler(const SequencedTask& sequenced); |
| + bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
| + const TimeDelta& delay); |
| // Called from within the lock, this converts the given token name into a |
| // token ID, creating a new one if necessary. |
| @@ -531,8 +543,14 @@ class SequencedWorkerPool::Inner { |
| const base::TaskPriority task_priority_; |
| // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
| - // SequencedWorkerPool usage to the TaskScheduler. |
| - std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; |
| + // sequenced tasks to the TaskScheduler. |
| + std::unordered_map<int, TaskRunnerAndShutdownBehavior> |
| + sequenced_task_runner_map_; |
| + |
| + // TaskScheduler TaskRunners to redirect unsequenced tasks to the |
| + // TaskScheduler. |
| + std::unordered_map<TaskShutdownBehavior, scoped_refptr<TaskRunner>> |
| + unsequenced_task_runners_; |
| // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as |
| // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). |
| @@ -667,11 +685,14 @@ bool SequencedWorkerPool::Inner::PostTask( |
| base::MakeCriticalClosure(task) : task; |
| sequenced.time_to_run = TimeTicks::Now() + delay; |
| + bool task_successfully_posted = true; |
| + |
| int create_thread_id = 0; |
| { |
| AutoLock lock(lock_); |
| - if (shutdown_called_) { |
| + if (shutdown_called_ && |
| + g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| // Don't allow a new task to be posted if it doesn't block shutdown. |
| if (shutdown_behavior != BLOCK_SHUTDOWN) |
| return false; |
| @@ -707,7 +728,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
| sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| - PostTaskToTaskScheduler(sequenced); |
| + task_successfully_posted = PostTaskToTaskScheduler(sequenced, delay); |
| } else { |
| pending_tasks_.insert(sequenced); |
| @@ -741,11 +762,28 @@ bool SequencedWorkerPool::Inner::PostTask( |
| } |
| #endif // DCHECK_IS_ON() |
| - return true; |
| + return task_successfully_posted; |
| +} |
| + |
| +bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThreadAssertSynchronized() |
| + const { |
| + lock_.AssertAcquired(); |
| + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + // TODO(fdoray): Add a special case for single-threaded pools. |
| + if (!runs_tasks_on_verifier_) { |
| + runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
| + TaskTraits().WithFileIO().WithPriority(task_priority_), |
| + ExecutionMode::PARALLEL); |
| + } |
| + return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
| + } else { |
|
robliao
2016/08/29 23:28:50
Nit: Remove else
fdoray
2016/08/30 17:20:49
Done.
|
| + return ContainsKey(threads_, PlatformThread::CurrentId()); |
| + } |
| } |
| -void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| - const SequencedTask& sequenced) { |
| +bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| + const SequencedTask& sequenced, |
| + const TimeDelta& delay) { |
| DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| lock_.AssertAcquired(); |
| @@ -774,56 +812,64 @@ void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| .WithPriority(task_priority_) |
| .WithShutdownBehavior(task_shutdown_behavior); |
| - // Find or create the TaskScheduler TaskRunner to redirect this task to if |
| - // it is posted to a specific sequence. |
| - scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; |
| + // Find or create the TaskScheduler TaskRunner to redirect this task to. |
| + scoped_refptr<TaskRunner> task_runner; |
| if (sequenced.sequence_token_id) { |
| - sequenced_task_runner = |
| - &sequenced_task_runner_map_[sequenced.sequence_token_id]; |
| - if (!*sequenced_task_runner) { |
| + TaskRunnerAndShutdownBehavior& task_runner_and_shutdown_behavior = |
| + sequenced_task_runner_map_[sequenced.sequence_token_id]; |
| + if (!task_runner_and_shutdown_behavior.task_runner) { |
| const ExecutionMode execution_mode = |
| max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED |
| : ExecutionMode::SEQUENCED; |
| - *sequenced_task_runner = |
| + task_runner_and_shutdown_behavior.task_runner = |
| CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
| + task_runner_and_shutdown_behavior.shutdown_behavior = |
| + task_shutdown_behavior; |
| } |
| - } |
| - if (sequenced_task_runner) { |
| - (*sequenced_task_runner) |
| - ->PostTask(sequenced.posted_from, sequenced.task); |
| + // Posting tasks to the same sequence with different shutdown behaviors |
| + // isn't supported by the TaskScheduler. |
| + DCHECK_EQ(task_shutdown_behavior, |
| + task_runner_and_shutdown_behavior.shutdown_behavior); |
| + |
| + task_runner = task_runner_and_shutdown_behavior.task_runner; |
| } else { |
| - // PostTaskWithTraits() posts a task with PARALLEL semantics. There are |
| - // however a few pools that use only one thread and therefore can currently |
| - // legitimatelly assume thread affinity despite using SequencedWorkerPool. |
| - // Such pools typically only give access to their TaskRunner which will be |
| - // SINGLE_THREADED per nature of the pool having only one thread but this |
| - // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() |
| - // directly. |
| - DCHECK_GT(max_threads_, 1U); |
| - base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
| - sequenced.task); |
| + scoped_refptr<TaskRunner>& task_runner_for_shutdown_behavior_ref = |
| + unsequenced_task_runners_[task_shutdown_behavior]; |
| + if (!task_runner_for_shutdown_behavior_ref) { |
| + const ExecutionMode execution_mode = max_threads_ == 1U |
| + ? ExecutionMode::SINGLE_THREADED |
| + : ExecutionMode::PARALLEL; |
| + task_runner_for_shutdown_behavior_ref = |
| + CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
| + } |
| + |
| + task_runner = task_runner_for_shutdown_behavior_ref; |
| } |
| + |
| + // Single-threaded pools can legitimatelly assume thread affinity. Disallow |
|
robliao
2016/08/29 23:28:50
Nit: s/legitimatelly/legitimately/
fdoray
2016/08/30 17:20:49
Done.
|
| + // posting tasks with different sequence tokens or shutdown behaviors in such |
| + // pools since the TaskScheduler can't force them to run on the same thread. |
| + DCHECK(max_threads_ > 1 || |
| + (unsequenced_task_runners_.size() + |
|
robliao
2016/08/29 23:28:50
A caller could plausibly call GetTaskRunnerWithShu
fdoray
2016/08/30 17:20:49
No, we shouldn't disallow that. Done.
|
| + sequenced_task_runner_map_.size()) == 1); |
| + |
| + return task_runner->PostDelayedTask(sequenced.posted_from, sequenced.task, |
| + delay); |
| } |
| bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| AutoLock lock(lock_); |
| - if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| - if (!runs_tasks_on_verifier_) { |
| - runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
| - TaskTraits().WithFileIO().WithPriority(task_priority_), |
| - ExecutionMode::PARALLEL); |
| - } |
| - return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
| - } else { |
| - return ContainsKey(threads_, PlatformThread::CurrentId()); |
| - } |
| + return RunsTasksOnCurrentThreadAssertSynchronized(); |
| } |
| bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| SequenceToken sequence_token) const { |
| AutoLock lock(lock_); |
| if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| + if (!sequence_token.IsValid()) |
| + return RunsTasksOnCurrentThreadAssertSynchronized(); |
| + |
| // TODO(gab): This currently only verifies that the current thread is a |
| // thread on which a task bound to |sequence_token| *could* run, but it |
| // doesn't verify that the current is *currently running* a task bound to |
| @@ -831,7 +877,8 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| const auto sequenced_task_runner_it = |
| sequenced_task_runner_map_.find(sequence_token.id_); |
| return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
| - sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
| + sequenced_task_runner_it->second.task_runner |
| + ->RunsTasksOnCurrentThread(); |
| } else { |
| ThreadMap::const_iterator found = |
| threads_.find(PlatformThread::CurrentId()); |
| @@ -1378,8 +1425,7 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
| } |
| // static |
| -void SequencedWorkerPool:: |
| - RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
| +void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { |
| DCHECK(TaskScheduler::GetInstance()); |
| // Hitting this DCHECK indicates that a task was posted to a |
| // SequencedWorkerPool before the TaskScheduler was initialized and |
| @@ -1389,6 +1435,11 @@ void SequencedWorkerPool:: |
| g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
| } |
| +// static |
| +void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { |
| + g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
| +} |
| + |
| SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| const std::string& thread_name_prefix, |
| base::TaskPriority task_priority) |
| @@ -1528,6 +1579,7 @@ bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| } |
| void SequencedWorkerPool::FlushForTesting() { |
| + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| inner_->CleanupForTesting(); |
| } |