| Index: base/task_scheduler/scheduler_thread_pool_impl.cc
|
| diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool_impl.cc
|
| similarity index 64%
|
| rename from base/task_scheduler/scheduler_thread_pool.cc
|
| rename to base/task_scheduler/scheduler_thread_pool_impl.cc
|
| index 6e14638da37077aa04df244f182fba3e917a6875..e72ea6e12ceb326ab408a68a9aaf91d26efae9ea 100644
|
| --- a/base/task_scheduler/scheduler_thread_pool.cc
|
| +++ b/base/task_scheduler/scheduler_thread_pool_impl.cc
|
| @@ -2,7 +2,7 @@
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
|
|
| -#include "base/task_scheduler/scheduler_thread_pool.h"
|
| +#include "base/task_scheduler/scheduler_thread_pool_impl.h"
|
|
|
| #include <utility>
|
|
|
| @@ -12,7 +12,8 @@
|
| #include "base/logging.h"
|
| #include "base/memory/ptr_util.h"
|
| #include "base/sequenced_task_runner.h"
|
| -#include "base/task_scheduler/utils.h"
|
| +#include "base/task_scheduler/delayed_task_manager.h"
|
| +#include "base/task_scheduler/task_tracker.h"
|
| #include "base/threading/thread_local.h"
|
|
|
| namespace base {
|
| @@ -21,48 +22,40 @@ namespace internal {
|
| namespace {
|
|
|
| // SchedulerThreadPool that owns the current thread, if any.
|
| -LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
|
| +LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
|
| tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
|
|
|
| // A task runner that runs tasks with the PARALLEL ExecutionMode.
|
| class SchedulerParallelTaskRunner : public TaskRunner {
|
| public:
|
| // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
|
| - // long as |executor| is alive.
|
| - // TODO(robliao): Find a concrete way to manage |executor|'s memory.
|
| + // long as |thread_pool| is alive.
|
| + // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory.
|
| SchedulerParallelTaskRunner(const TaskTraits& traits,
|
| - SchedulerTaskExecutor* executor,
|
| - TaskTracker* task_tracker,
|
| - DelayedTaskManager* delayed_task_manager)
|
| - : traits_(traits),
|
| - executor_(executor),
|
| - task_tracker_(task_tracker),
|
| - delayed_task_manager_(delayed_task_manager) {}
|
| + SchedulerThreadPool* thread_pool)
|
| + : traits_(traits), thread_pool_(thread_pool) {}
|
|
|
| // TaskRunner:
|
| bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| const Closure& closure,
|
| TimeDelta delay) override {
|
| // Post the task as part of a one-off single-task Sequence.
|
| - return PostTaskToExecutor(
|
| + return thread_pool_->PostTaskWithSequence(
|
| WrapUnique(
|
| new Task(from_here, closure, traits_,
|
| delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
|
| - make_scoped_refptr(new Sequence), executor_, task_tracker_,
|
| - delayed_task_manager_);
|
| + make_scoped_refptr(new Sequence));
|
| }
|
|
|
| bool RunsTasksOnCurrentThread() const override {
|
| - return tls_current_thread_pool.Get().Get() == executor_;
|
| + return tls_current_thread_pool.Get().Get() == thread_pool_;
|
| }
|
|
|
| private:
|
| ~SchedulerParallelTaskRunner() override = default;
|
|
|
| const TaskTraits traits_;
|
| - SchedulerTaskExecutor* const executor_;
|
| - TaskTracker* const task_tracker_;
|
| - DelayedTaskManager* const delayed_task_manager_;
|
| + SchedulerThreadPool* const thread_pool_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
|
| };
|
| @@ -70,28 +63,23 @@ class SchedulerParallelTaskRunner : public TaskRunner {
|
| // A task runner that runs tasks with the SEQUENCED ExecutionMode.
|
| class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| public:
|
| - // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
|
| - // long as |executor| is alive.
|
| - // TODO(robliao): Find a concrete way to manage |executor|'s memory.
|
| + // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
|
| + // so long as |thread_pool| is alive.
|
| + // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory.
|
| SchedulerSequencedTaskRunner(const TaskTraits& traits,
|
| - SchedulerTaskExecutor* executor,
|
| - TaskTracker* task_tracker,
|
| - DelayedTaskManager* delayed_task_manager)
|
| - : traits_(traits),
|
| - executor_(executor),
|
| - task_tracker_(task_tracker),
|
| - delayed_task_manager_(delayed_task_manager) {}
|
| + SchedulerThreadPool* thread_pool)
|
| + : traits_(traits), thread_pool_(thread_pool) {}
|
|
|
| // SequencedTaskRunner:
|
| bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| const Closure& closure,
|
| TimeDelta delay) override {
|
| // Post the task as part of |sequence|.
|
| - return PostTaskToExecutor(
|
| + return thread_pool_->PostTaskWithSequence(
|
| WrapUnique(
|
| new Task(from_here, closure, traits_,
|
| delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
|
| - sequence_, executor_, task_tracker_, delayed_task_manager_);
|
| + sequence_);
|
| }
|
|
|
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| @@ -102,7 +90,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| }
|
|
|
| bool RunsTasksOnCurrentThread() const override {
|
| - return tls_current_thread_pool.Get().Get() == executor_;
|
| + return tls_current_thread_pool.Get().Get() == thread_pool_;
|
| }
|
|
|
| private:
|
| @@ -112,66 +100,78 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| const scoped_refptr<Sequence> sequence_ = new Sequence;
|
|
|
| const TaskTraits traits_;
|
| - SchedulerTaskExecutor* const executor_;
|
| - TaskTracker* const task_tracker_;
|
| - DelayedTaskManager* const delayed_task_manager_;
|
| + SchedulerThreadPool* const thread_pool_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
|
| };
|
|
|
| } // namespace
|
|
|
| -class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
|
| +class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
|
| : public SchedulerWorkerThread::Delegate {
|
| public:
|
| SchedulerWorkerThreadDelegateImpl(
|
| - SchedulerThreadPool* outer,
|
| - const EnqueueSequenceCallback& enqueue_sequence_callback);
|
| + SchedulerThreadPoolImpl* outer,
|
| + const ReEnqueueSequenceCallback& re_enqueue_sequence_callback);
|
| ~SchedulerWorkerThreadDelegateImpl() override;
|
|
|
| // SchedulerWorkerThread::Delegate:
|
| void OnMainEntry() override;
|
| scoped_refptr<Sequence> GetWork(
|
| SchedulerWorkerThread* worker_thread) override;
|
| - void EnqueueSequence(scoped_refptr<Sequence> sequence) override;
|
| + void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
|
|
|
| private:
|
| - SchedulerThreadPool* outer_;
|
| - const EnqueueSequenceCallback enqueue_sequence_callback_;
|
| + SchedulerThreadPoolImpl* outer_;
|
| + const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
|
| };
|
|
|
| -SchedulerThreadPool::~SchedulerThreadPool() {
|
| +SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() {
|
| // SchedulerThreadPool should never be deleted in production unless its
|
| // initialization failed.
|
| DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
|
| }
|
|
|
| -std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
|
| +std::unique_ptr<SchedulerThreadPoolImpl>
|
| +SchedulerThreadPoolImpl::CreateThreadPool(
|
| ThreadPriority thread_priority,
|
| size_t max_threads,
|
| - const EnqueueSequenceCallback& enqueue_sequence_callback,
|
| + const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
|
| TaskTracker* task_tracker,
|
| DelayedTaskManager* delayed_task_manager) {
|
| - std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool(
|
| - enqueue_sequence_callback, task_tracker, delayed_task_manager));
|
| + std::unique_ptr<SchedulerThreadPoolImpl> thread_pool(
|
| + new SchedulerThreadPoolImpl(re_enqueue_sequence_callback, task_tracker,
|
| + delayed_task_manager));
|
| if (thread_pool->Initialize(thread_priority, max_threads))
|
| return thread_pool;
|
| return nullptr;
|
| }
|
|
|
| -scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
|
| +void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() {
|
| + AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| + while (idle_worker_threads_stack_.Size() < worker_threads_.size())
|
| + idle_worker_threads_stack_cv_for_testing_->Wait();
|
| +}
|
| +
|
| +void SchedulerThreadPoolImpl::JoinForTesting() {
|
| + for (const auto& worker_thread : worker_threads_)
|
| + worker_thread->JoinForTesting();
|
| +
|
| + DCHECK(!join_for_testing_returned_.IsSignaled());
|
| + join_for_testing_returned_.Signal();
|
| +}
|
| +
|
| +scoped_refptr<TaskRunner> SchedulerThreadPoolImpl::CreateTaskRunnerWithTraits(
|
| const TaskTraits& traits,
|
| ExecutionMode execution_mode) {
|
| switch (execution_mode) {
|
| case ExecutionMode::PARALLEL:
|
| - return make_scoped_refptr(new SchedulerParallelTaskRunner(
|
| - traits, this, task_tracker_, delayed_task_manager_));
|
| + return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
|
|
|
| case ExecutionMode::SEQUENCED:
|
| - return make_scoped_refptr(new SchedulerSequencedTaskRunner(
|
| - traits, this, task_tracker_, delayed_task_manager_));
|
| + return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
|
|
|
| case ExecutionMode::SINGLE_THREADED:
|
| // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
|
| @@ -183,7 +183,7 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
|
| return nullptr;
|
| }
|
|
|
| -void SchedulerThreadPool::EnqueueSequence(
|
| +void SchedulerThreadPoolImpl::ReEnqueueSequence(
|
| scoped_refptr<Sequence> sequence,
|
| const SequenceSortKey& sequence_sort_key) {
|
| shared_priority_queue_.BeginTransaction()->Push(
|
| @@ -202,51 +202,70 @@ void SchedulerThreadPool::EnqueueSequence(
|
| WakeUpOneThread();
|
| }
|
|
|
| -void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
|
| - AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| - while (idle_worker_threads_stack_.Size() < worker_threads_.size())
|
| - idle_worker_threads_stack_cv_for_testing_->Wait();
|
| -}
|
| +bool SchedulerThreadPoolImpl::PostTaskWithSequence(
|
| + std::unique_ptr<Task> task,
|
| + scoped_refptr<Sequence> sequence) {
|
| + DCHECK(task);
|
| + DCHECK(sequence);
|
|
|
| -void SchedulerThreadPool::JoinForTesting() {
|
| - for (const auto& worker_thread : worker_threads_)
|
| - worker_thread->JoinForTesting();
|
| + if (!task_tracker_->WillPostTask(task.get()))
|
| + return false;
|
|
|
| - DCHECK(!join_for_testing_returned_.IsSignaled());
|
| - join_for_testing_returned_.Signal();
|
| + if (task->delayed_run_time.is_null()) {
|
| + PostTaskWithSequenceNow(std::move(task), std::move(sequence));
|
| + } else {
|
| + delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence),
|
| + this);
|
| + }
|
| +
|
| + return true;
|
| }
|
|
|
| -void SchedulerThreadPool::PostTaskWithSequence(
|
| +void SchedulerThreadPoolImpl::PostTaskWithSequenceNow(
|
| std::unique_ptr<Task> task,
|
| scoped_refptr<Sequence> sequence) {
|
| DCHECK(task);
|
| DCHECK(sequence);
|
|
|
| - const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
|
| - std::move(task), std::move(sequence), &shared_priority_queue_);
|
| -
|
| - // No thread has already been woken up to run Tasks from |sequence| if it was
|
| - // empty before |task| was inserted into it.
|
| - if (sequence_was_empty)
|
| + // Confirm that |task| is ready to run (its delayed run time is either null or
|
| + // in the past).
|
| + DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now());
|
| +
|
| + const bool sequence_was_empty = sequence->PushTask(std::move(task));
|
| + if (sequence_was_empty) {
|
| + // Insert |sequence| in |shared_priority_queue_| if it was empty before
|
| + // |task| was inserted into it. Otherwise, one of these must be true:
|
| + // - |sequence| is already in a PriorityQueue (not necessarily
|
| + // |shared_priority_queue_|), or,
|
| + // - A worker thread is running a Task from |sequence|. It will insert
|
| + // |sequence| in a PriorityQueue once it's done running the Task.
|
| + const auto sequence_sort_key = sequence->GetSortKey();
|
| + shared_priority_queue_.BeginTransaction()->Push(
|
| + WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
|
| + sequence_sort_key)));
|
| +
|
| + // Wake up a worker thread to process |sequence|.
|
| WakeUpOneThread();
|
| + }
|
| }
|
|
|
| -SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
|
| +SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
|
| SchedulerWorkerThreadDelegateImpl(
|
| - SchedulerThreadPool* outer,
|
| - const EnqueueSequenceCallback& enqueue_sequence_callback)
|
| - : outer_(outer), enqueue_sequence_callback_(enqueue_sequence_callback) {}
|
| + SchedulerThreadPoolImpl* outer,
|
| + const ReEnqueueSequenceCallback& re_enqueue_sequence_callback)
|
| + : outer_(outer),
|
| + re_enqueue_sequence_callback_(re_enqueue_sequence_callback) {}
|
|
|
| -SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
|
| +SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
|
| ~SchedulerWorkerThreadDelegateImpl() = default;
|
|
|
| -void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
|
| +void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
|
| DCHECK(!tls_current_thread_pool.Get().Get());
|
| tls_current_thread_pool.Get().Set(outer_);
|
| }
|
|
|
| scoped_refptr<Sequence>
|
| -SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| +SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| SchedulerWorkerThread* worker_thread) {
|
| std::unique_ptr<PriorityQueue::Transaction> transaction(
|
| outer_->shared_priority_queue_.BeginTransaction());
|
| @@ -274,13 +293,13 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| return sequence;
|
| }
|
|
|
| -void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
|
| - scoped_refptr<Sequence> sequence) {
|
| - enqueue_sequence_callback_.Run(std::move(sequence));
|
| +void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
|
| + ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
|
| + re_enqueue_sequence_callback_.Run(std::move(sequence));
|
| }
|
|
|
| -SchedulerThreadPool::SchedulerThreadPool(
|
| - const EnqueueSequenceCallback& enqueue_sequence_callback,
|
| +SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
|
| + const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
|
| TaskTracker* task_tracker,
|
| DelayedTaskManager* delayed_task_manager)
|
| : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
|
| @@ -289,15 +308,15 @@ SchedulerThreadPool::SchedulerThreadPool(
|
| join_for_testing_returned_(true, false),
|
| worker_thread_delegate_(
|
| new SchedulerWorkerThreadDelegateImpl(this,
|
| - enqueue_sequence_callback)),
|
| + re_enqueue_sequence_callback)),
|
| task_tracker_(task_tracker),
|
| delayed_task_manager_(delayed_task_manager) {
|
| DCHECK(task_tracker_);
|
| DCHECK(delayed_task_manager_);
|
| }
|
|
|
| -bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
|
| - size_t max_threads) {
|
| +bool SchedulerThreadPoolImpl::Initialize(ThreadPriority thread_priority,
|
| + size_t max_threads) {
|
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
|
|
| DCHECK(worker_threads_.empty());
|
| @@ -315,7 +334,7 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
|
| return !worker_threads_.empty();
|
| }
|
|
|
| -void SchedulerThreadPool::WakeUpOneThread() {
|
| +void SchedulerThreadPoolImpl::WakeUpOneThread() {
|
| SchedulerWorkerThread* worker_thread;
|
| {
|
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| @@ -325,7 +344,7 @@ void SchedulerThreadPool::WakeUpOneThread() {
|
| worker_thread->WakeUp();
|
| }
|
|
|
| -void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
|
| +void SchedulerThreadPoolImpl::AddToIdleWorkerThreadsStack(
|
| SchedulerWorkerThread* worker_thread) {
|
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| idle_worker_threads_stack_.Push(worker_thread);
|
|
|