Index: base/task_scheduler/scheduler_thread_pool_impl.cc |
diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool_impl.cc |
similarity index 64% |
rename from base/task_scheduler/scheduler_thread_pool.cc |
rename to base/task_scheduler/scheduler_thread_pool_impl.cc |
index 6e14638da37077aa04df244f182fba3e917a6875..e72ea6e12ceb326ab408a68a9aaf91d26efae9ea 100644 |
--- a/base/task_scheduler/scheduler_thread_pool.cc |
+++ b/base/task_scheduler/scheduler_thread_pool_impl.cc |
@@ -2,7 +2,7 @@ |
// Use of this source code is governed by a BSD-style license that can be |
// found in the LICENSE file. |
-#include "base/task_scheduler/scheduler_thread_pool.h" |
+#include "base/task_scheduler/scheduler_thread_pool_impl.h" |
#include <utility> |
@@ -12,7 +12,8 @@ |
#include "base/logging.h" |
#include "base/memory/ptr_util.h" |
#include "base/sequenced_task_runner.h" |
-#include "base/task_scheduler/utils.h" |
+#include "base/task_scheduler/delayed_task_manager.h" |
+#include "base/task_scheduler/task_tracker.h" |
#include "base/threading/thread_local.h" |
namespace base { |
@@ -21,48 +22,40 @@ namespace internal { |
namespace { |
// SchedulerThreadPool that owns the current thread, if any. |
-LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky |
+LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky |
tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; |
// A task runner that runs tasks with the PARALLEL ExecutionMode. |
class SchedulerParallelTaskRunner : public TaskRunner { |
public: |
// Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
- // long as |executor| is alive. |
- // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
+ // long as |thread_pool| is alive. |
+ // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory. |
SchedulerParallelTaskRunner(const TaskTraits& traits, |
- SchedulerTaskExecutor* executor, |
- TaskTracker* task_tracker, |
- DelayedTaskManager* delayed_task_manager) |
- : traits_(traits), |
- executor_(executor), |
- task_tracker_(task_tracker), |
- delayed_task_manager_(delayed_task_manager) {} |
+ SchedulerThreadPool* thread_pool) |
+ : traits_(traits), thread_pool_(thread_pool) {} |
// TaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
TimeDelta delay) override { |
// Post the task as part of a one-off single-task Sequence. |
- return PostTaskToExecutor( |
+ return thread_pool_->PostTaskWithSequence( |
WrapUnique( |
new Task(from_here, closure, traits_, |
delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), |
- make_scoped_refptr(new Sequence), executor_, task_tracker_, |
- delayed_task_manager_); |
+ make_scoped_refptr(new Sequence)); |
} |
bool RunsTasksOnCurrentThread() const override { |
- return tls_current_thread_pool.Get().Get() == executor_; |
+ return tls_current_thread_pool.Get().Get() == thread_pool_; |
} |
private: |
~SchedulerParallelTaskRunner() override = default; |
const TaskTraits traits_; |
- SchedulerTaskExecutor* const executor_; |
- TaskTracker* const task_tracker_; |
- DelayedTaskManager* const delayed_task_manager_; |
+ SchedulerThreadPool* const thread_pool_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
}; |
@@ -70,28 +63,23 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
// A task runner that runs tasks with the SEQUENCED ExecutionMode. |
class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
public: |
- // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
- // long as |executor| is alive. |
- // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
+ // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks |
+ // so long as |thread_pool| is alive. |
+ // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory. |
SchedulerSequencedTaskRunner(const TaskTraits& traits, |
- SchedulerTaskExecutor* executor, |
- TaskTracker* task_tracker, |
- DelayedTaskManager* delayed_task_manager) |
- : traits_(traits), |
- executor_(executor), |
- task_tracker_(task_tracker), |
- delayed_task_manager_(delayed_task_manager) {} |
+ SchedulerThreadPool* thread_pool) |
+ : traits_(traits), thread_pool_(thread_pool) {} |
// SequencedTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
TimeDelta delay) override { |
// Post the task as part of |sequence|. |
- return PostTaskToExecutor( |
+ return thread_pool_->PostTaskWithSequence( |
WrapUnique( |
new Task(from_here, closure, traits_, |
delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), |
- sequence_, executor_, task_tracker_, delayed_task_manager_); |
+ sequence_); |
} |
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
@@ -102,7 +90,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
} |
bool RunsTasksOnCurrentThread() const override { |
- return tls_current_thread_pool.Get().Get() == executor_; |
+ return tls_current_thread_pool.Get().Get() == thread_pool_; |
} |
private: |
@@ -112,66 +100,78 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
const scoped_refptr<Sequence> sequence_ = new Sequence; |
const TaskTraits traits_; |
- SchedulerTaskExecutor* const executor_; |
- TaskTracker* const task_tracker_; |
- DelayedTaskManager* const delayed_task_manager_; |
+ SchedulerThreadPool* const thread_pool_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
}; |
} // namespace |
-class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl |
+class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
: public SchedulerWorkerThread::Delegate { |
public: |
SchedulerWorkerThreadDelegateImpl( |
- SchedulerThreadPool* outer, |
- const EnqueueSequenceCallback& enqueue_sequence_callback); |
+ SchedulerThreadPoolImpl* outer, |
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback); |
~SchedulerWorkerThreadDelegateImpl() override; |
// SchedulerWorkerThread::Delegate: |
void OnMainEntry() override; |
scoped_refptr<Sequence> GetWork( |
SchedulerWorkerThread* worker_thread) override; |
- void EnqueueSequence(scoped_refptr<Sequence> sequence) override; |
+ void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
private: |
- SchedulerThreadPool* outer_; |
- const EnqueueSequenceCallback enqueue_sequence_callback_; |
+ SchedulerThreadPoolImpl* outer_; |
+ const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
}; |
-SchedulerThreadPool::~SchedulerThreadPool() { |
+SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { |
// SchedulerThreadPool should never be deleted in production unless its |
// initialization failed. |
DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
} |
-std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( |
+std::unique_ptr<SchedulerThreadPoolImpl> |
+SchedulerThreadPoolImpl::CreateThreadPool( |
ThreadPriority thread_priority, |
size_t max_threads, |
- const EnqueueSequenceCallback& enqueue_sequence_callback, |
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
TaskTracker* task_tracker, |
DelayedTaskManager* delayed_task_manager) { |
- std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool( |
- enqueue_sequence_callback, task_tracker, delayed_task_manager)); |
+ std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( |
+ new SchedulerThreadPoolImpl(re_enqueue_sequence_callback, task_tracker, |
+ delayed_task_manager)); |
if (thread_pool->Initialize(thread_priority, max_threads)) |
return thread_pool; |
return nullptr; |
} |
-scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
+void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { |
+ AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
+ while (idle_worker_threads_stack_.Size() < worker_threads_.size()) |
+ idle_worker_threads_stack_cv_for_testing_->Wait(); |
+} |
+ |
+void SchedulerThreadPoolImpl::JoinForTesting() { |
+ for (const auto& worker_thread : worker_threads_) |
+ worker_thread->JoinForTesting(); |
+ |
+ DCHECK(!join_for_testing_returned_.IsSignaled()); |
+ join_for_testing_returned_.Signal(); |
+} |
+ |
+scoped_refptr<TaskRunner> SchedulerThreadPoolImpl::CreateTaskRunnerWithTraits( |
const TaskTraits& traits, |
ExecutionMode execution_mode) { |
switch (execution_mode) { |
case ExecutionMode::PARALLEL: |
- return make_scoped_refptr(new SchedulerParallelTaskRunner( |
- traits, this, task_tracker_, delayed_task_manager_)); |
+ return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
case ExecutionMode::SEQUENCED: |
- return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
- traits, this, task_tracker_, delayed_task_manager_)); |
+ return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
case ExecutionMode::SINGLE_THREADED: |
// TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
@@ -183,7 +183,7 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
return nullptr; |
} |
-void SchedulerThreadPool::EnqueueSequence( |
+void SchedulerThreadPoolImpl::ReEnqueueSequence( |
scoped_refptr<Sequence> sequence, |
const SequenceSortKey& sequence_sort_key) { |
shared_priority_queue_.BeginTransaction()->Push( |
@@ -202,51 +202,70 @@ void SchedulerThreadPool::EnqueueSequence( |
WakeUpOneThread(); |
} |
-void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() { |
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- while (idle_worker_threads_stack_.Size() < worker_threads_.size()) |
- idle_worker_threads_stack_cv_for_testing_->Wait(); |
-} |
+bool SchedulerThreadPoolImpl::PostTaskWithSequence( |
+ std::unique_ptr<Task> task, |
+ scoped_refptr<Sequence> sequence) { |
+ DCHECK(task); |
+ DCHECK(sequence); |
-void SchedulerThreadPool::JoinForTesting() { |
- for (const auto& worker_thread : worker_threads_) |
- worker_thread->JoinForTesting(); |
+ if (!task_tracker_->WillPostTask(task.get())) |
+ return false; |
- DCHECK(!join_for_testing_returned_.IsSignaled()); |
- join_for_testing_returned_.Signal(); |
+ if (task->delayed_run_time.is_null()) { |
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
+ } else { |
+ delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), |
+ this); |
+ } |
+ |
+ return true; |
} |
-void SchedulerThreadPool::PostTaskWithSequence( |
+void SchedulerThreadPoolImpl::PostTaskWithSequenceNow( |
std::unique_ptr<Task> task, |
scoped_refptr<Sequence> sequence) { |
DCHECK(task); |
DCHECK(sequence); |
- const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( |
- std::move(task), std::move(sequence), &shared_priority_queue_); |
- |
- // No thread has already been woken up to run Tasks from |sequence| if it was |
- // empty before |task| was inserted into it. |
- if (sequence_was_empty) |
+ // Confirm that |task| is ready to run (its delayed run time is either null or |
+ // in the past). |
+ DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now()); |
+ |
+ const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
+ if (sequence_was_empty) { |
+ // Insert |sequence| in |shared_priority_queue_| if it was empty before |
+ // |task| was inserted into it. Otherwise, one of these must be true: |
+ // - |sequence| is already in a PriorityQueue (not necessarily |
+ // |shared_priority_queue_|), or, |
+ // - A worker thread is running a Task from |sequence|. It will insert |
+ // |sequence| in a PriorityQueue once it's done running the Task. |
+ const auto sequence_sort_key = sequence->GetSortKey(); |
+ shared_priority_queue_.BeginTransaction()->Push( |
+ WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), |
+ sequence_sort_key))); |
+ |
+ // Wake up a worker thread to process |sequence|. |
WakeUpOneThread(); |
+ } |
} |
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl:: |
+SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
SchedulerWorkerThreadDelegateImpl( |
- SchedulerThreadPool* outer, |
- const EnqueueSequenceCallback& enqueue_sequence_callback) |
- : outer_(outer), enqueue_sequence_callback_(enqueue_sequence_callback) {} |
+ SchedulerThreadPoolImpl* outer, |
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) |
+ : outer_(outer), |
+ re_enqueue_sequence_callback_(re_enqueue_sequence_callback) {} |
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl:: |
+SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
~SchedulerWorkerThreadDelegateImpl() = default; |
-void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() { |
+void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry() { |
DCHECK(!tls_current_thread_pool.Get().Get()); |
tls_current_thread_pool.Get().Set(outer_); |
} |
scoped_refptr<Sequence> |
-SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( |
+SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
SchedulerWorkerThread* worker_thread) { |
std::unique_ptr<PriorityQueue::Transaction> transaction( |
outer_->shared_priority_queue_.BeginTransaction()); |
@@ -274,13 +293,13 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( |
return sequence; |
} |
-void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( |
- scoped_refptr<Sequence> sequence) { |
- enqueue_sequence_callback_.Run(std::move(sequence)); |
+void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
+ ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
+ re_enqueue_sequence_callback_.Run(std::move(sequence)); |
} |
-SchedulerThreadPool::SchedulerThreadPool( |
- const EnqueueSequenceCallback& enqueue_sequence_callback, |
+SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
TaskTracker* task_tracker, |
DelayedTaskManager* delayed_task_manager) |
: idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), |
@@ -289,15 +308,15 @@ SchedulerThreadPool::SchedulerThreadPool( |
join_for_testing_returned_(true, false), |
worker_thread_delegate_( |
new SchedulerWorkerThreadDelegateImpl(this, |
- enqueue_sequence_callback)), |
+ re_enqueue_sequence_callback)), |
task_tracker_(task_tracker), |
delayed_task_manager_(delayed_task_manager) { |
DCHECK(task_tracker_); |
DCHECK(delayed_task_manager_); |
} |
-bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
- size_t max_threads) { |
+bool SchedulerThreadPoolImpl::Initialize(ThreadPriority thread_priority, |
+ size_t max_threads) { |
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
DCHECK(worker_threads_.empty()); |
@@ -315,7 +334,7 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
return !worker_threads_.empty(); |
} |
-void SchedulerThreadPool::WakeUpOneThread() { |
+void SchedulerThreadPoolImpl::WakeUpOneThread() { |
SchedulerWorkerThread* worker_thread; |
{ |
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
@@ -325,7 +344,7 @@ void SchedulerThreadPool::WakeUpOneThread() { |
worker_thread->WakeUp(); |
} |
-void SchedulerThreadPool::AddToIdleWorkerThreadsStack( |
+void SchedulerThreadPoolImpl::AddToIdleWorkerThreadsStack( |
SchedulerWorkerThread* worker_thread) { |
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
idle_worker_threads_stack_.Push(worker_thread); |