Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(392)

Unified Diff: base/task_scheduler/scheduler_thread_pool.cc

Issue 1906083002: TaskScheduler: Remove base/task_scheduler/utils.h/.cc (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@sched_2_stack
Patch Set: typos Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_thread_pool.cc
diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool.cc
deleted file mode 100644
index 6e14638da37077aa04df244f182fba3e917a6875..0000000000000000000000000000000000000000
--- a/base/task_scheduler/scheduler_thread_pool.cc
+++ /dev/null
@@ -1,339 +0,0 @@
-// Copyright 2016 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "base/task_scheduler/scheduler_thread_pool.h"
-
-#include <utility>
-
-#include "base/bind.h"
-#include "base/bind_helpers.h"
-#include "base/lazy_instance.h"
-#include "base/logging.h"
-#include "base/memory/ptr_util.h"
-#include "base/sequenced_task_runner.h"
-#include "base/task_scheduler/utils.h"
-#include "base/threading/thread_local.h"
-
-namespace base {
-namespace internal {
-
-namespace {
-
-// SchedulerThreadPool that owns the current thread, if any.
-LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
- tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
-
-// A task runner that runs tasks with the PARALLEL ExecutionMode.
-class SchedulerParallelTaskRunner : public TaskRunner {
- public:
- // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
- // long as |executor| is alive.
- // TODO(robliao): Find a concrete way to manage |executor|'s memory.
- SchedulerParallelTaskRunner(const TaskTraits& traits,
- SchedulerTaskExecutor* executor,
- TaskTracker* task_tracker,
- DelayedTaskManager* delayed_task_manager)
- : traits_(traits),
- executor_(executor),
- task_tracker_(task_tracker),
- delayed_task_manager_(delayed_task_manager) {}
-
- // TaskRunner:
- bool PostDelayedTask(const tracked_objects::Location& from_here,
- const Closure& closure,
- TimeDelta delay) override {
- // Post the task as part of a one-off single-task Sequence.
- return PostTaskToExecutor(
- WrapUnique(
- new Task(from_here, closure, traits_,
- delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
- make_scoped_refptr(new Sequence), executor_, task_tracker_,
- delayed_task_manager_);
- }
-
- bool RunsTasksOnCurrentThread() const override {
- return tls_current_thread_pool.Get().Get() == executor_;
- }
-
- private:
- ~SchedulerParallelTaskRunner() override = default;
-
- const TaskTraits traits_;
- SchedulerTaskExecutor* const executor_;
- TaskTracker* const task_tracker_;
- DelayedTaskManager* const delayed_task_manager_;
-
- DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
-};
-
-// A task runner that runs tasks with the SEQUENCED ExecutionMode.
-class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
- public:
- // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
- // long as |executor| is alive.
- // TODO(robliao): Find a concrete way to manage |executor|'s memory.
- SchedulerSequencedTaskRunner(const TaskTraits& traits,
- SchedulerTaskExecutor* executor,
- TaskTracker* task_tracker,
- DelayedTaskManager* delayed_task_manager)
- : traits_(traits),
- executor_(executor),
- task_tracker_(task_tracker),
- delayed_task_manager_(delayed_task_manager) {}
-
- // SequencedTaskRunner:
- bool PostDelayedTask(const tracked_objects::Location& from_here,
- const Closure& closure,
- TimeDelta delay) override {
- // Post the task as part of |sequence|.
- return PostTaskToExecutor(
- WrapUnique(
- new Task(from_here, closure, traits_,
- delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
- sequence_, executor_, task_tracker_, delayed_task_manager_);
- }
-
- bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
- const Closure& closure,
- base::TimeDelta delay) override {
- // Tasks are never nested within the task scheduler.
- return PostDelayedTask(from_here, closure, delay);
- }
-
- bool RunsTasksOnCurrentThread() const override {
- return tls_current_thread_pool.Get().Get() == executor_;
- }
-
- private:
- ~SchedulerSequencedTaskRunner() override = default;
-
- // Sequence for all Tasks posted through this TaskRunner.
- const scoped_refptr<Sequence> sequence_ = new Sequence;
-
- const TaskTraits traits_;
- SchedulerTaskExecutor* const executor_;
- TaskTracker* const task_tracker_;
- DelayedTaskManager* const delayed_task_manager_;
-
- DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
-};
-
-} // namespace
-
-class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
- : public SchedulerWorkerThread::Delegate {
- public:
- SchedulerWorkerThreadDelegateImpl(
- SchedulerThreadPool* outer,
- const EnqueueSequenceCallback& enqueue_sequence_callback);
- ~SchedulerWorkerThreadDelegateImpl() override;
-
- // SchedulerWorkerThread::Delegate:
- void OnMainEntry() override;
- scoped_refptr<Sequence> GetWork(
- SchedulerWorkerThread* worker_thread) override;
- void EnqueueSequence(scoped_refptr<Sequence> sequence) override;
-
- private:
- SchedulerThreadPool* outer_;
- const EnqueueSequenceCallback enqueue_sequence_callback_;
-
- DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
-};
-
-SchedulerThreadPool::~SchedulerThreadPool() {
- // SchedulerThreadPool should never be deleted in production unless its
- // initialization failed.
- DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
-}
-
-std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
- ThreadPriority thread_priority,
- size_t max_threads,
- const EnqueueSequenceCallback& enqueue_sequence_callback,
- TaskTracker* task_tracker,
- DelayedTaskManager* delayed_task_manager) {
- std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool(
- enqueue_sequence_callback, task_tracker, delayed_task_manager));
- if (thread_pool->Initialize(thread_priority, max_threads))
- return thread_pool;
- return nullptr;
-}
-
-scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
- const TaskTraits& traits,
- ExecutionMode execution_mode) {
- switch (execution_mode) {
- case ExecutionMode::PARALLEL:
- return make_scoped_refptr(new SchedulerParallelTaskRunner(
- traits, this, task_tracker_, delayed_task_manager_));
-
- case ExecutionMode::SEQUENCED:
- return make_scoped_refptr(new SchedulerSequencedTaskRunner(
- traits, this, task_tracker_, delayed_task_manager_));
-
- case ExecutionMode::SINGLE_THREADED:
- // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
- NOTREACHED();
- return nullptr;
- }
-
- NOTREACHED();
- return nullptr;
-}
-
-void SchedulerThreadPool::EnqueueSequence(
- scoped_refptr<Sequence> sequence,
- const SequenceSortKey& sequence_sort_key) {
- shared_priority_queue_.BeginTransaction()->Push(
- WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
- sequence_sort_key)));
-
- // The thread calling this method just ran a Task from |sequence| and will
- // soon try to get another Sequence from which to run a Task. If the thread
- // belongs to this pool, it will get that Sequence from
- // |shared_priority_queue_|. When that's the case, there is no need to wake up
- // another thread after |sequence| is inserted in |shared_priority_queue_|. If
- // we did wake up another thread, we would waste resources by having more
- // threads trying to get a Sequence from |shared_priority_queue_| than the
- // number of Sequences in it.
- if (tls_current_thread_pool.Get().Get() != this)
- WakeUpOneThread();
-}
-
-void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- while (idle_worker_threads_stack_.Size() < worker_threads_.size())
- idle_worker_threads_stack_cv_for_testing_->Wait();
-}
-
-void SchedulerThreadPool::JoinForTesting() {
- for (const auto& worker_thread : worker_threads_)
- worker_thread->JoinForTesting();
-
- DCHECK(!join_for_testing_returned_.IsSignaled());
- join_for_testing_returned_.Signal();
-}
-
-void SchedulerThreadPool::PostTaskWithSequence(
- std::unique_ptr<Task> task,
- scoped_refptr<Sequence> sequence) {
- DCHECK(task);
- DCHECK(sequence);
-
- const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
- std::move(task), std::move(sequence), &shared_priority_queue_);
-
- // No thread has already been woken up to run Tasks from |sequence| if it was
- // empty before |task| was inserted into it.
- if (sequence_was_empty)
- WakeUpOneThread();
-}
-
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
- SchedulerWorkerThreadDelegateImpl(
- SchedulerThreadPool* outer,
- const EnqueueSequenceCallback& enqueue_sequence_callback)
- : outer_(outer), enqueue_sequence_callback_(enqueue_sequence_callback) {}
-
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
- ~SchedulerWorkerThreadDelegateImpl() = default;
-
-void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
- DCHECK(!tls_current_thread_pool.Get().Get());
- tls_current_thread_pool.Get().Set(outer_);
-}
-
-scoped_refptr<Sequence>
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
- SchedulerWorkerThread* worker_thread) {
- std::unique_ptr<PriorityQueue::Transaction> transaction(
- outer_->shared_priority_queue_.BeginTransaction());
- const auto& sequence_and_sort_key = transaction->Peek();
-
- if (sequence_and_sort_key.is_null()) {
- // |transaction| is kept alive while |worker_thread| is added to
- // |idle_worker_threads_stack_| to avoid this race:
- // 1. This thread creates a Transaction, finds |shared_priority_queue_|
- // empty and ends the Transaction.
- // 2. Other thread creates a Transaction, inserts a Sequence into
- // |shared_priority_queue_| and ends the Transaction. This can't happen
- // if the Transaction of step 1 is still active because because there can
- // only be one active Transaction per PriorityQueue at a time.
- // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
- // |idle_worker_threads_stack_| is empty.
- // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
- // sleep. No thread runs the Sequence inserted in step 2.
- outer_->AddToIdleWorkerThreadsStack(worker_thread);
- return nullptr;
- }
-
- scoped_refptr<Sequence> sequence = sequence_and_sort_key.sequence;
- transaction->Pop();
- return sequence;
-}
-
-void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
- scoped_refptr<Sequence> sequence) {
- enqueue_sequence_callback_.Run(std::move(sequence));
-}
-
-SchedulerThreadPool::SchedulerThreadPool(
- const EnqueueSequenceCallback& enqueue_sequence_callback,
- TaskTracker* task_tracker,
- DelayedTaskManager* delayed_task_manager)
- : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
- idle_worker_threads_stack_cv_for_testing_(
- idle_worker_threads_stack_lock_.CreateConditionVariable()),
- join_for_testing_returned_(true, false),
- worker_thread_delegate_(
- new SchedulerWorkerThreadDelegateImpl(this,
- enqueue_sequence_callback)),
- task_tracker_(task_tracker),
- delayed_task_manager_(delayed_task_manager) {
- DCHECK(task_tracker_);
- DCHECK(delayed_task_manager_);
-}
-
-bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
- size_t max_threads) {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
-
- DCHECK(worker_threads_.empty());
-
- for (size_t i = 0; i < max_threads; ++i) {
- std::unique_ptr<SchedulerWorkerThread> worker_thread =
- SchedulerWorkerThread::CreateSchedulerWorkerThread(
- thread_priority, worker_thread_delegate_.get(), task_tracker_);
- if (!worker_thread)
- break;
- idle_worker_threads_stack_.Push(worker_thread.get());
- worker_threads_.push_back(std::move(worker_thread));
- }
-
- return !worker_threads_.empty();
-}
-
-void SchedulerThreadPool::WakeUpOneThread() {
- SchedulerWorkerThread* worker_thread;
- {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- worker_thread = idle_worker_threads_stack_.Pop();
- }
- if (worker_thread)
- worker_thread->WakeUp();
-}
-
-void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
- SchedulerWorkerThread* worker_thread) {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- idle_worker_threads_stack_.Push(worker_thread);
- DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size());
-
- if (idle_worker_threads_stack_.Size() == worker_threads_.size())
- idle_worker_threads_stack_cv_for_testing_->Broadcast();
-}
-
-} // namespace internal
-} // namespace base

Powered by Google App Engine
This is Rietveld 408576698