| Index: base/task_scheduler/scheduler_thread_pool.cc
|
| diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool.cc
|
| index 8cd9620e26041965dbf9cb2779bbfe52b54944b1..9f2174ba45f8470656bbc7d4fa26262c81b46f35 100644
|
| --- a/base/task_scheduler/scheduler_thread_pool.cc
|
| +++ b/base/task_scheduler/scheduler_thread_pool.cc
|
| @@ -12,6 +12,7 @@
|
| #include "base/logging.h"
|
| #include "base/memory/ptr_util.h"
|
| #include "base/sequenced_task_runner.h"
|
| +#include "base/single_thread_task_runner.h"
|
| #include "base/task_scheduler/delayed_task_manager.h"
|
| #include "base/task_scheduler/task_tracker.h"
|
| #include "base/threading/thread_local.h"
|
| @@ -25,6 +26,10 @@ namespace {
|
| LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
|
| tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
|
|
|
| +// SchedulerWorkerThread that owns the current thread, if any.
|
| +LazyInstance<ThreadLocalPointer<const SchedulerWorkerThread>>::Leaky
|
| + tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
|
| +
|
| // A task runner that runs tasks with the PARALLEL ExecutionMode.
|
| class SchedulerParallelTaskRunner : public TaskRunner {
|
| public:
|
| @@ -42,7 +47,7 @@ class SchedulerParallelTaskRunner : public TaskRunner {
|
| // Post the task as part of a one-off single-task Sequence.
|
| return thread_pool_->PostTaskWithSequence(
|
| WrapUnique(new Task(from_here, closure, traits_, delay)),
|
| - make_scoped_refptr(new Sequence));
|
| + make_scoped_refptr(new Sequence), nullptr);
|
| }
|
|
|
| bool RunsTasksOnCurrentThread() const override {
|
| @@ -72,9 +77,10 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| const Closure& closure,
|
| TimeDelta delay) override {
|
| - // Post the task as part of |sequence|.
|
| + // Post the task as part of |sequence_|.
|
| return thread_pool_->PostTaskWithSequence(
|
| - WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_);
|
| + WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_,
|
| + nullptr);
|
| }
|
|
|
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| @@ -100,6 +106,54 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
|
| };
|
|
|
| +// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
|
| +class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
|
| + public:
|
| + // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
|
| + // tasks so long as |thread_pool| and |worker_thread| are alive.
|
| + // TODO(robliao): Find a concrete way to manage the memory of |thread_pool|
|
| + // and |worker_thread|.
|
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
|
| + SchedulerThreadPool* thread_pool,
|
| + SchedulerWorkerThread* worker_thread)
|
| + : traits_(traits),
|
| + thread_pool_(thread_pool),
|
| + worker_thread_(worker_thread) {}
|
| +
|
| + // SingleThreadTaskRunner:
|
| + bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + TimeDelta delay) override {
|
| + // Post the task to be executed by |worker_thread_| as part of |sequence_|.
|
| + return thread_pool_->PostTaskWithSequence(
|
| + WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_,
|
| + worker_thread_);
|
| + }
|
| +
|
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + base::TimeDelta delay) override {
|
| + // Tasks are never nested within the task scheduler.
|
| + return PostDelayedTask(from_here, closure, delay);
|
| + }
|
| +
|
| + bool RunsTasksOnCurrentThread() const override {
|
| + return tls_current_worker_thread.Get().Get() == worker_thread_;
|
| + }
|
| +
|
| + private:
|
| + ~SchedulerSingleThreadTaskRunner() override = default;
|
| +
|
| + // Sequence for all Tasks posted through this TaskRunner.
|
| + const scoped_refptr<Sequence> sequence_ = new Sequence;
|
| +
|
| + const TaskTraits traits_;
|
| + SchedulerThreadPool* const thread_pool_;
|
| + SchedulerWorkerThread* const worker_thread_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
|
| +};
|
| +
|
| } // namespace
|
|
|
| class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
|
| @@ -107,19 +161,25 @@ class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
|
| public:
|
| SchedulerWorkerThreadDelegateImpl(
|
| SchedulerThreadPool* outer,
|
| + PriorityQueue* single_threaded_priority_queue,
|
| const EnqueueSequenceCallback& enqueue_sequence_callback);
|
| ~SchedulerWorkerThreadDelegateImpl() override;
|
|
|
| // SchedulerWorkerThread::Delegate:
|
| - void OnMainEntry() override;
|
| + void OnMainEntry(SchedulerWorkerThread* worker_thread) override;
|
| scoped_refptr<Sequence> GetWork(
|
| SchedulerWorkerThread* worker_thread) override;
|
| void EnqueueSequence(scoped_refptr<Sequence> sequence) override;
|
|
|
| private:
|
| SchedulerThreadPool* outer_;
|
| + PriorityQueue* const single_threaded_priority_queue_;
|
| const EnqueueSequenceCallback enqueue_sequence_callback_;
|
|
|
| + // True if the last Sequence returned by GetWork() was extracted from
|
| + // |single_threaded_priority_queue_|.
|
| + bool last_sequence_is_single_threaded_ = false;
|
| +
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
|
| };
|
|
|
| @@ -153,10 +213,19 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
|
| case ExecutionMode::SEQUENCED:
|
| return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
|
|
|
| - case ExecutionMode::SINGLE_THREADED:
|
| - // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
|
| - NOTREACHED();
|
| - return nullptr;
|
| + case ExecutionMode::SINGLE_THREADED: {
|
| + // TODO(fdoray): Find a better way to assign a worker thread to a
|
| + // SingleThreadTaskRunner.
|
| + size_t worker_thread_index;
|
| + {
|
| + AutoSchedulerLock auto_lock(next_worker_thread_index_lock_);
|
| + worker_thread_index = next_worker_thread_index_;
|
| + next_worker_thread_index_ =
|
| + (next_worker_thread_index_ + 1) % worker_threads_.size();
|
| + }
|
| + return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
|
| + traits, this, worker_threads_[worker_thread_index].get()));
|
| + }
|
| }
|
|
|
| NOTREACHED();
|
| @@ -198,7 +267,8 @@ void SchedulerThreadPool::JoinForTesting() {
|
|
|
| bool SchedulerThreadPool::PostTaskWithSequence(
|
| std::unique_ptr<Task> task,
|
| - scoped_refptr<Sequence> sequence) {
|
| + scoped_refptr<Sequence> sequence,
|
| + SchedulerWorkerThread* worker_thread) {
|
| DCHECK(task);
|
| DCHECK(sequence);
|
|
|
| @@ -206,10 +276,11 @@ bool SchedulerThreadPool::PostTaskWithSequence(
|
| return false;
|
|
|
| if (task->delayed_run_time.is_null()) {
|
| - PostTaskWithSequenceNow(std::move(task), std::move(sequence));
|
| + PostTaskWithSequenceNow(std::move(task), std::move(sequence),
|
| + worker_thread);
|
| } else {
|
| delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence),
|
| - this);
|
| + worker_thread, this);
|
| }
|
|
|
| return true;
|
| @@ -217,7 +288,8 @@ bool SchedulerThreadPool::PostTaskWithSequence(
|
|
|
| void SchedulerThreadPool::PostTaskWithSequenceNow(
|
| std::unique_ptr<Task> task,
|
| - scoped_refptr<Sequence> sequence) {
|
| + scoped_refptr<Sequence> sequence,
|
| + SchedulerWorkerThread* worker_thread) {
|
| DCHECK(task);
|
| DCHECK(sequence);
|
|
|
| @@ -225,47 +297,69 @@ void SchedulerThreadPool::PostTaskWithSequenceNow(
|
| // in the past).
|
| DCHECK_LE(task->delayed_run_time, TimeTicks::Now());
|
|
|
| + PriorityQueue* const priority_queue =
|
| + worker_thread ? single_threaded_priority_queues_[worker_thread].get()
|
| + : &shared_priority_queue_;
|
| + DCHECK(priority_queue);
|
| +
|
| const bool sequence_was_empty = sequence->PushTask(std::move(task));
|
| if (sequence_was_empty) {
|
| - // Insert |sequence| in |shared_priority_queue_| if it was empty before
|
| - // |task| was inserted into it. Otherwise, one of these must be true:
|
| + // Insert |sequence| in |priority_queue| if it was empty before |task| was
|
| + // inserted into it. Otherwise, one of these must be true:
|
| // - |sequence| is already in a PriorityQueue (not necessarily
|
| // |shared_priority_queue_|), or,
|
| // - A worker thread is running a Task from |sequence|. It will insert
|
| // |sequence| in a PriorityQueue once it's done running the Task.
|
| const auto sequence_sort_key = sequence->GetSortKey();
|
| - shared_priority_queue_.BeginTransaction()->Push(
|
| + priority_queue->BeginTransaction()->Push(
|
| WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
|
| sequence_sort_key)));
|
|
|
| // Wake up a worker thread to process |sequence|.
|
| - WakeUpOneThread();
|
| + if (worker_thread)
|
| + worker_thread->WakeUp();
|
| + else
|
| + WakeUpOneThread();
|
| }
|
| }
|
|
|
| SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
|
| SchedulerWorkerThreadDelegateImpl(
|
| SchedulerThreadPool* outer,
|
| + PriorityQueue* single_threaded_priority_queue,
|
| const EnqueueSequenceCallback& enqueue_sequence_callback)
|
| - : outer_(outer), enqueue_sequence_callback_(enqueue_sequence_callback) {}
|
| + : outer_(outer),
|
| + single_threaded_priority_queue_(single_threaded_priority_queue),
|
| + enqueue_sequence_callback_(enqueue_sequence_callback) {}
|
|
|
| SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
|
| ~SchedulerWorkerThreadDelegateImpl() = default;
|
|
|
| -void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
|
| +void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry(
|
| + SchedulerWorkerThread* worker_thread) {
|
| + DCHECK(!tls_current_worker_thread.Get().Get());
|
| DCHECK(!tls_current_thread_pool.Get().Get());
|
| + tls_current_worker_thread.Get().Set(worker_thread);
|
| tls_current_thread_pool.Get().Set(outer_);
|
| }
|
|
|
| scoped_refptr<Sequence>
|
| SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| SchedulerWorkerThread* worker_thread) {
|
| - std::unique_ptr<PriorityQueue::Transaction> transaction(
|
| + std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
|
| outer_->shared_priority_queue_.BeginTransaction());
|
| - const auto& sequence_and_sort_key = transaction->Peek();
|
| + const auto& shared_sequence_and_sort_key = shared_transaction->Peek();
|
| +
|
| + std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction(
|
| + single_threaded_priority_queue_->BeginTransaction());
|
| + const auto& single_threaded_sequence_and_sort_key =
|
| + single_threaded_transaction->Peek();
|
| +
|
| + if (shared_sequence_and_sort_key.is_null() &&
|
| + single_threaded_sequence_and_sort_key.is_null()) {
|
| + single_threaded_transaction.reset();
|
|
|
| - if (sequence_and_sort_key.is_null()) {
|
| - // |transaction| is kept alive while |worker_thread| is added to
|
| + // |shared_transaction| is kept alive while |worker_thread| is added to
|
| // |idle_worker_threads_stack_| to avoid this race:
|
| // 1. This thread creates a Transaction, finds |shared_priority_queue_|
|
| // empty and ends the Transaction.
|
| @@ -281,14 +375,43 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| return nullptr;
|
| }
|
|
|
| - scoped_refptr<Sequence> sequence = sequence_and_sort_key.sequence;
|
| - transaction->Pop();
|
| + scoped_refptr<Sequence> sequence;
|
| +
|
| + if (single_threaded_sequence_and_sort_key.is_null() ||
|
| + (!shared_sequence_and_sort_key.is_null() &&
|
| + single_threaded_sequence_and_sort_key.sort_key <
|
| + shared_sequence_and_sort_key.sort_key)) {
|
| + sequence = shared_sequence_and_sort_key.sequence;
|
| + shared_transaction->Pop();
|
| + last_sequence_is_single_threaded_ = false;
|
| + } else {
|
| + DCHECK(!single_threaded_sequence_and_sort_key.is_null());
|
| + sequence = single_threaded_sequence_and_sort_key.sequence;
|
| + single_threaded_transaction->Pop();
|
| + last_sequence_is_single_threaded_ = true;
|
| + }
|
| +
|
| + shared_transaction.reset();
|
| + single_threaded_transaction.reset();
|
| + outer_->RemoveFromIdleWorkerThreadsStack(worker_thread);
|
| +
|
| return sequence;
|
| }
|
|
|
| void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
|
| scoped_refptr<Sequence> sequence) {
|
| - enqueue_sequence_callback_.Run(std::move(sequence));
|
| + if (last_sequence_is_single_threaded_) {
|
| + // A single-threaded Sequence is always re-enqueued in the single-threaded
|
| + // PriorityQueue from which it was extracted.
|
| + const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
|
| + single_threaded_priority_queue_->BeginTransaction()->Push(
|
| + WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
|
| + sequence_sort_key)));
|
| + } else {
|
| + // |enqueue_sequence_callback_| will determine in which PriorityQueue
|
| + // |sequence| must be enqueued.
|
| + enqueue_sequence_callback_.Run(std::move(sequence));
|
| + }
|
| }
|
|
|
| SchedulerThreadPool::SchedulerThreadPool(
|
| @@ -313,14 +436,19 @@ bool SchedulerThreadPool::Initialize(
|
| DCHECK(worker_threads_.empty());
|
|
|
| for (size_t i = 0; i < max_threads; ++i) {
|
| + std::unique_ptr<PriorityQueue> single_threaded_priority_queue(
|
| + new PriorityQueue(&shared_priority_queue_));
|
| std::unique_ptr<SchedulerWorkerThread> worker_thread =
|
| SchedulerWorkerThread::CreateWorkerThread(
|
| thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl(
|
| - this, enqueue_sequence_callback)),
|
| + this, single_threaded_priority_queue.get(),
|
| + enqueue_sequence_callback)),
|
| task_tracker_);
|
| if (!worker_thread)
|
| break;
|
| idle_worker_threads_stack_.Push(worker_thread.get());
|
| + single_threaded_priority_queues_[worker_thread.get()] =
|
| + std::move(single_threaded_priority_queue);
|
| worker_threads_.push_back(std::move(worker_thread));
|
| }
|
|
|
| @@ -343,6 +471,12 @@ void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
|
| idle_worker_threads_stack_cv_for_testing_->Broadcast();
|
| }
|
|
|
| +void SchedulerThreadPool::RemoveFromIdleWorkerThreadsStack(
|
| + SchedulerWorkerThread* worker_thread) {
|
| + AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| + idle_worker_threads_stack_.Remove(worker_thread);
|
| +}
|
| +
|
| SchedulerWorkerThread* SchedulerThreadPool::PopOneIdleWorkerThread() {
|
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
|
|
|
|