| Index: base/task_scheduler/scheduler_thread_pool.cc
|
| diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool.cc
|
| index 0d42d8a370466c712ff261b72708bb18d1e4ee54..b379d9a3ed0f71182a687f7546609499a99df669 100644
|
| --- a/base/task_scheduler/scheduler_thread_pool.cc
|
| +++ b/base/task_scheduler/scheduler_thread_pool.cc
|
| @@ -174,10 +174,33 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
|
| return make_scoped_refptr(new SchedulerSequencedTaskRunner(
|
| traits, this, task_tracker_, delayed_task_manager_));
|
|
|
| - case ExecutionMode::SINGLE_THREADED:
|
| - // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
|
| - NOTREACHED();
|
| - return nullptr;
|
| + case ExecutionMode::SINGLE_THREADED: {
|
| + // Acquire a lock to ensure that the number of single-thread TaskRunners
|
| + // for a given SchedulerWorkerThread doesn't increase between the calls to
|
| + // GetNumSingleThreadTaskRunners() and CreateTaskRunnerWithTraits() below.
|
| + // This number can still decrease.
|
| + AutoSchedulerLock auto_lock(single_thread_task_runner_creation_lock_);
|
| +
|
| + SchedulerWorkerThread* worker_thread = nullptr;
|
| + size_t min_num_single_thread_task_runners = -1;
|
| + for (const auto& current_worker_thread : worker_threads_) {
|
| + const size_t current_num_single_thread_task_runners =
|
| + current_worker_thread->GetNumSingleThreadTaskRunners();
|
| + if (current_num_single_thread_task_runners <
|
| + min_num_single_thread_task_runners) {
|
| + worker_thread = current_worker_thread.get();
|
| + min_num_single_thread_task_runners =
|
| + current_num_single_thread_task_runners;
|
| + if (min_num_single_thread_task_runners == 0)
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // |worker_thread| is the SchedulerWorkerThread with the fewer single-
|
| + // thread TaskRunners.
|
| + DCHECK(worker_thread);
|
| + return worker_thread->CreateTaskRunnerWithTraits(traits);
|
| + }
|
| }
|
|
|
| NOTREACHED();
|
| @@ -205,7 +228,7 @@ void SchedulerThreadPool::EnqueueSequence(
|
|
|
| void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
|
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| - while (idle_worker_threads_stack_.size() < worker_threads_.size())
|
| + while (idle_worker_threads_stack_.Size() < worker_threads_.size())
|
| idle_worker_threads_stack_cv_for_testing_->Wait();
|
| }
|
|
|
| @@ -251,15 +274,26 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| SchedulerWorkerThread* worker_thread,
|
| PriorityQueue* alternate_priority_queue,
|
| bool* alternate_priority_queue_used) {
|
| - // TODO(fdoray): Return a Sequence from |alternate_priority_queue| when
|
| - // appropriate.
|
| + DCHECK(worker_thread);
|
| + DCHECK(alternate_priority_queue);
|
| + DCHECK(alternate_priority_queue_used);
|
|
|
| - std::unique_ptr<PriorityQueue::Transaction> transaction(
|
| + *is_single_threaded_sequence = false;
|
| +
|
| + std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
|
| outer_->shared_priority_queue_.BeginTransaction());
|
| - const auto sequence_and_sort_key = transaction->Peek();
|
| + const auto shared_sequence_and_sort_key = shared_transaction->Peek();
|
| +
|
| + std::unique_ptr<PriorityQueue::Transaction> alternate_transaction(
|
| + alternate_priority_queue->BeginTransaction());
|
| + const auto alternate_sequence_and_sort_key =
|
| + alternate_transaction->Peek();
|
| +
|
| + if (shared_sequence_and_sort_key.is_null() &&
|
| + alternate_sequence_and_sort_key.is_null()) {
|
| + alternate_transaction.reset();
|
|
|
| - if (sequence_and_sort_key.is_null()) {
|
| - // |transaction| is kept alive while |worker_thread| is added to
|
| + // |shared_transaction| is kept alive while |worker_thread| is added to
|
| // |idle_worker_threads_stack_| to avoid this race:
|
| // 1. This thread creates a Transaction, finds |shared_priority_queue_|
|
| // empty and ends the Transaction.
|
| @@ -275,8 +309,26 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
|
| return nullptr;
|
| }
|
|
|
| - transaction->Pop();
|
| - return sequence_and_sort_key.sequence;
|
| + scoped_refptr<Sequence> sequence;
|
| +
|
| + if (alternate_sequence_and_sort_key.is_null() ||
|
| + (!shared_sequence_and_sort_key.is_null() &&
|
| + alternate_sequence_and_sort_key.sort_key <
|
| + shared_sequence_and_sort_key.sort_key)) {
|
| + shared_transaction->Pop();
|
| + sequence = std::move(shared_sequence_and_sort_key.sequence);
|
| + } else {
|
| + DCHECK(!alternate_sequence_and_sort_key.is_null());
|
| + alternate_transaction->Pop();
|
| + sequence = std::move(alternate_sequence_and_sort_key.sequence);
|
| + *alternate_priority_queue_used = true;
|
| + }
|
| +
|
| + shared_transaction.reset();
|
| + alternate_transaction.reset();
|
| + outer_->RemoveFromIdleWorkerThreadsStack(worker_thread);
|
| +
|
| + return sequence;
|
| }
|
|
|
| void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
|
| @@ -314,7 +366,7 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
|
| delayed_task_manager_, &shared_priority_queue_);
|
| if (!worker_thread)
|
| break;
|
| - idle_worker_threads_stack_.push(worker_thread.get());
|
| + idle_worker_threads_stack_.Push(worker_thread.get());
|
| worker_threads_.push_back(std::move(worker_thread));
|
| }
|
|
|
| @@ -322,7 +374,13 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
|
| }
|
|
|
| void SchedulerThreadPool::WakeUpOneThread() {
|
| - SchedulerWorkerThread* worker_thread = PopOneIdleWorkerThread();
|
| + SchedulerWorkerThread* worker_thread = nullptr;
|
| + {
|
| + AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| + if (!idle_worker_threads_stack_.Empty())
|
| + worker_thread = idle_worker_threads_stack_.Pop();
|
| + }
|
| +
|
| if (worker_thread)
|
| worker_thread->WakeUp();
|
| }
|
| @@ -330,22 +388,17 @@ void SchedulerThreadPool::WakeUpOneThread() {
|
| void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
|
| SchedulerWorkerThread* worker_thread) {
|
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| - idle_worker_threads_stack_.push(worker_thread);
|
| - DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size());
|
| + idle_worker_threads_stack_.Push(worker_thread);
|
| + DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size());
|
|
|
| - if (idle_worker_threads_stack_.size() == worker_threads_.size())
|
| + if (idle_worker_threads_stack_.Size() == worker_threads_.size())
|
| idle_worker_threads_stack_cv_for_testing_->Broadcast();
|
| }
|
|
|
| -SchedulerWorkerThread* SchedulerThreadPool::PopOneIdleWorkerThread() {
|
| +void SchedulerThreadPool::RemoveFromIdleWorkerThreadsStack(
|
| + SchedulerWorkerThread* worker_thread) {
|
| AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
|
| -
|
| - if (idle_worker_threads_stack_.empty())
|
| - return nullptr;
|
| -
|
| - auto worker_thread = idle_worker_threads_stack_.top();
|
| - idle_worker_threads_stack_.pop();
|
| - return worker_thread;
|
| + idle_worker_threads_stack_.Remove(worker_thread);
|
| }
|
|
|
| } // namespace internal
|
|
|