| Index: base/task_scheduler/thread_pool.cc
|
| diff --git a/base/task_scheduler/thread_pool.cc b/base/task_scheduler/thread_pool.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..12df23d93ed909597dd47facd944c0058db43837
|
| --- /dev/null
|
| +++ b/base/task_scheduler/thread_pool.cc
|
| @@ -0,0 +1,345 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "base/task_scheduler/thread_pool.h"
|
| +
|
| +#include <utility>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/logging.h"
|
| +#include "base/task_scheduler/utils.h"
|
| +
|
| +namespace base {
|
| +namespace internal {
|
| +
|
| +namespace {
|
| +
|
| +bool WorkerThreadIsInVector(
|
| + const WorkerThread* worker_thread,
|
| + const std::vector<scoped_ptr<WorkerThread>>& worker_threads) {
|
| + for (const auto& current_worker_thread : worker_threads) {
|
| + if (current_worker_thread.get() == worker_thread)
|
| + return true;
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +// A task runner that runs tasks with the PARALLEL strategy.
|
| +class SchedulerParallelTaskRunner : public TaskRunner {
|
| + public:
|
| + // Tasks posted through this task runner have |traits| and are inserted in
|
| + // |shared_priority_queue|. |delayed_task_manager| is used to post delayed
|
| + // tasks. |shutdown_manager| is notified when a task is posted.
|
| + SchedulerParallelTaskRunner(const TaskTraits& traits,
|
| + PriorityQueue* priority_queue,
|
| + DelayedTaskManager* delayed_task_manager,
|
| + ShutdownManager* shutdown_manager);
|
| +
|
| + // TaskRunner:
|
| + bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + TimeDelta delay) override;
|
| + bool RunsTasksOnCurrentThread() const override;
|
| +
|
| + private:
|
| + ~SchedulerParallelTaskRunner() override;
|
| +
|
| + TaskTraits traits_;
|
| + PriorityQueue* priority_queue_;
|
| + DelayedTaskManager* delayed_task_manager_;
|
| + ShutdownManager* shutdown_manager_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
|
| +};
|
| +
|
| +SchedulerParallelTaskRunner::SchedulerParallelTaskRunner(
|
| + const TaskTraits& traits,
|
| + PriorityQueue* priority_queue,
|
| + DelayedTaskManager* delayed_task_manager,
|
| + ShutdownManager* shutdown_manager)
|
| + : traits_(traits),
|
| + priority_queue_(priority_queue),
|
| + delayed_task_manager_(delayed_task_manager),
|
| + shutdown_manager_(shutdown_manager) {}
|
| +
|
| +bool SchedulerParallelTaskRunner::PostDelayedTask(
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + TimeDelta delay) {
|
| + Task task(from_here, closure, traits_, TimeTicks::Now());
|
| + if (!delay.is_zero())
|
| + task.delayed_run_time = task.post_time + delay;
|
| + PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_,
|
| + shutdown_manager_, delayed_task_manager_);
|
| + return true;
|
| +}
|
| +
|
| +bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const {
|
| + // TODO(fdoray): Return true only if tasks posted may actually run on the
|
| + // current thread. It is valid, but not ideal, to always return true.
|
| + return true;
|
| +}
|
| +
|
| +SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default;
|
| +
|
| +// A task runner that runs tasks in with the SEQUENCED strategy.
|
| +class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| + public:
|
| + // Tasks posted through this task runner have |traits| and are inserted in
|
| + // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|.
|
| + // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is
|
| + // notified when a task is posted.
|
| + SchedulerSequencedTaskRunner(const TaskTraits& traits,
|
| + scoped_refptr<Sequence> sequence,
|
| + PriorityQueue* priority_queue,
|
| + DelayedTaskManager* delayed_task_manager,
|
| + ShutdownManager* shutdown_manager);
|
| +
|
| + // SequencedTaskRunner:
|
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + TimeDelta delay) override;
|
| + bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + TimeDelta delay) override;
|
| + bool RunsTasksOnCurrentThread() const override;
|
| +
|
| + private:
|
| + ~SchedulerSequencedTaskRunner() override;
|
| +
|
| + TaskTraits traits_;
|
| + scoped_refptr<Sequence> sequence_;
|
| + PriorityQueue* priority_queue_;
|
| + DelayedTaskManager* delayed_task_manager_;
|
| + ShutdownManager* shutdown_manager_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
|
| +};
|
| +
|
| +SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner(
|
| + const TaskTraits& traits,
|
| + scoped_refptr<Sequence> sequence,
|
| + PriorityQueue* priority_queue,
|
| + DelayedTaskManager* delayed_task_manager,
|
| + ShutdownManager* shutdown_manager)
|
| + : traits_(traits),
|
| + sequence_(sequence),
|
| + priority_queue_(priority_queue),
|
| + delayed_task_manager_(delayed_task_manager),
|
| + shutdown_manager_(shutdown_manager) {}
|
| +
|
| +bool SchedulerSequencedTaskRunner::PostDelayedTask(
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + TimeDelta delay) {
|
| + Task task(from_here, closure, traits_, TimeTicks::Now());
|
| + if (!delay.is_zero())
|
| + task.delayed_run_time = task.post_time + delay;
|
| + PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_,
|
| + delayed_task_manager_);
|
| + return true;
|
| +}
|
| +
|
| +bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const {
|
| + // TODO(fdoray): Return true only if tasks posted may actually run on the
|
| + // current thread. It is valid, but not ideal, to always return true.
|
| + return true;
|
| +}
|
| +
|
| +bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask(
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + TimeDelta delay) {
|
| + return PostDelayedTask(from_here, task, delay);
|
| +}
|
| +
|
| +SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default;
|
| +
|
| +} // namespace
|
| +
|
| +
|
| +ThreadPool::~ThreadPool() = default;
|
| +
|
| +scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool(
|
| + ThreadPriority thread_priority,
|
| + size_t num_threads,
|
| + const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
|
| + ShutdownManager* shutdown_manager) {
|
| + scoped_ptr<ThreadPool> thread_pool(
|
| + new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback,
|
| + shutdown_manager));
|
| + return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool)
|
| + : scoped_ptr<ThreadPool>();
|
| +}
|
| +
|
| +size_t ThreadPool::GetNumThreads() const {
|
| + return worker_threads_.size();
|
| +}
|
| +
|
| +scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits(
|
| + const TaskTraits& traits,
|
| + ExecutionMode execution_mode) {
|
| + switch (execution_mode) {
|
| + case ExecutionMode::PARALLEL: {
|
| + return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner(
|
| + traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_));
|
| + }
|
| +
|
| + case ExecutionMode::SEQUENCED: {
|
| + return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner(
|
| + traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_,
|
| + &delayed_task_manager_, shutdown_manager_));
|
| + }
|
| +
|
| + case ExecutionMode::SINGLE_THREADED: {
|
| + DCHECK(!worker_threads_.empty());
|
| + // TODO(fdoray): Better thread assignment.
|
| + return scoped_refptr<TaskRunner>(
|
| + worker_threads_.front()
|
| + ->CreateTaskRunnerWithTraits(traits, execution_mode)
|
| + .get());
|
| + }
|
| +
|
| +#if defined(OS_WIN)
|
| + case ExecutionMode::SINGLE_THREADED_COM_STA: {
|
| + // TODO(fdoray): Implement COM.
|
| + NOTIMPLEMENTED();
|
| + return scoped_refptr<TaskRunner>();
|
| + }
|
| +#endif
|
| +
|
| + default: {
|
| + NOTREACHED();
|
| + return scoped_refptr<TaskRunner>();
|
| + }
|
| + }
|
| +}
|
| +
|
| +void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence,
|
| + const SequenceSortKey& sequence_sort_key,
|
| + const WorkerThread* worker_thread) {
|
| + DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get());
|
| +
|
| + // Set a flag to avoid waking up a thread when reinserting |sequence| in
|
| + // |priority_queue_| if the thread doing the reinsertion:
|
| + // - Can run tasks from |priority_queue_|, and,
|
| + // - Doesn't have pending single-threaded tasks.
|
| + // If these conditions are met, the thread doing the reinsertion will soon
|
| + // pop a sequence from |priority_queue_|. There is no need to wake up a new
|
| + // thread to do it.
|
| + if (worker_thread->shared_priority_queue() == &priority_queue_ &&
|
| + !worker_thread->HasSingleThreadedTasks()) {
|
| + disable_wake_up_thread_on_sequence_insertion_.Set(true);
|
| + }
|
| +
|
| + // Insert the sequence in the priority queue.
|
| + priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key);
|
| +
|
| + disable_wake_up_thread_on_sequence_insertion_.Set(false);
|
| +}
|
| +
|
| +void ThreadPool::ShutdownAndJoinAllThreadsForTesting() {
|
| + for (const auto& worker_thread : worker_threads_)
|
| + worker_thread->ShutdownAndJoinForTesting();
|
| +}
|
| +
|
| +ThreadPool::ThreadPool(
|
| + ThreadPriority thread_priority,
|
| + size_t num_threads,
|
| + const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
|
| + ShutdownManager* shutdown_manager)
|
| + : thread_pool_ready_(true, false),
|
| + priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue,
|
| + Unretained(this))),
|
| + shutdown_manager_(shutdown_manager),
|
| + delayed_task_manager_(
|
| + Bind(&ThreadPool::WakeUpOneThread, Unretained(this)),
|
| + shutdown_manager_) {
|
| + DCHECK_GT(num_threads, 0u);
|
| + DCHECK(shutdown_manager);
|
| +
|
| + // The platform threads reference thread pool data structures and there's
|
| + // currently no way for us to create them in a suspended state. We'll use the
|
| + // main entry callback to have the threads wait and then signal them when
|
| + // we're ready.
|
| + const WorkerThread::MainEntryCallback main_entry_callback =
|
| + Bind(&WaitableEvent::Wait, Unretained(&thread_pool_ready_));
|
| +
|
| + const WorkerThread::BecomesIdleCallback becomes_idle_callback =
|
| + Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this));
|
| + worker_threads_.reserve(num_threads);
|
| +
|
| + for (size_t i = 0; i < num_threads; ++i) {
|
| + scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread(
|
| + thread_priority, &priority_queue_, main_entry_callback,
|
| + reinsert_sequence_callback, becomes_idle_callback,
|
| + &delayed_task_manager_, shutdown_manager_);
|
| + if (worker_thread.get() != nullptr)
|
| + worker_threads_.push_back(std::move(worker_thread));
|
| + }
|
| +
|
| + thread_pool_ready_.Signal();
|
| +}
|
| +
|
| +void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) {
|
| + DCHECK(WorkerThreadIsInVector(worker_thread, worker_threads_));
|
| +
|
| + AutoSchedulerLock auto_lock_(idle_worker_threads_lock_);
|
| +
|
| + if (idle_worker_threads_set_.find(worker_thread) !=
|
| + idle_worker_threads_set_.end()) {
|
| + // The worker thread is already on the stack of idle threads.
|
| + return;
|
| + }
|
| +
|
| + // Add the worker thread to the stack of idle threads.
|
| + idle_worker_threads_stack_.push(worker_thread);
|
| + idle_worker_threads_set_.insert(worker_thread);
|
| +}
|
| +
|
| +void ThreadPool::WakeUpOneThread() {
|
| + // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't
|
| + // have pending or running single-threaded tasks.
|
| + AutoSchedulerLock auto_lock(idle_worker_threads_lock_);
|
| + while (!idle_worker_threads_stack_.empty()) {
|
| + WorkerThread* worker_thread = idle_worker_threads_stack_.top();
|
| +
|
| + idle_worker_threads_stack_.pop();
|
| + idle_worker_threads_set_.erase(worker_thread);
|
| +
|
| + // HasSingleThreadedTasks() can return stale results. However, when it
|
| + // returns true below, it is guaranteed that |worker_thread| is either awake
|
| + // or about to be woken up and that it will not enter
|
| + // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This
|
| + // is important because if all threads in |idle_worker_threads_stack_|
|
| + // report that they have single-threaded tasks, no thread is woken up by
|
| + // this method. If these threads don't check |priority_queue_| before
|
| + // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could
|
| + // end up never being done. The guarantee works because between the moment
|
| + // HasSingleThreadedTasks() goes from true to false and the moment
|
| + // |worker_thread| enters WaitUntilWorkIsAvailable(),
|
| + // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool.
|
| + // Both WorkerThreadBecomesIdleCallback() and the current method acquire
|
| + // |idle_worker_threads_lock_|, which synchronizes the value returned by
|
| + // HasSingleThreadedTasks().
|
| + //
|
| + // TODO(fdoray): A single-threaded task can be posted to |worker_thread|
|
| + // immediately after HasSingleThreadedTasks() has returned false. Ideally,
|
| + // when this happens, another worker thread should be woken up.
|
| + if (!worker_thread->HasSingleThreadedTasks()) {
|
| + worker_thread->WakeUp();
|
| + break;
|
| + }
|
| + }
|
| +}
|
| +
|
| +void ThreadPool::OnSequenceInsertedInPriorityQueue() {
|
| + if (disable_wake_up_thread_on_sequence_insertion_.Get())
|
| + return;
|
| +
|
| + WakeUpOneThread();
|
| +}
|
| +
|
| +} // namespace internal
|
| +} // namespace base
|
|
|