| Index: base/task_scheduler/scheduler_thread_pool.cc
|
| diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..82198523f74bf07c10672f94895b68cb7694d924
|
| --- /dev/null
|
| +++ b/base/task_scheduler/scheduler_thread_pool.cc
|
| @@ -0,0 +1,269 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "base/task_scheduler/scheduler_thread_pool.h"
|
| +
|
| +#include <utility>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/bind_helpers.h"
|
| +#include "base/lazy_instance.h"
|
| +#include "base/logging.h"
|
| +#include "base/memory/ptr_util.h"
|
| +#include "base/task_scheduler/utils.h"
|
| +#include "base/threading/thread_local.h"
|
| +
|
| +namespace base {
|
| +namespace internal {
|
| +
|
| +namespace {
|
| +
|
| +// Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads
|
| +// that don't belong to a SchedulerThreadPool.
|
| +LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky
|
| + tls_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER;
|
| +
|
| +// A task runner that runs tasks with the PARALLEL ExecutionMode.
|
| +class SchedulerParallelTaskRunner : public TaskRunner {
|
| + public:
|
| + SchedulerParallelTaskRunner(const TaskTraits& traits,
|
| + PriorityQueue* priority_queue,
|
| + TaskTracker* task_tracker)
|
| + : traits_(traits),
|
| + priority_queue_(priority_queue),
|
| + task_tracker_(task_tracker) {}
|
| +
|
| + // TaskRunner:
|
| + bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + TimeDelta delay) override {
|
| + // TODO(fdoray): Support delayed tasks.
|
| + DCHECK(delay.is_zero());
|
| +
|
| + // Post the task as part of a one-off single-task Sequence.
|
| + return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)),
|
| + make_scoped_refptr(new Sequence), priority_queue_,
|
| + task_tracker_);
|
| + }
|
| +
|
| + bool RunsTasksOnCurrentThread() const override {
|
| + return tls_current_shared_priority_queue.Get().Get() == priority_queue_;
|
| + }
|
| +
|
| + private:
|
| + ~SchedulerParallelTaskRunner() override = default;
|
| +
|
| + const TaskTraits traits_;
|
| + PriorityQueue* const priority_queue_;
|
| + TaskTracker* const task_tracker_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| +class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
|
| + : public SchedulerWorkerThread::Delegate {
|
| + public:
|
| + SchedulerWorkerThreadDelegateImpl(
|
| + SchedulerThreadPool* outer,
|
| + const EnqueueSequenceCallback enqueue_sequence_callback);
|
| + ~SchedulerWorkerThreadDelegateImpl() override;
|
| +
|
| + // SchedulerWorkerThread::Delegate:
|
| + void OnMainEntry() override;
|
| + scoped_refptr<Sequence> GetWork(
|
| + SchedulerWorkerThread* worker_thread) override;
|
| + void EnqueueSequence(scoped_refptr<Sequence> sequence) override;
|
| +
|
| + private:
|
| + SchedulerThreadPool* outer_;
|
| + const EnqueueSequenceCallback enqueue_sequence_callback_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
|
| +};
|
| +
|
| +std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
|
| + ThreadPriority thread_priority,
|
| + size_t max_threads,
|
| + const EnqueueSequenceCallback& enqueue_sequence_callback,
|
| + TaskTracker* task_tracker) {
|
| + std::unique_ptr<SchedulerThreadPool> thread_pool(
|
| + new SchedulerThreadPool(enqueue_sequence_callback, task_tracker));
|
| + if (thread_pool->Initialize(thread_priority, max_threads))
|
| + return thread_pool;
|
| + return nullptr;
|
| +}
|
| +
|
| +SchedulerThreadPool::~SchedulerThreadPool() {
|
| +#if DCHECK_IS_ON()
|
| + // SchedulerThreadPool should never be deleted in production unless its
|
| + // initialization failed.
|
| + AutoSchedulerLock auto_lock(worker_threads_lock_);
|
| + DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
|
| +#endif // DCHECK_IS_ON()
|
| +}
|
| +
|
| +scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
|
| + const TaskTraits& traits,
|
| + ExecutionMode execution_mode) {
|
| + switch (execution_mode) {
|
| + case ExecutionMode::PARALLEL:
|
| + return make_scoped_refptr(new SchedulerParallelTaskRunner(
|
| + traits, &shared_priority_queue_, task_tracker_));
|
| +
|
| + case ExecutionMode::SEQUENCED:
|
| + case ExecutionMode::SINGLE_THREADED:
|
| + // TODO(fdoray): Support SEQUENCED and SINGLE_THREADED TaskRunners.
|
| + NOTREACHED();
|
| + return nullptr;
|
| + }
|
| +
|
| + NOTREACHED();
|
| + return nullptr;
|
| +}
|
| +
|
| +void SchedulerThreadPool::EnqueueSequence(
|
| + scoped_refptr<Sequence> sequence,
|
| + const SequenceSortKey& sequence_sort_key) {
|
| + auto sequence_and_sort_key = WrapUnique(new PriorityQueue::SequenceAndSortKey(
|
| + std::move(sequence), sequence_sort_key));
|
| + auto transaction = shared_priority_queue_.BeginTransaction();
|
| +
|
| + // The thread calling this method just ran a Task from |sequence| and will
|
| + // soon try to get another Sequence from which to run a Task. If the thread
|
| + // belongs to this pool, it will get that Sequence from
|
| + // |shared_priority_queue_|. When that's the case, there is no need to wake up
|
| + // another thread after |sequence| is inserted in |shared_priority_queue_|. If
|
| + // we did wake up another thread, we would waste resources by having more
|
| + // threads trying to get a Sequence from |shared_priority_queue_| than the
|
| + // number of Sequences in it.
|
| + if (tls_current_shared_priority_queue.Get().Get() == &shared_priority_queue_)
|
| + transaction->PushNoWakeUp(std::move(sequence_and_sort_key));
|
| + else
|
| + transaction->Push(std::move(sequence_and_sort_key));
|
| +}
|
| +
|
| +void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
|
| + AutoSchedulerLock auto_lock(worker_threads_lock_);
|
| + while (idle_worker_threads_stack_.size() < worker_threads_.size())
|
| + idle_worker_threads_stack_cv_for_testing_->Wait();
|
| +}
|
| +
|
| +void SchedulerThreadPool::JoinForTesting() {
|
| + for (const auto& worker_thread : worker_threads_)
|
| + worker_thread->JoinForTesting();
|
| +
|
| + DCHECK(!join_for_testing_returned_.IsSignaled());
|
| + join_for_testing_returned_.Signal();
|
| +}
|
| +
|
| +SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
|
| + SchedulerWorkerThreadDelegateImpl(
|
| + SchedulerThreadPool* outer,
|
| + const EnqueueSequenceCallback enqueue_sequence_callback)
|
| + : outer_(outer), enqueue_sequence_callback_(enqueue_sequence_callback) {}
|
| +
|
| +SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
|
| + ~SchedulerWorkerThreadDelegateImpl() = default;
|
| +
|
| +void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
|
| + DCHECK(!tls_current_shared_priority_queue.Get().Get());
|
| + tls_current_shared_priority_queue.Get().Set(&outer_->shared_priority_queue_);
|
| +}
|
| +
|
| +scoped_refptr<Sequence>
|
| +SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| + SchedulerWorkerThread* worker_thread) {
|
| + std::unique_ptr<PriorityQueue::Transaction> transaction(
|
| + outer_->shared_priority_queue_.BeginTransaction());
|
| + const auto sequence_and_sort_key = transaction->Peek();
|
| +
|
| + if (sequence_and_sort_key.is_null()) {
|
| + // |transaction| is kept alive while |worker_thread| is added to
|
| + // |idle_worker_threads_stack_| to avoid this race:
|
| + // 1. This thread creates a Transaction, finds |shared_priority_queue_|
|
| + // empty and ends the Transaction.
|
| + // 2. Other thread creates a Transaction, inserts a Sequence into
|
| + // |shared_priority_queue_| and ends the Transaction. This can't happen
|
| + // if the Transaction of step 1 is still active because because there can
|
| + // only be one active Transaction per PriorityQueue at a time.
|
| + // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
|
| + // |idle_worker_threads_stack_| is empty.
|
| + // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
|
| + // sleep. No thread runs the Sequence inserted in step 2.
|
| + outer_->AddToIdleWorkerThreadsStack(worker_thread);
|
| + return nullptr;
|
| + }
|
| +
|
| + transaction->Pop();
|
| + return sequence_and_sort_key.sequence;
|
| +}
|
| +
|
| +void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
|
| + scoped_refptr<Sequence> sequence) {
|
| + enqueue_sequence_callback_.Run(std::move(sequence));
|
| +}
|
| +
|
| +SchedulerThreadPool::SchedulerThreadPool(
|
| + const EnqueueSequenceCallback& enqueue_sequence_callback,
|
| + TaskTracker* task_tracker)
|
| + : shared_priority_queue_(
|
| + Bind(&SchedulerThreadPool::WakeUpOneThread, Unretained(this))),
|
| + worker_threads_lock_(shared_priority_queue_.container_lock()),
|
| + idle_worker_threads_stack_cv_for_testing_(
|
| + worker_threads_lock_.CreateConditionVariable()),
|
| + join_for_testing_returned_(true, false),
|
| + worker_thread_delegate_(
|
| + new SchedulerWorkerThreadDelegateImpl(this,
|
| + enqueue_sequence_callback)),
|
| + task_tracker_(task_tracker) {
|
| + DCHECK(task_tracker_);
|
| +}
|
| +
|
| +bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
|
| + size_t max_threads) {
|
| + AutoSchedulerLock auto_lock(worker_threads_lock_);
|
| +
|
| + DCHECK(worker_threads_.empty());
|
| +
|
| + for (size_t i = 0; i < max_threads; ++i) {
|
| + std::unique_ptr<SchedulerWorkerThread> worker_thread =
|
| + SchedulerWorkerThread::CreateSchedulerWorkerThread(
|
| + thread_priority, worker_thread_delegate_.get(), task_tracker_);
|
| + if (!worker_thread)
|
| + break;
|
| + idle_worker_threads_stack_.push(worker_thread.get());
|
| + worker_threads_.push_back(std::move(worker_thread));
|
| + }
|
| +
|
| + return !worker_threads_.empty();
|
| +}
|
| +
|
| +void SchedulerThreadPool::WakeUpOneThread() {
|
| + SchedulerWorkerThread* worker_thread;
|
| + {
|
| + AutoSchedulerLock auto_lock(worker_threads_lock_);
|
| +
|
| + if (idle_worker_threads_stack_.empty())
|
| + return;
|
| +
|
| + worker_thread = idle_worker_threads_stack_.top();
|
| + idle_worker_threads_stack_.pop();
|
| + }
|
| + worker_thread->WakeUp();
|
| +}
|
| +
|
| +void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
|
| + SchedulerWorkerThread* worker_thread) {
|
| + AutoSchedulerLock auto_lock(worker_threads_lock_);
|
| + idle_worker_threads_stack_.push(worker_thread);
|
| + DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size());
|
| +
|
| + if (idle_worker_threads_stack_.size() == worker_threads_.size())
|
| + idle_worker_threads_stack_cv_for_testing_->Broadcast();
|
| +}
|
| +
|
| +} // namespace internal
|
| +} // namespace base
|
|
|