Chromium Code Reviews| Index: base/task_scheduler/scheduler_thread_pool.cc |
| diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool.cc |
| index 8a8d3819b308181c17abd744e4ab92ea9c3debcf..16e249d9cebec31ba206d41a54db2127dfe3d183 100644 |
| --- a/base/task_scheduler/scheduler_thread_pool.cc |
| +++ b/base/task_scheduler/scheduler_thread_pool.cc |
| @@ -168,10 +168,33 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
| return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
| traits, this, task_tracker_, delayed_task_manager_)); |
| - case ExecutionMode::SINGLE_THREADED: |
| - // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
| - NOTREACHED(); |
| - return nullptr; |
| + case ExecutionMode::SINGLE_THREADED: { |
| + // Acquire a lock to ensure that the number of single-thread TaskRunners |
| + // for a given SchedulerWorkerThread doesn't increase between the calls to |
| + // GetNumSingleThreadTaskRunners() and CreateTaskRunnerWithTraits() below. |
| + // This number can still decrease. |
| + AutoSchedulerLock auto_lock(single_thread_task_runner_creation_lock_); |
| + |
| + SchedulerWorkerThread* worker_thread = nullptr; |
| + size_t min_num_single_thread_task_runners = -1; |
| + for (const auto& current_worker_thread : worker_threads_) { |
|
gab
2016/04/18 19:06:35
Add a TODO to make this work with dynamic thread c
|
| + const size_t current_num_single_thread_task_runners = |
| + current_worker_thread->GetNumSingleThreadTaskRunners(); |
| + if (current_num_single_thread_task_runners < |
| + min_num_single_thread_task_runners) { |
| + worker_thread = current_worker_thread.get(); |
| + min_num_single_thread_task_runners = |
| + current_num_single_thread_task_runners; |
| + if (min_num_single_thread_task_runners == 0) |
| + break; |
|
gab
2016/04/18 19:06:35
If we assume we always assign in a loop then there
fdoray
2016/04/18 19:47:14
SingleThreadTaskRunners can be destroyed and thus
|
| + } |
| + } |
| + |
| + // |worker_thread| is the SchedulerWorkerThread with the fewer single- |
| + // thread TaskRunners. |
| + DCHECK(worker_thread); |
| + return worker_thread->CreateTaskRunnerWithTraits(traits); |
| + } |
| } |
| NOTREACHED(); |
| @@ -199,7 +222,7 @@ void SchedulerThreadPool::EnqueueSequence( |
| void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() { |
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| - while (idle_worker_threads_stack_.size() < worker_threads_.size()) |
| + while (idle_worker_threads_stack_.Size() < worker_threads_.size()) |
| idle_worker_threads_stack_cv_for_testing_->Wait(); |
| } |
| @@ -245,15 +268,26 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( |
| SchedulerWorkerThread* worker_thread, |
| PriorityQueue* single_threaded_priority_queue, |
| bool* is_single_threaded_sequence) { |
| - // TODO(fdoray): Return a Sequence from |single_threaded_priority_queue| when |
| - // appropriate. |
| + DCHECK(worker_thread); |
| + DCHECK(single_threaded_priority_queue); |
| + DCHECK(is_single_threaded_sequence); |
| - std::unique_ptr<PriorityQueue::Transaction> transaction( |
| + *is_single_threaded_sequence = false; |
| + |
| + std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
| outer_->shared_priority_queue_.BeginTransaction()); |
| - const auto sequence_and_sort_key = transaction->Peek(); |
| + const auto shared_sequence_and_sort_key = shared_transaction->Peek(); |
| + |
| + std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
| + single_threaded_priority_queue->BeginTransaction()); |
| + const auto single_threaded_sequence_and_sort_key = |
| + single_threaded_transaction->Peek(); |
| + |
| + if (shared_sequence_and_sort_key.is_null() && |
| + single_threaded_sequence_and_sort_key.is_null()) { |
| + single_threaded_transaction.reset(); |
| - if (sequence_and_sort_key.is_null()) { |
| - // |transaction| is kept alive while |worker_thread| is added to |
| + // |shared_transaction| is kept alive while |worker_thread| is added to |
| // |idle_worker_threads_stack_| to avoid this race: |
| // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
| // empty and ends the Transaction. |
| @@ -269,8 +303,26 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( |
| return nullptr; |
| } |
| - transaction->Pop(); |
| - return sequence_and_sort_key.sequence; |
| + scoped_refptr<Sequence> sequence; |
| + |
| + if (single_threaded_sequence_and_sort_key.is_null() || |
| + (!shared_sequence_and_sort_key.is_null() && |
| + single_threaded_sequence_and_sort_key.sort_key < |
| + shared_sequence_and_sort_key.sort_key)) { |
| + shared_transaction->Pop(); |
| + sequence = std::move(shared_sequence_and_sort_key.sequence); |
| + } else { |
| + DCHECK(!single_threaded_sequence_and_sort_key.is_null()); |
| + single_threaded_transaction->Pop(); |
| + sequence = std::move(single_threaded_sequence_and_sort_key.sequence); |
| + *is_single_threaded_sequence = true; |
| + } |
| + |
| + single_threaded_transaction.reset(); |
| + shared_transaction.reset(); |
| + outer_->RemoveFromIdleWorkerThreadsStack(worker_thread); |
| + |
| + return sequence; |
| } |
| void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( |
| @@ -308,7 +360,7 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
| delayed_task_manager_, &shared_priority_queue_); |
| if (!worker_thread) |
| break; |
| - idle_worker_threads_stack_.push(worker_thread.get()); |
| + idle_worker_threads_stack_.Push(worker_thread.get()); |
| worker_threads_.push_back(std::move(worker_thread)); |
| } |
| @@ -316,7 +368,13 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
| } |
| void SchedulerThreadPool::WakeUpOneThread() { |
| - SchedulerWorkerThread* worker_thread = PopOneIdleWorkerThread(); |
| + SchedulerWorkerThread* worker_thread = nullptr; |
| + { |
| + AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| + if (!idle_worker_threads_stack_.Empty()) |
| + worker_thread = idle_worker_threads_stack_.Pop(); |
| + } |
| + |
| if (worker_thread) |
| worker_thread->WakeUp(); |
| } |
| @@ -324,22 +382,17 @@ void SchedulerThreadPool::WakeUpOneThread() { |
| void SchedulerThreadPool::AddToIdleWorkerThreadsStack( |
| SchedulerWorkerThread* worker_thread) { |
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| - idle_worker_threads_stack_.push(worker_thread); |
| - DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); |
| + idle_worker_threads_stack_.Push(worker_thread); |
| + DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size()); |
| - if (idle_worker_threads_stack_.size() == worker_threads_.size()) |
| + if (idle_worker_threads_stack_.Size() == worker_threads_.size()) |
| idle_worker_threads_stack_cv_for_testing_->Broadcast(); |
| } |
| -SchedulerWorkerThread* SchedulerThreadPool::PopOneIdleWorkerThread() { |
| +void SchedulerThreadPool::RemoveFromIdleWorkerThreadsStack( |
| + SchedulerWorkerThread* worker_thread) { |
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| - |
| - if (idle_worker_threads_stack_.empty()) |
| - return nullptr; |
| - |
| - auto worker_thread = idle_worker_threads_stack_.top(); |
| - idle_worker_threads_stack_.pop(); |
| - return worker_thread; |
| + idle_worker_threads_stack_.Remove(worker_thread); |
| } |
| } // namespace internal |