Chromium Code Reviews| Index: base/task_scheduler/thread_pool.cc | 
| diff --git a/base/task_scheduler/thread_pool.cc b/base/task_scheduler/thread_pool.cc | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..953613e122c2e5cef764a3ee1a05e63f0ee44eba | 
| --- /dev/null | 
| +++ b/base/task_scheduler/thread_pool.cc | 
| @@ -0,0 +1,320 @@ | 
| +// Copyright 2016 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +#include "base/task_scheduler/thread_pool.h" | 
| + | 
| +#include <utility> | 
| + | 
| +#include "base/bind.h" | 
| +#include "base/logging.h" | 
| +#include "base/task_scheduler/utils.h" | 
| + | 
| +namespace base { | 
| +namespace task_scheduler { | 
| + | 
| +namespace { | 
| + | 
| +// A task runner that runs tasks with the PARALLEL strategy. | 
| +class SchedulerParallelTaskRunner : public TaskRunner { | 
| + public: | 
| + // Tasks posted through this task runner have |traits| and are inserted in | 
| + // |shared_priority_queue|. |delayed_task_manager| is used to post delayed | 
| + // tasks. |shutdown_manager| is notified when a task is posted. | 
| + SchedulerParallelTaskRunner(const TaskTraits& traits, | 
| + PriorityQueue* priority_queue, | 
| + DelayedTaskManager* delayed_task_manager, | 
| + ShutdownManager* shutdown_manager); | 
| + | 
| + // TaskRunner: | 
| + bool PostDelayedTask(const tracked_objects::Location& from_here, | 
| + const Closure& closure, | 
| + TimeDelta delay) override; | 
| + bool RunsTasksOnCurrentThread() const override; | 
| + | 
| + private: | 
| + ~SchedulerParallelTaskRunner() override; | 
| + | 
| + TaskTraits traits_; | 
| + PriorityQueue* priority_queue_; | 
| + DelayedTaskManager* delayed_task_manager_; | 
| + ShutdownManager* shutdown_manager_; | 
| + | 
| + DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 
| +}; | 
| + | 
| +SchedulerParallelTaskRunner::SchedulerParallelTaskRunner( | 
| + const TaskTraits& traits, | 
| + PriorityQueue* priority_queue, | 
| + DelayedTaskManager* delayed_task_manager, | 
| + ShutdownManager* shutdown_manager) | 
| + : traits_(traits), | 
| + priority_queue_(priority_queue), | 
| + delayed_task_manager_(delayed_task_manager), | 
| + shutdown_manager_(shutdown_manager) {} | 
| + | 
| +bool SchedulerParallelTaskRunner::PostDelayedTask( | 
| + const tracked_objects::Location& from_here, | 
| + const Closure& closure, | 
| + TimeDelta delay) { | 
| + Task task(from_here, closure, traits_, TimeTicks::Now()); | 
| + if (!delay.is_zero()) | 
| + task.delayed_run_time = task.post_time + delay; | 
| + PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_, | 
| + shutdown_manager_, delayed_task_manager_); | 
| + return true; | 
| +} | 
| + | 
| +bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const { | 
| + // TODO(fdoray): Return true only if tasks posted may actually run on the | 
| + // current thread. It is valid, but not ideal, to always return true. | 
| + return true; | 
| +} | 
| + | 
| +SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default; | 
| + | 
| +// A task runner that runs tasks in with the SEQUENCED strategy. | 
| +class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 
| + public: | 
| + // Tasks posted through this task runner have |traits| and are inserted in | 
| + // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|. | 
| + // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is | 
| + // notified when a task is posted. | 
| + SchedulerSequencedTaskRunner(const TaskTraits& traits, | 
| + scoped_refptr<Sequence> sequence, | 
| + PriorityQueue* priority_queue, | 
| + DelayedTaskManager* delayed_task_manager, | 
| + ShutdownManager* shutdown_manager); | 
| + | 
| + // SequencedTaskRunner: | 
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 
| + const Closure& task, | 
| + TimeDelta delay) override; | 
| + bool PostDelayedTask(const tracked_objects::Location& from_here, | 
| + const Closure& closure, | 
| + TimeDelta delay) override; | 
| + bool RunsTasksOnCurrentThread() const override; | 
| + | 
| + private: | 
| + ~SchedulerSequencedTaskRunner() override; | 
| + | 
| + TaskTraits traits_; | 
| + scoped_refptr<Sequence> sequence_; | 
| + PriorityQueue* priority_queue_; | 
| + DelayedTaskManager* delayed_task_manager_; | 
| + ShutdownManager* shutdown_manager_; | 
| + | 
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 
| +}; | 
| + | 
| +SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner( | 
| + const TaskTraits& traits, | 
| + scoped_refptr<Sequence> sequence, | 
| + PriorityQueue* priority_queue, | 
| + DelayedTaskManager* delayed_task_manager, | 
| + ShutdownManager* shutdown_manager) | 
| + : traits_(traits), | 
| + sequence_(sequence), | 
| + priority_queue_(priority_queue), | 
| + delayed_task_manager_(delayed_task_manager), | 
| + shutdown_manager_(shutdown_manager) {} | 
| + | 
| +bool SchedulerSequencedTaskRunner::PostDelayedTask( | 
| + const tracked_objects::Location& from_here, | 
| + const Closure& closure, | 
| + TimeDelta delay) { | 
| + Task task(from_here, closure, traits_, TimeTicks::Now()); | 
| + if (!delay.is_zero()) | 
| + task.delayed_run_time = task.post_time + delay; | 
| + PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_, | 
| + delayed_task_manager_); | 
| + return true; | 
| +} | 
| + | 
| +bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const { | 
| + // TODO(fdoray): Return true only if tasks posted may actually run on the | 
| + // current thread. It is valid, but not ideal, to always return true. | 
| + return true; | 
| +} | 
| + | 
| +bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask( | 
| + const tracked_objects::Location& from_here, | 
| + const Closure& task, | 
| + TimeDelta delay) { | 
| + return PostDelayedTask(from_here, task, delay); | 
| +} | 
| + | 
| +SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default; | 
| + | 
| +} // namespace | 
| + | 
| + | 
| +ThreadPool::~ThreadPool() = default; | 
| + | 
| +scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool( | 
| + ThreadPriority thread_priority, | 
| + size_t num_threads, | 
| + const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, | 
| + ShutdownManager* shutdown_manager) { | 
| + scoped_ptr<ThreadPool> thread_pool( | 
| + new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback, | 
| + shutdown_manager)); | 
| + return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool) | 
| + : scoped_ptr<ThreadPool>(); | 
| +} | 
| + | 
| +size_t ThreadPool::GetNumThreads() const { | 
| + return worker_threads_.size(); | 
| +} | 
| + | 
| +scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits( | 
| + const TaskTraits& traits, | 
| + ExecutionMode execution_mode) { | 
| + switch (execution_mode) { | 
| + case ExecutionMode::PARALLEL: { | 
| + return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner( | 
| + traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_)); | 
| + } | 
| + | 
| + case ExecutionMode::SEQUENCED: { | 
| + // TODO(fdoray): Support TaskTraits().WithSequenceToken(). | 
| + return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner( | 
| + traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_, | 
| + &delayed_task_manager_, shutdown_manager_)); | 
| + } | 
| + | 
| +#if defined(OS_WIN) | 
| + case ExecutionMode::SINGLE_THREADED_COM_STA: | 
| +#endif | 
| + case ExecutionMode::SINGLE_THREADED: { | 
| + DCHECK(!worker_threads_.empty()); | 
| + // TODO(fdoray): Better thread assignment. | 
| + return scoped_refptr<TaskRunner>( | 
| + worker_threads_.front() | 
| + ->CreateTaskRunnerWithTraits(traits, execution_mode) | 
| + .get()); | 
| + } | 
| + | 
| + default: { | 
| + NOTREACHED(); | 
| + return scoped_refptr<TaskRunner>(); | 
| + } | 
| + } | 
| +} | 
| + | 
| +void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence, | 
| + const SequenceSortKey& sequence_sort_key, | 
| + const WorkerThread* worker_thread) { | 
| + DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get()); | 
| + | 
| + // Set a flag to avoid waking up a thread when reinserting |sequence| in | 
| + // |priority_queue_| if the thread doing the reinsertion: | 
| + // - Can run tasks from |priority_queue_|, and, | 
| + // - Doesn't have pending single-threaded tasks. | 
| + // If these conditions are met, the thread doing the reinsertion will soon | 
| + // pop a sequence from |priority_queue_|. There is no need to wake up a new | 
| + // thread to do it. | 
| + if (worker_thread->shared_priority_queue() == &priority_queue_ && | 
| + !worker_thread->HasSingleThreadedTasks()) { | 
| + disable_wake_up_thread_on_sequence_insertion_.Set(true); | 
| + } | 
| + | 
| + // Insert the sequence in the priority queue. | 
| + priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key); | 
| + | 
| + disable_wake_up_thread_on_sequence_insertion_.Set(false); | 
| +} | 
| + | 
| +void ThreadPool::JoinAllThreadsForTesting() { | 
| + for (const auto& worker_thread : worker_threads_) { | 
| + worker_thread->WakeUp(); | 
| + worker_thread->JoinForTesting(); | 
| + } | 
| +} | 
| + | 
| +ThreadPool::ThreadPool( | 
| + ThreadPriority thread_priority, | 
| + size_t num_threads, | 
| + const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, | 
| + ShutdownManager* shutdown_manager) | 
| + : priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue, | 
| + Unretained(this))), | 
| + shutdown_manager_(shutdown_manager), | 
| + delayed_task_manager_( | 
| + Bind(&ThreadPool::WakeUpOneThread, Unretained(this)), | 
| + shutdown_manager_) { | 
| + DCHECK_GT(num_threads, 0u); | 
| + DCHECK(shutdown_manager); | 
| + | 
| + const WorkerThread::BecomesIdleCallback becomes_idle_callback = | 
| + Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this)); | 
| + worker_threads_.reserve(num_threads); | 
| + | 
| + for (size_t i = 0; i < num_threads; ++i) { | 
| + scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread( | 
| + thread_priority, &priority_queue_, reinsert_sequence_callback, | 
| + becomes_idle_callback, &delayed_task_manager_, shutdown_manager_); | 
| + if (worker_thread.get() != nullptr) | 
| + worker_threads_.push_back(std::move(worker_thread)); | 
| + } | 
| +} | 
| + | 
| +void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) { | 
| 
 
fdoray
2016/02/11 17:30:33
DCHECK that |worker_thread| belongs to this thread
 
fdoray
2016/02/12 04:16:20
Done.
 
 | 
| + AutoSchedulerLock auto_lock_(idle_worker_threads_lock_); | 
| + | 
| + if (idle_worker_threads_set_.find(worker_thread) != | 
| + idle_worker_threads_set_.end()) { | 
| + // The worker thread is already on the stack of idle threads. | 
| + return; | 
| + } | 
| + | 
| + // Add the worker thread to the stack of idle threads. | 
| + idle_worker_threads_stack_.push(worker_thread); | 
| + idle_worker_threads_set_.insert(worker_thread); | 
| +} | 
| + | 
| +void ThreadPool::WakeUpOneThread() { | 
| + // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't | 
| + // have pending or running single-threaded tasks. | 
| + AutoSchedulerLock auto_lock(idle_worker_threads_lock_); | 
| + while (!idle_worker_threads_stack_.empty()) { | 
| + WorkerThread* worker_thread = idle_worker_threads_stack_.top(); | 
| + | 
| + idle_worker_threads_stack_.pop(); | 
| + idle_worker_threads_set_.erase(worker_thread); | 
| + | 
| + // HasSingleThreadedTasks() can return stale results. However, when it | 
| + // returns true below, it is guaranteed that |worker_thread| is either awake | 
| + // or about to be woken up and that it will not enter | 
| + // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This | 
| + // is important because if all threads in |idle_worker_threads_stack_| | 
| + // report that they have single-threaded tasks, no thread is woken up by | 
| + // this method. If these threads don't check |priority_queue_| before | 
| + // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could | 
| + // end up never being done. The guarantee works between the moment | 
| + // HasSingleThreadedTasks() goes from true to false and the moment | 
| + // |worker_thread| enters WaitUntilWorkIsAvailable(), | 
| + // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool. | 
| + // Both WorkerThreadBecomesIdleCallback() and the current method acquire | 
| + // |idle_worker_threads_lock_|, which synchronizes the value returned by | 
| + // HasSingleThreadedTasks(). | 
| + // | 
| + // TODO(fdoray): A single-threaded task can be posted to |worker_thread| | 
| + // immediately after HasSingleThreadedTasks() has returned false. Ideally, | 
| + // when this happens, another worker thread should be woken up. | 
| + if (!worker_thread->HasSingleThreadedTasks()) { | 
| 
 
robliao
2016/02/11 22:30:07
Maybe a controller pattern might make reasoning ov
 
fdoray
2016/02/12 04:16:20
I agree. I'll try to write this (tomorrow in the p
 
 | 
| + worker_thread->WakeUp(); | 
| + break; | 
| + } | 
| + } | 
| +} | 
| + | 
| +void ThreadPool::OnSequenceInsertedInPriorityQueue() { | 
| + if (disable_wake_up_thread_on_sequence_insertion_.Get()) | 
| + return; | 
| + | 
| + WakeUpOneThread(); | 
| +} | 
| + | 
| +} // namespace task_scheduler | 
| +} // namespace base |