Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4825)

Unified Diff: base/task_scheduler/scheduler_thread_pool_impl.cc

Issue 1906083002: TaskScheduler: Remove base/task_scheduler/utils.h/.cc (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@sched_2_stack
Patch Set: typos Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_thread_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool_impl.cc
similarity index 64%
rename from base/task_scheduler/scheduler_thread_pool.cc
rename to base/task_scheduler/scheduler_thread_pool_impl.cc
index 6e14638da37077aa04df244f182fba3e917a6875..e72ea6e12ceb326ab408a68a9aaf91d26efae9ea 100644
--- a/base/task_scheduler/scheduler_thread_pool.cc
+++ b/base/task_scheduler/scheduler_thread_pool_impl.cc
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "base/task_scheduler/scheduler_thread_pool.h"
+#include "base/task_scheduler/scheduler_thread_pool_impl.h"
#include <utility>
@@ -12,7 +12,8 @@
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/sequenced_task_runner.h"
-#include "base/task_scheduler/utils.h"
+#include "base/task_scheduler/delayed_task_manager.h"
+#include "base/task_scheduler/task_tracker.h"
#include "base/threading/thread_local.h"
namespace base {
@@ -21,48 +22,40 @@ namespace internal {
namespace {
// SchedulerThreadPool that owns the current thread, if any.
-LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
+LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
// A task runner that runs tasks with the PARALLEL ExecutionMode.
class SchedulerParallelTaskRunner : public TaskRunner {
public:
// Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
- // long as |executor| is alive.
- // TODO(robliao): Find a concrete way to manage |executor|'s memory.
+ // long as |thread_pool| is alive.
+ // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits& traits,
- SchedulerTaskExecutor* executor,
- TaskTracker* task_tracker,
- DelayedTaskManager* delayed_task_manager)
- : traits_(traits),
- executor_(executor),
- task_tracker_(task_tracker),
- delayed_task_manager_(delayed_task_manager) {}
+ SchedulerThreadPool* thread_pool)
+ : traits_(traits), thread_pool_(thread_pool) {}
// TaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
const Closure& closure,
TimeDelta delay) override {
// Post the task as part of a one-off single-task Sequence.
- return PostTaskToExecutor(
+ return thread_pool_->PostTaskWithSequence(
WrapUnique(
new Task(from_here, closure, traits_,
delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
- make_scoped_refptr(new Sequence), executor_, task_tracker_,
- delayed_task_manager_);
+ make_scoped_refptr(new Sequence));
}
bool RunsTasksOnCurrentThread() const override {
- return tls_current_thread_pool.Get().Get() == executor_;
+ return tls_current_thread_pool.Get().Get() == thread_pool_;
}
private:
~SchedulerParallelTaskRunner() override = default;
const TaskTraits traits_;
- SchedulerTaskExecutor* const executor_;
- TaskTracker* const task_tracker_;
- DelayedTaskManager* const delayed_task_manager_;
+ SchedulerThreadPool* const thread_pool_;
DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
};
@@ -70,28 +63,23 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// A task runner that runs tasks with the SEQUENCED ExecutionMode.
class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
public:
- // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
- // long as |executor| is alive.
- // TODO(robliao): Find a concrete way to manage |executor|'s memory.
+ // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
+ // so long as |thread_pool| is alive.
+ // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits& traits,
- SchedulerTaskExecutor* executor,
- TaskTracker* task_tracker,
- DelayedTaskManager* delayed_task_manager)
- : traits_(traits),
- executor_(executor),
- task_tracker_(task_tracker),
- delayed_task_manager_(delayed_task_manager) {}
+ SchedulerThreadPool* thread_pool)
+ : traits_(traits), thread_pool_(thread_pool) {}
// SequencedTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
const Closure& closure,
TimeDelta delay) override {
// Post the task as part of |sequence|.
- return PostTaskToExecutor(
+ return thread_pool_->PostTaskWithSequence(
WrapUnique(
new Task(from_here, closure, traits_,
delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
- sequence_, executor_, task_tracker_, delayed_task_manager_);
+ sequence_);
}
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
@@ -102,7 +90,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
}
bool RunsTasksOnCurrentThread() const override {
- return tls_current_thread_pool.Get().Get() == executor_;
+ return tls_current_thread_pool.Get().Get() == thread_pool_;
}
private:
@@ -112,66 +100,78 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
const scoped_refptr<Sequence> sequence_ = new Sequence;
const TaskTraits traits_;
- SchedulerTaskExecutor* const executor_;
- TaskTracker* const task_tracker_;
- DelayedTaskManager* const delayed_task_manager_;
+ SchedulerThreadPool* const thread_pool_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
} // namespace
-class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
+class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
: public SchedulerWorkerThread::Delegate {
public:
SchedulerWorkerThreadDelegateImpl(
- SchedulerThreadPool* outer,
- const EnqueueSequenceCallback& enqueue_sequence_callback);
+ SchedulerThreadPoolImpl* outer,
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback);
~SchedulerWorkerThreadDelegateImpl() override;
// SchedulerWorkerThread::Delegate:
void OnMainEntry() override;
scoped_refptr<Sequence> GetWork(
SchedulerWorkerThread* worker_thread) override;
- void EnqueueSequence(scoped_refptr<Sequence> sequence) override;
+ void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
private:
- SchedulerThreadPool* outer_;
- const EnqueueSequenceCallback enqueue_sequence_callback_;
+ SchedulerThreadPoolImpl* outer_;
+ const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
};
-SchedulerThreadPool::~SchedulerThreadPool() {
+SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() {
// SchedulerThreadPool should never be deleted in production unless its
// initialization failed.
DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
}
-std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
+std::unique_ptr<SchedulerThreadPoolImpl>
+SchedulerThreadPoolImpl::CreateThreadPool(
ThreadPriority thread_priority,
size_t max_threads,
- const EnqueueSequenceCallback& enqueue_sequence_callback,
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager) {
- std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool(
- enqueue_sequence_callback, task_tracker, delayed_task_manager));
+ std::unique_ptr<SchedulerThreadPoolImpl> thread_pool(
+ new SchedulerThreadPoolImpl(re_enqueue_sequence_callback, task_tracker,
+ delayed_task_manager));
if (thread_pool->Initialize(thread_priority, max_threads))
return thread_pool;
return nullptr;
}
-scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
+void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() {
+ AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
+ while (idle_worker_threads_stack_.Size() < worker_threads_.size())
+ idle_worker_threads_stack_cv_for_testing_->Wait();
+}
+
+void SchedulerThreadPoolImpl::JoinForTesting() {
+ for (const auto& worker_thread : worker_threads_)
+ worker_thread->JoinForTesting();
+
+ DCHECK(!join_for_testing_returned_.IsSignaled());
+ join_for_testing_returned_.Signal();
+}
+
+scoped_refptr<TaskRunner> SchedulerThreadPoolImpl::CreateTaskRunnerWithTraits(
const TaskTraits& traits,
ExecutionMode execution_mode) {
switch (execution_mode) {
case ExecutionMode::PARALLEL:
- return make_scoped_refptr(new SchedulerParallelTaskRunner(
- traits, this, task_tracker_, delayed_task_manager_));
+ return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
case ExecutionMode::SEQUENCED:
- return make_scoped_refptr(new SchedulerSequencedTaskRunner(
- traits, this, task_tracker_, delayed_task_manager_));
+ return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
case ExecutionMode::SINGLE_THREADED:
// TODO(fdoray): Support SINGLE_THREADED TaskRunners.
@@ -183,7 +183,7 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
return nullptr;
}
-void SchedulerThreadPool::EnqueueSequence(
+void SchedulerThreadPoolImpl::ReEnqueueSequence(
scoped_refptr<Sequence> sequence,
const SequenceSortKey& sequence_sort_key) {
shared_priority_queue_.BeginTransaction()->Push(
@@ -202,51 +202,70 @@ void SchedulerThreadPool::EnqueueSequence(
WakeUpOneThread();
}
-void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- while (idle_worker_threads_stack_.Size() < worker_threads_.size())
- idle_worker_threads_stack_cv_for_testing_->Wait();
-}
+bool SchedulerThreadPoolImpl::PostTaskWithSequence(
+ std::unique_ptr<Task> task,
+ scoped_refptr<Sequence> sequence) {
+ DCHECK(task);
+ DCHECK(sequence);
-void SchedulerThreadPool::JoinForTesting() {
- for (const auto& worker_thread : worker_threads_)
- worker_thread->JoinForTesting();
+ if (!task_tracker_->WillPostTask(task.get()))
+ return false;
- DCHECK(!join_for_testing_returned_.IsSignaled());
- join_for_testing_returned_.Signal();
+ if (task->delayed_run_time.is_null()) {
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence));
+ } else {
+ delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence),
+ this);
+ }
+
+ return true;
}
-void SchedulerThreadPool::PostTaskWithSequence(
+void SchedulerThreadPoolImpl::PostTaskWithSequenceNow(
std::unique_ptr<Task> task,
scoped_refptr<Sequence> sequence) {
DCHECK(task);
DCHECK(sequence);
- const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
- std::move(task), std::move(sequence), &shared_priority_queue_);
-
- // No thread has already been woken up to run Tasks from |sequence| if it was
- // empty before |task| was inserted into it.
- if (sequence_was_empty)
+ // Confirm that |task| is ready to run (its delayed run time is either null or
+ // in the past).
+ DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now());
+
+ const bool sequence_was_empty = sequence->PushTask(std::move(task));
+ if (sequence_was_empty) {
+ // Insert |sequence| in |shared_priority_queue_| if it was empty before
+ // |task| was inserted into it. Otherwise, one of these must be true:
+ // - |sequence| is already in a PriorityQueue (not necessarily
+ // |shared_priority_queue_|), or,
+ // - A worker thread is running a Task from |sequence|. It will insert
+ // |sequence| in a PriorityQueue once it's done running the Task.
+ const auto sequence_sort_key = sequence->GetSortKey();
+ shared_priority_queue_.BeginTransaction()->Push(
+ WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
+ sequence_sort_key)));
+
+ // Wake up a worker thread to process |sequence|.
WakeUpOneThread();
+ }
}
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
+SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
SchedulerWorkerThreadDelegateImpl(
- SchedulerThreadPool* outer,
- const EnqueueSequenceCallback& enqueue_sequence_callback)
- : outer_(outer), enqueue_sequence_callback_(enqueue_sequence_callback) {}
+ SchedulerThreadPoolImpl* outer,
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback)
+ : outer_(outer),
+ re_enqueue_sequence_callback_(re_enqueue_sequence_callback) {}
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::
+SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
~SchedulerWorkerThreadDelegateImpl() = default;
-void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
+void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
DCHECK(!tls_current_thread_pool.Get().Get());
tls_current_thread_pool.Get().Set(outer_);
}
scoped_refptr<Sequence>
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
+SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
SchedulerWorkerThread* worker_thread) {
std::unique_ptr<PriorityQueue::Transaction> transaction(
outer_->shared_priority_queue_.BeginTransaction());
@@ -274,13 +293,13 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
return sequence;
}
-void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
- scoped_refptr<Sequence> sequence) {
- enqueue_sequence_callback_.Run(std::move(sequence));
+void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
+ ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
+ re_enqueue_sequence_callback_.Run(std::move(sequence));
}
-SchedulerThreadPool::SchedulerThreadPool(
- const EnqueueSequenceCallback& enqueue_sequence_callback,
+SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager)
: idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
@@ -289,15 +308,15 @@ SchedulerThreadPool::SchedulerThreadPool(
join_for_testing_returned_(true, false),
worker_thread_delegate_(
new SchedulerWorkerThreadDelegateImpl(this,
- enqueue_sequence_callback)),
+ re_enqueue_sequence_callback)),
task_tracker_(task_tracker),
delayed_task_manager_(delayed_task_manager) {
DCHECK(task_tracker_);
DCHECK(delayed_task_manager_);
}
-bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
- size_t max_threads) {
+bool SchedulerThreadPoolImpl::Initialize(ThreadPriority thread_priority,
+ size_t max_threads) {
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
DCHECK(worker_threads_.empty());
@@ -315,7 +334,7 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
return !worker_threads_.empty();
}
-void SchedulerThreadPool::WakeUpOneThread() {
+void SchedulerThreadPoolImpl::WakeUpOneThread() {
SchedulerWorkerThread* worker_thread;
{
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
@@ -325,7 +344,7 @@ void SchedulerThreadPool::WakeUpOneThread() {
worker_thread->WakeUp();
}
-void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
+void SchedulerThreadPoolImpl::AddToIdleWorkerThreadsStack(
SchedulerWorkerThread* worker_thread) {
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
idle_worker_threads_stack_.Push(worker_thread);

Powered by Google App Engine
This is Rietveld 408576698