Index: base/task_scheduler/scheduler_thread_pool_impl.cc |
diff --git a/base/task_scheduler/scheduler_thread_pool_impl.cc b/base/task_scheduler/scheduler_thread_pool_impl.cc |
index 599c46be89f514d8e3b4be9a64c36846bf0ce3f0..b64405dd471d50d5e09c8a42ec5d641d0eeac6e6 100644 |
--- a/base/task_scheduler/scheduler_thread_pool_impl.cc |
+++ b/base/task_scheduler/scheduler_thread_pool_impl.cc |
@@ -4,14 +4,15 @@ |
#include "base/task_scheduler/scheduler_thread_pool_impl.h" |
+#include <algorithm> |
#include <utility> |
#include "base/bind.h" |
#include "base/bind_helpers.h" |
#include "base/lazy_instance.h" |
-#include "base/logging.h" |
#include "base/memory/ptr_util.h" |
#include "base/sequenced_task_runner.h" |
+#include "base/single_thread_task_runner.h" |
#include "base/task_scheduler/delayed_task_manager.h" |
#include "base/task_scheduler/task_tracker.h" |
#include "base/threading/thread_local.h" |
@@ -25,6 +26,10 @@ namespace { |
LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky |
tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; |
+// SchedulerWorkerThread that owns the current thread, if any. |
+LazyInstance<ThreadLocalPointer<const SchedulerWorkerThread>>::Leaky |
+ tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; |
+ |
// A task runner that runs tasks with the PARALLEL ExecutionMode. |
class SchedulerParallelTaskRunner : public TaskRunner { |
public: |
@@ -42,7 +47,7 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
// Post the task as part of a one-off single-task Sequence. |
return thread_pool_->PostTaskWithSequence( |
WrapUnique(new Task(from_here, closure, traits_, delay)), |
- make_scoped_refptr(new Sequence)); |
+ make_scoped_refptr(new Sequence), nullptr); |
} |
bool RunsTasksOnCurrentThread() const override { |
@@ -72,9 +77,10 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
TimeDelta delay) override { |
- // Post the task as part of |sequence|. |
+ // Post the task as part of |sequence_|. |
return thread_pool_->PostTaskWithSequence( |
- WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_); |
+ WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_, |
+ nullptr); |
} |
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
@@ -100,18 +106,88 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
}; |
+// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
+class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
+ public: |
+ // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
+ // tasks so long as |thread_pool| and |worker_thread| are alive. |
+ // TODO(robliao): Find a concrete way to manage the memory of |thread_pool| |
+ // and |worker_thread|. |
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
+ SchedulerThreadPool* thread_pool, |
+ SchedulerWorkerThread* worker_thread) |
+ : traits_(traits), |
+ thread_pool_(thread_pool), |
+ worker_thread_(worker_thread) {} |
+ |
+ // SingleThreadTaskRunner: |
+ bool PostDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ TimeDelta delay) override { |
+ // Post the task to be executed by |worker_thread_| as part of |sequence_|. |
+ return thread_pool_->PostTaskWithSequence( |
+ WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_, |
+ worker_thread_); |
+ } |
+ |
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ base::TimeDelta delay) override { |
+ // Tasks are never nested within the task scheduler. |
+ return PostDelayedTask(from_here, closure, delay); |
+ } |
+ |
+ bool RunsTasksOnCurrentThread() const override { |
+ return tls_current_worker_thread.Get().Get() == worker_thread_; |
+ } |
+ |
+ private: |
+ ~SchedulerSingleThreadTaskRunner() override = default; |
+ |
+ // Sequence for all Tasks posted through this TaskRunner. |
+ const scoped_refptr<Sequence> sequence_ = new Sequence; |
+ |
+ const TaskTraits traits_; |
+ SchedulerThreadPool* const thread_pool_; |
+ SchedulerWorkerThread* const worker_thread_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
+}; |
+ |
+// Only used in DCHECKs. |
+bool ContainsWorkerThread( |
+ const std::vector<std::unique_ptr<SchedulerWorkerThread>>& worker_threads, |
+ const SchedulerWorkerThread* worker_thread) { |
+ auto it = std::find_if( |
+ worker_threads.begin(), worker_threads.end(), |
+ [worker_thread](const std::unique_ptr<SchedulerWorkerThread>& i) { |
+ return i.get() == worker_thread; |
+ }); |
+ return it != worker_threads.end(); |
+} |
+ |
} // namespace |
class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
: public SchedulerWorkerThread::Delegate { |
public: |
+ // |outer| owns the worker thread for which this delegate is constructed. |
+ // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
+ // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
+ // PriorityQueue whose transactions may overlap with the worker thread's |
+ // single-threaded PriorityQueue's transactions. |
SchedulerWorkerThreadDelegateImpl( |
SchedulerThreadPoolImpl* outer, |
- const ReEnqueueSequenceCallback& re_enqueue_sequence_callback); |
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
+ const PriorityQueue* shared_priority_queue); |
~SchedulerWorkerThreadDelegateImpl() override; |
+ PriorityQueue* single_threaded_priority_queue() { |
+ return &single_threaded_priority_queue_; |
+ } |
+ |
// SchedulerWorkerThread::Delegate: |
- void OnMainEntry() override; |
+ void OnMainEntry(SchedulerWorkerThread* worker_thread) override; |
scoped_refptr<Sequence> GetWork( |
SchedulerWorkerThread* worker_thread) override; |
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
@@ -120,6 +196,13 @@ class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
SchedulerThreadPoolImpl* outer_; |
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
+ // Single-threaded PriorityQueue for the worker thread. |
+ PriorityQueue single_threaded_priority_queue_; |
+ |
+ // True if the last Sequence returned by GetWork() was extracted from |
+ // |single_threaded_priority_queue_|. |
+ bool last_sequence_is_single_threaded_ = false; |
+ |
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
}; |
@@ -129,6 +212,7 @@ SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { |
DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
} |
+// static |
std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( |
ThreadPriority thread_priority, |
size_t max_threads, |
@@ -168,10 +252,21 @@ scoped_refptr<TaskRunner> SchedulerThreadPoolImpl::CreateTaskRunnerWithTraits( |
case ExecutionMode::SEQUENCED: |
return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
- case ExecutionMode::SINGLE_THREADED: |
- // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
- NOTREACHED(); |
- return nullptr; |
+ case ExecutionMode::SINGLE_THREADED: { |
+ // TODO(fdoray): Find a way to take load into account when assigning a |
+ // SchedulerWorkerThread to a SingleThreadTaskRunner. Also, this code |
+ // assumes that all SchedulerWorkerThreads are alive. Eventually, we might |
+ // decide to tear down threads that haven't run tasks for a long time. |
+ size_t worker_thread_index; |
+ { |
+ AutoSchedulerLock auto_lock(next_worker_thread_index_lock_); |
+ worker_thread_index = next_worker_thread_index_; |
+ next_worker_thread_index_ = |
+ (next_worker_thread_index_ + 1) % worker_threads_.size(); |
+ } |
+ return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
+ traits, this, worker_threads_[worker_thread_index].get())); |
+ } |
} |
NOTREACHED(); |
@@ -199,18 +294,22 @@ void SchedulerThreadPoolImpl::ReEnqueueSequence( |
bool SchedulerThreadPoolImpl::PostTaskWithSequence( |
std::unique_ptr<Task> task, |
- scoped_refptr<Sequence> sequence) { |
+ scoped_refptr<Sequence> sequence, |
+ SchedulerWorkerThread* worker_thread) { |
DCHECK(task); |
DCHECK(sequence); |
+ DCHECK(!worker_thread || |
+ ContainsWorkerThread(worker_threads_, worker_thread)); |
if (!task_tracker_->WillPostTask(task.get())) |
return false; |
if (task->delayed_run_time.is_null()) { |
- PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence), |
+ worker_thread); |
} else { |
delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), |
- this); |
+ worker_thread, this); |
} |
return true; |
@@ -218,79 +317,151 @@ bool SchedulerThreadPoolImpl::PostTaskWithSequence( |
void SchedulerThreadPoolImpl::PostTaskWithSequenceNow( |
std::unique_ptr<Task> task, |
- scoped_refptr<Sequence> sequence) { |
+ scoped_refptr<Sequence> sequence, |
+ SchedulerWorkerThread* worker_thread) { |
DCHECK(task); |
DCHECK(sequence); |
+ DCHECK(!worker_thread || |
+ ContainsWorkerThread(worker_threads_, worker_thread)); |
// Confirm that |task| is ready to run (its delayed run time is either null or |
// in the past). |
DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now()); |
+ // Because |worker_thread| belongs to this thread pool, we know that the type |
+ // of its delegate is SchedulerWorkerThreadDelegateImpl. |
+ PriorityQueue* const priority_queue = |
+ worker_thread |
+ ? static_cast<SchedulerWorkerThreadDelegateImpl*>( |
+ worker_thread->delegate()) |
+ ->single_threaded_priority_queue() |
+ : &shared_priority_queue_; |
+ DCHECK(priority_queue); |
+ |
const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
if (sequence_was_empty) { |
- // Insert |sequence| in |shared_priority_queue_| if it was empty before |
- // |task| was inserted into it. Otherwise, one of these must be true: |
+ // Insert |sequence| in |priority_queue| if it was empty before |task| was |
+ // inserted into it. Otherwise, one of these must be true: |
// - |sequence| is already in a PriorityQueue (not necessarily |
// |shared_priority_queue_|), or, |
// - A worker thread is running a Task from |sequence|. It will insert |
// |sequence| in a PriorityQueue once it's done running the Task. |
const auto sequence_sort_key = sequence->GetSortKey(); |
- shared_priority_queue_.BeginTransaction()->Push( |
+ priority_queue->BeginTransaction()->Push( |
WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), |
sequence_sort_key))); |
// Wake up a worker thread to process |sequence|. |
- WakeUpOneThread(); |
+ if (worker_thread) |
+ worker_thread->WakeUp(); |
+ else |
+ WakeUpOneThread(); |
} |
} |
SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
SchedulerWorkerThreadDelegateImpl( |
SchedulerThreadPoolImpl* outer, |
- const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) |
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
+ const PriorityQueue* shared_priority_queue) |
: outer_(outer), |
- re_enqueue_sequence_callback_(re_enqueue_sequence_callback) {} |
+ re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
+ single_threaded_priority_queue_(shared_priority_queue) {} |
SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
~SchedulerWorkerThreadDelegateImpl() = default; |
-void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry() { |
+void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( |
+ SchedulerWorkerThread* worker_thread) { |
+#if DCHECK_IS_ON() |
+ // Wait for |outer_->threads_created_| to avoid traversing |
+ // |outer_->worker_threads_| while it is being filled by Initialize(). |
+ outer_->threads_created_.Wait(); |
+ DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
+#endif |
+ |
+ DCHECK(!tls_current_worker_thread.Get().Get()); |
DCHECK(!tls_current_thread_pool.Get().Get()); |
+ tls_current_worker_thread.Get().Set(worker_thread); |
tls_current_thread_pool.Get().Set(outer_); |
} |
scoped_refptr<Sequence> |
SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
SchedulerWorkerThread* worker_thread) { |
- std::unique_ptr<PriorityQueue::Transaction> transaction( |
- outer_->shared_priority_queue_.BeginTransaction()); |
- const auto& sequence_and_sort_key = transaction->Peek(); |
- |
- if (sequence_and_sort_key.is_null()) { |
- // |transaction| is kept alive while |worker_thread| is added to |
- // |idle_worker_threads_stack_| to avoid this race: |
- // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
- // empty and ends the Transaction. |
- // 2. Other thread creates a Transaction, inserts a Sequence into |
- // |shared_priority_queue_| and ends the Transaction. This can't happen |
- // if the Transaction of step 1 is still active because because there can |
- // only be one active Transaction per PriorityQueue at a time. |
- // 3. Other thread calls WakeUpOneThread(). No thread is woken up because |
- // |idle_worker_threads_stack_| is empty. |
- // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to |
- // sleep. No thread runs the Sequence inserted in step 2. |
- outer_->AddToIdleWorkerThreadsStack(worker_thread); |
- return nullptr; |
+ DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
+ |
+ scoped_refptr<Sequence> sequence; |
+ { |
+ std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
+ outer_->shared_priority_queue_.BeginTransaction()); |
+ const auto& shared_sequence_and_sort_key = shared_transaction->Peek(); |
+ |
+ std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
+ single_threaded_priority_queue_.BeginTransaction()); |
+ const auto& single_threaded_sequence_and_sort_key = |
+ single_threaded_transaction->Peek(); |
+ |
+ if (shared_sequence_and_sort_key.is_null() && |
+ single_threaded_sequence_and_sort_key.is_null()) { |
+ single_threaded_transaction.reset(); |
+ |
+ // |shared_transaction| is kept alive while |worker_thread| is added to |
+ // |idle_worker_threads_stack_| to avoid this race: |
+ // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
+ // empty and ends the Transaction. |
+ // 2. Other thread creates a Transaction, inserts a Sequence into |
+ // |shared_priority_queue_| and ends the Transaction. This can't happen |
+ // if the Transaction of step 1 is still active because because there |
+ // can only be one active Transaction per PriorityQueue at a time. |
+ // 3. Other thread calls WakeUpOneThread(). No thread is woken up because |
+ // |idle_worker_threads_stack_| is empty. |
+ // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to |
+ // sleep. No thread runs the Sequence inserted in step 2. |
+ outer_->AddToIdleWorkerThreadsStack(worker_thread); |
+ return nullptr; |
+ } |
+ |
+ // True if both PriorityQueues have Sequences and the Sequence at the top of |
+ // the shared PriorityQueue is more important. |
+ const bool shared_sequence_is_more_important = |
+ !shared_sequence_and_sort_key.is_null() && |
+ !single_threaded_sequence_and_sort_key.is_null() && |
+ shared_sequence_and_sort_key.sort_key > |
+ single_threaded_sequence_and_sort_key.sort_key; |
+ |
+ if (single_threaded_sequence_and_sort_key.is_null() || |
+ shared_sequence_is_more_important) { |
+ sequence = shared_sequence_and_sort_key.sequence; |
+ shared_transaction->Pop(); |
+ last_sequence_is_single_threaded_ = false; |
+ } else { |
+ DCHECK(!single_threaded_sequence_and_sort_key.is_null()); |
+ sequence = single_threaded_sequence_and_sort_key.sequence; |
+ single_threaded_transaction->Pop(); |
+ last_sequence_is_single_threaded_ = true; |
+ } |
} |
+ DCHECK(sequence); |
- scoped_refptr<Sequence> sequence = sequence_and_sort_key.sequence; |
- transaction->Pop(); |
+ outer_->RemoveFromIdleWorkerThreadsStack(worker_thread); |
return sequence; |
} |
void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
- re_enqueue_sequence_callback_.Run(std::move(sequence)); |
+ if (last_sequence_is_single_threaded_) { |
+ // A single-threaded Sequence is always re-enqueued in the single-threaded |
+ // PriorityQueue from which it was extracted. |
+ const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
+ single_threaded_priority_queue_.BeginTransaction()->Push( |
+ WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), |
+ sequence_sort_key))); |
+ } else { |
+ // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
+ // |sequence| must be enqueued. |
+ re_enqueue_sequence_callback_.Run(std::move(sequence)); |
+ } |
} |
SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
@@ -300,6 +471,9 @@ SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
idle_worker_threads_stack_cv_for_testing_( |
idle_worker_threads_stack_lock_.CreateConditionVariable()), |
join_for_testing_returned_(true, false), |
+#if DCHECK_IS_ON() |
+ threads_created_(true, false), |
+#endif |
task_tracker_(task_tracker), |
delayed_task_manager_(delayed_task_manager) { |
DCHECK(task_tracker_); |
@@ -317,8 +491,9 @@ bool SchedulerThreadPoolImpl::Initialize( |
for (size_t i = 0; i < max_threads; ++i) { |
std::unique_ptr<SchedulerWorkerThread> worker_thread = |
SchedulerWorkerThread::Create( |
- thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
- this, re_enqueue_sequence_callback)), |
+ thread_priority, |
+ WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
+ this, re_enqueue_sequence_callback, &shared_priority_queue_)), |
task_tracker_); |
if (!worker_thread) |
break; |
@@ -326,6 +501,10 @@ bool SchedulerThreadPoolImpl::Initialize( |
worker_threads_.push_back(std::move(worker_thread)); |
} |
+#if DCHECK_IS_ON() |
+ threads_created_.Signal(); |
+#endif |
+ |
return !worker_threads_.empty(); |
} |
@@ -349,5 +528,11 @@ void SchedulerThreadPoolImpl::AddToIdleWorkerThreadsStack( |
idle_worker_threads_stack_cv_for_testing_->Broadcast(); |
} |
+void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( |
+ SchedulerWorkerThread* worker_thread) { |
+ AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
+ idle_worker_threads_stack_.Remove(worker_thread); |
+} |
+ |
} // namespace internal |
} // namespace base |