Chromium Code Reviews| Index: base/task_scheduler/scheduler_thread_pool_impl.cc |
| diff --git a/base/task_scheduler/scheduler_thread_pool_impl.cc b/base/task_scheduler/scheduler_thread_pool_impl.cc |
| index 599c46be89f514d8e3b4be9a64c36846bf0ce3f0..c0cbefc5c749b4fb5fbaf34c97777f1545a967d7 100644 |
| --- a/base/task_scheduler/scheduler_thread_pool_impl.cc |
| +++ b/base/task_scheduler/scheduler_thread_pool_impl.cc |
| @@ -9,9 +9,9 @@ |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| -#include "base/logging.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/sequenced_task_runner.h" |
| +#include "base/single_thread_task_runner.h" |
| #include "base/task_scheduler/delayed_task_manager.h" |
| #include "base/task_scheduler/task_tracker.h" |
| #include "base/threading/thread_local.h" |
| @@ -25,6 +25,10 @@ namespace { |
| LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky |
| tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; |
| +// SchedulerWorkerThread that owns the current thread, if any. |
| +LazyInstance<ThreadLocalPointer<const SchedulerWorkerThread>>::Leaky |
| + tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; |
| + |
| // A task runner that runs tasks with the PARALLEL ExecutionMode. |
| class SchedulerParallelTaskRunner : public TaskRunner { |
| public: |
| @@ -42,7 +46,7 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
| // Post the task as part of a one-off single-task Sequence. |
| return thread_pool_->PostTaskWithSequence( |
| WrapUnique(new Task(from_here, closure, traits_, delay)), |
| - make_scoped_refptr(new Sequence)); |
| + make_scoped_refptr(new Sequence), nullptr); |
| } |
| bool RunsTasksOnCurrentThread() const override { |
| @@ -72,9 +76,10 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| TimeDelta delay) override { |
| - // Post the task as part of |sequence|. |
| + // Post the task as part of |sequence_|. |
| return thread_pool_->PostTaskWithSequence( |
| - WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_); |
| + WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_, |
| + nullptr); |
| } |
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| @@ -100,18 +105,87 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| }; |
| +// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| +class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| + public: |
| + // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| + // tasks so long as |thread_pool| and |worker_thread| are alive. |
| + // TODO(robliao): Find a concrete way to manage the memory of |thread_pool| |
| + // and |worker_thread|. |
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| + SchedulerThreadPool* thread_pool, |
| + SchedulerWorkerThread* worker_thread) |
| + : traits_(traits), |
| + thread_pool_(thread_pool), |
| + worker_thread_(worker_thread) {} |
| + |
| + // SingleThreadTaskRunner: |
| + bool PostDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + TimeDelta delay) override { |
| + // Post the task to be executed by |worker_thread_| as part of |sequence_|. |
| + return thread_pool_->PostTaskWithSequence( |
| + WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_, |
| + worker_thread_); |
| + } |
| + |
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + base::TimeDelta delay) override { |
| + // Tasks are never nested within the task scheduler. |
| + return PostDelayedTask(from_here, closure, delay); |
| + } |
| + |
| + bool RunsTasksOnCurrentThread() const override { |
| + return tls_current_worker_thread.Get().Get() == worker_thread_; |
| + } |
| + |
| + private: |
| + ~SchedulerSingleThreadTaskRunner() override = default; |
| + |
| + // Sequence for all Tasks posted through this TaskRunner. |
| + const scoped_refptr<Sequence> sequence_ = new Sequence; |
| + |
| + const TaskTraits traits_; |
| + SchedulerThreadPool* const thread_pool_; |
| + SchedulerWorkerThread* const worker_thread_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| +}; |
| + |
| +// Only used in DCHECKs. |
| +bool ContainsWorkerThread( |
| + const std::vector<std::unique_ptr<SchedulerWorkerThread>>& worker_threads, |
| + const SchedulerWorkerThread* worker_thread) { |
| + for (const auto& current_worker_thread : worker_threads) { |
| + if (current_worker_thread.get() == worker_thread) |
| + return true; |
| + } |
| + return false; |
| +} |
| + |
| } // namespace |
| class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
| : public SchedulerWorkerThread::Delegate { |
| public: |
| + // |outer| owns the worker thread for which this delegate is constructed. |
| + // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
| + // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
| + // PriorityQueue whose transactions may overlap with the worker thread's |
| + // single-threaded PriorityQueue's transactions. |
| SchedulerWorkerThreadDelegateImpl( |
| SchedulerThreadPoolImpl* outer, |
| - const ReEnqueueSequenceCallback& re_enqueue_sequence_callback); |
| + const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| + const PriorityQueue* shared_priority_queue); |
| ~SchedulerWorkerThreadDelegateImpl() override; |
| + PriorityQueue* single_threaded_priority_queue() { |
| + return &single_threaded_priority_queue_; |
| + } |
| + |
| // SchedulerWorkerThread::Delegate: |
| - void OnMainEntry() override; |
| + void OnMainEntry(SchedulerWorkerThread* worker_thread) override; |
| scoped_refptr<Sequence> GetWork( |
| SchedulerWorkerThread* worker_thread) override; |
| void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
| @@ -120,6 +194,13 @@ class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
| SchedulerThreadPoolImpl* outer_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| + // Single-threaded PriorityQueue for the worker thread. |
| + PriorityQueue single_threaded_priority_queue_; |
| + |
| + // True if the last Sequence returned by GetWork() was extracted from |
| + // |single_threaded_priority_queue_|. |
| + bool last_sequence_is_single_threaded_ = false; |
| + |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
| }; |
| @@ -129,6 +210,7 @@ SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { |
| DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
| } |
| +// static |
| std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( |
| ThreadPriority thread_priority, |
| size_t max_threads, |
| @@ -168,10 +250,21 @@ scoped_refptr<TaskRunner> SchedulerThreadPoolImpl::CreateTaskRunnerWithTraits( |
| case ExecutionMode::SEQUENCED: |
| return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| - case ExecutionMode::SINGLE_THREADED: |
| - // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
| - NOTREACHED(); |
| - return nullptr; |
| + case ExecutionMode::SINGLE_THREADED: { |
| + // TODO(fdoray): Find a way to take load into account when assigning a |
| + // SchedulerWorkerThread to a SingleThreadTaskRunner. Also, this code |
| + // assumes that all SchedulerWorkerThreads are alive. Eventually, we might |
| + // decide to tear down threads that haven't run tasks for a long time. |
| + size_t worker_thread_index; |
| + { |
| + AutoSchedulerLock auto_lock(next_worker_thread_index_lock_); |
| + worker_thread_index = next_worker_thread_index_; |
| + next_worker_thread_index_ = |
| + (next_worker_thread_index_ + 1) % worker_threads_.size(); |
| + } |
| + return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
| + traits, this, worker_threads_[worker_thread_index].get())); |
| + } |
| } |
| NOTREACHED(); |
| @@ -199,18 +292,22 @@ void SchedulerThreadPoolImpl::ReEnqueueSequence( |
| bool SchedulerThreadPoolImpl::PostTaskWithSequence( |
| std::unique_ptr<Task> task, |
| - scoped_refptr<Sequence> sequence) { |
| + scoped_refptr<Sequence> sequence, |
| + SchedulerWorkerThread* worker_thread) { |
| DCHECK(task); |
| DCHECK(sequence); |
| + DCHECK(!worker_thread || |
| + ContainsWorkerThread(worker_threads_, worker_thread)); |
| if (!task_tracker_->WillPostTask(task.get())) |
| return false; |
| if (task->delayed_run_time.is_null()) { |
| - PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
| + PostTaskWithSequenceNow(std::move(task), std::move(sequence), |
| + worker_thread); |
| } else { |
| delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), |
| - this); |
| + worker_thread, this); |
| } |
| return true; |
| @@ -218,79 +315,143 @@ bool SchedulerThreadPoolImpl::PostTaskWithSequence( |
| void SchedulerThreadPoolImpl::PostTaskWithSequenceNow( |
| std::unique_ptr<Task> task, |
| - scoped_refptr<Sequence> sequence) { |
| + scoped_refptr<Sequence> sequence, |
| + SchedulerWorkerThread* worker_thread) { |
| DCHECK(task); |
| DCHECK(sequence); |
| + DCHECK(!worker_thread || |
| + ContainsWorkerThread(worker_threads_, worker_thread)); |
| // Confirm that |task| is ready to run (its delayed run time is either null or |
| // in the past). |
| DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now()); |
| + PriorityQueue* const priority_queue = |
| + worker_thread |
| + ? static_cast<SchedulerWorkerThreadDelegateImpl*>( |
| + worker_thread->delegate()) |
| + ->single_threaded_priority_queue() |
| + : &shared_priority_queue_; |
| + DCHECK(priority_queue); |
| + |
| const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
| if (sequence_was_empty) { |
| - // Insert |sequence| in |shared_priority_queue_| if it was empty before |
| - // |task| was inserted into it. Otherwise, one of these must be true: |
| + // Insert |sequence| in |priority_queue| if it was empty before |task| was |
| + // inserted into it. Otherwise, one of these must be true: |
| // - |sequence| is already in a PriorityQueue (not necessarily |
| // |shared_priority_queue_|), or, |
| // - A worker thread is running a Task from |sequence|. It will insert |
| // |sequence| in a PriorityQueue once it's done running the Task. |
| const auto sequence_sort_key = sequence->GetSortKey(); |
| - shared_priority_queue_.BeginTransaction()->Push( |
| + priority_queue->BeginTransaction()->Push( |
| WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), |
| sequence_sort_key))); |
| // Wake up a worker thread to process |sequence|. |
| - WakeUpOneThread(); |
| + if (worker_thread) |
| + worker_thread->WakeUp(); |
| + else |
| + WakeUpOneThread(); |
| } |
| } |
| SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
| SchedulerWorkerThreadDelegateImpl( |
| SchedulerThreadPoolImpl* outer, |
| - const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) |
| + const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| + const PriorityQueue* shared_priority_queue) |
| : outer_(outer), |
| - re_enqueue_sequence_callback_(re_enqueue_sequence_callback) {} |
| + re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| + single_threaded_priority_queue_(shared_priority_queue) {} |
| SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
| ~SchedulerWorkerThreadDelegateImpl() = default; |
| -void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry() { |
| +void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( |
| + SchedulerWorkerThread* worker_thread) { |
| +#if DCHECK_IS_ON() |
| + // Wait for |outer_->threads_created_| to avoid traversing |
| + // |outer_->worker_threads_| while it is being filled by Initialize(). |
| + outer_->threads_created_.Wait(); |
| + DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
| +#endif // DCHECK_IS_ON() |
| + |
| + DCHECK(!tls_current_worker_thread.Get().Get()); |
| DCHECK(!tls_current_thread_pool.Get().Get()); |
| + tls_current_worker_thread.Get().Set(worker_thread); |
| tls_current_thread_pool.Get().Set(outer_); |
| } |
| scoped_refptr<Sequence> |
| SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
| SchedulerWorkerThread* worker_thread) { |
| - std::unique_ptr<PriorityQueue::Transaction> transaction( |
| - outer_->shared_priority_queue_.BeginTransaction()); |
| - const auto& sequence_and_sort_key = transaction->Peek(); |
| - |
| - if (sequence_and_sort_key.is_null()) { |
| - // |transaction| is kept alive while |worker_thread| is added to |
| - // |idle_worker_threads_stack_| to avoid this race: |
| - // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
| - // empty and ends the Transaction. |
| - // 2. Other thread creates a Transaction, inserts a Sequence into |
| - // |shared_priority_queue_| and ends the Transaction. This can't happen |
| - // if the Transaction of step 1 is still active because because there can |
| - // only be one active Transaction per PriorityQueue at a time. |
| - // 3. Other thread calls WakeUpOneThread(). No thread is woken up because |
| - // |idle_worker_threads_stack_| is empty. |
| - // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to |
| - // sleep. No thread runs the Sequence inserted in step 2. |
| - outer_->AddToIdleWorkerThreadsStack(worker_thread); |
| - return nullptr; |
| + DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
| + |
| + scoped_refptr<Sequence> sequence; |
| + { |
| + std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
| + outer_->shared_priority_queue_.BeginTransaction()); |
| + const auto& shared_sequence_and_sort_key = shared_transaction->Peek(); |
| + |
| + std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
| + single_threaded_priority_queue_.BeginTransaction()); |
| + const auto& single_threaded_sequence_and_sort_key = |
| + single_threaded_transaction->Peek(); |
| + |
| + if (shared_sequence_and_sort_key.is_null() && |
| + single_threaded_sequence_and_sort_key.is_null()) { |
| + single_threaded_transaction.reset(); |
| + |
| + // |shared_transaction| is kept alive while |worker_thread| is added to |
| + // |idle_worker_threads_stack_| to avoid this race: |
| + // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
| + // empty and ends the Transaction. |
| + // 2. Other thread creates a Transaction, inserts a Sequence into |
| + // |shared_priority_queue_| and ends the Transaction. This can't happen |
| + // if the Transaction of step 1 is still active because because there |
| + // can only be one active Transaction per PriorityQueue at a time. |
| + // 3. Other thread calls WakeUpOneThread(). No thread is woken up because |
| + // |idle_worker_threads_stack_| is empty. |
| + // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to |
| + // sleep. No thread runs the Sequence inserted in step 2. |
| + outer_->AddToIdleWorkerThreadsStack(worker_thread); |
| + return nullptr; |
| + } |
| + |
| + if (single_threaded_sequence_and_sort_key.is_null() || |
| + (!shared_sequence_and_sort_key.is_null() && |
|
danakj
2016/04/27 20:38:13
nit: I don't like combining && and || in one if st
fdoray
2016/04/28 12:33:20
Done.
|
| + single_threaded_sequence_and_sort_key.sort_key < |
| + shared_sequence_and_sort_key.sort_key)) { |
| + sequence = shared_sequence_and_sort_key.sequence; |
| + shared_transaction->Pop(); |
| + last_sequence_is_single_threaded_ = false; |
| + } else { |
| + DCHECK(!single_threaded_sequence_and_sort_key.is_null()); |
| + sequence = single_threaded_sequence_and_sort_key.sequence; |
| + single_threaded_transaction->Pop(); |
| + last_sequence_is_single_threaded_ = true; |
| + } |
| } |
| + DCHECK(sequence); |
| - scoped_refptr<Sequence> sequence = sequence_and_sort_key.sequence; |
| - transaction->Pop(); |
| + outer_->RemoveFromIdleWorkerThreadsStack(worker_thread); |
| return sequence; |
| } |
| void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
| ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
| - re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| + if (last_sequence_is_single_threaded_) { |
| + // A single-threaded Sequence is always re-enqueued in the single-threaded |
| + // PriorityQueue from which it was extracted. |
| + const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
| + single_threaded_priority_queue_.BeginTransaction()->Push( |
| + WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), |
| + sequence_sort_key))); |
| + } else { |
| + // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
| + // |sequence| must be enqueued. |
| + re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| + } |
| } |
| SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
| @@ -300,6 +461,9 @@ SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
| idle_worker_threads_stack_cv_for_testing_( |
| idle_worker_threads_stack_lock_.CreateConditionVariable()), |
| join_for_testing_returned_(true, false), |
| +#if DCHECK_IS_ON() |
| + threads_created_(true, false), |
| +#endif // DCHECK_IS_ON() |
| task_tracker_(task_tracker), |
| delayed_task_manager_(delayed_task_manager) { |
| DCHECK(task_tracker_); |
| @@ -317,8 +481,9 @@ bool SchedulerThreadPoolImpl::Initialize( |
| for (size_t i = 0; i < max_threads; ++i) { |
| std::unique_ptr<SchedulerWorkerThread> worker_thread = |
| SchedulerWorkerThread::Create( |
| - thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
| - this, re_enqueue_sequence_callback)), |
| + thread_priority, |
| + WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
| + this, re_enqueue_sequence_callback, &shared_priority_queue_)), |
| task_tracker_); |
| if (!worker_thread) |
| break; |
| @@ -326,6 +491,10 @@ bool SchedulerThreadPoolImpl::Initialize( |
| worker_threads_.push_back(std::move(worker_thread)); |
| } |
| +#if DCHECK_IS_ON() |
| + threads_created_.Signal(); |
| +#endif // DCHECK_IS_ON() |
| + |
| return !worker_threads_.empty(); |
| } |
| @@ -349,5 +518,11 @@ void SchedulerThreadPoolImpl::AddToIdleWorkerThreadsStack( |
| idle_worker_threads_stack_cv_for_testing_->Broadcast(); |
| } |
| +void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( |
| + SchedulerWorkerThread* worker_thread) { |
| + AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| + idle_worker_threads_stack_.Remove(worker_thread); |
| +} |
| + |
| } // namespace internal |
| } // namespace base |