Index: base/task_scheduler/scheduler_thread_pool.cc |
diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool.cc |
index 8a8d3819b308181c17abd744e4ab92ea9c3debcf..16e249d9cebec31ba206d41a54db2127dfe3d183 100644 |
--- a/base/task_scheduler/scheduler_thread_pool.cc |
+++ b/base/task_scheduler/scheduler_thread_pool.cc |
@@ -168,10 +168,33 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
traits, this, task_tracker_, delayed_task_manager_)); |
- case ExecutionMode::SINGLE_THREADED: |
- // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
- NOTREACHED(); |
- return nullptr; |
+ case ExecutionMode::SINGLE_THREADED: { |
+ // Acquire a lock to ensure that the number of single-thread TaskRunners |
+ // for a given SchedulerWorkerThread doesn't increase between the calls to |
+ // GetNumSingleThreadTaskRunners() and CreateTaskRunnerWithTraits() below. |
+ // This number can still decrease. |
+ AutoSchedulerLock auto_lock(single_thread_task_runner_creation_lock_); |
+ |
+ SchedulerWorkerThread* worker_thread = nullptr; |
+ size_t min_num_single_thread_task_runners = -1; |
+ for (const auto& current_worker_thread : worker_threads_) { |
gab
2016/04/18 19:06:35
Add a TODO to make this work with dynamic thread c
|
+ const size_t current_num_single_thread_task_runners = |
+ current_worker_thread->GetNumSingleThreadTaskRunners(); |
+ if (current_num_single_thread_task_runners < |
+ min_num_single_thread_task_runners) { |
+ worker_thread = current_worker_thread.get(); |
+ min_num_single_thread_task_runners = |
+ current_num_single_thread_task_runners; |
+ if (min_num_single_thread_task_runners == 0) |
+ break; |
gab
2016/04/18 19:06:35
If we assume we always assign in a loop then there
fdoray
2016/04/18 19:47:14
SingleThreadTaskRunners can be destroyed and thus
|
+ } |
+ } |
+ |
+ // |worker_thread| is the SchedulerWorkerThread with the fewer single- |
+ // thread TaskRunners. |
+ DCHECK(worker_thread); |
+ return worker_thread->CreateTaskRunnerWithTraits(traits); |
+ } |
} |
NOTREACHED(); |
@@ -199,7 +222,7 @@ void SchedulerThreadPool::EnqueueSequence( |
void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() { |
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- while (idle_worker_threads_stack_.size() < worker_threads_.size()) |
+ while (idle_worker_threads_stack_.Size() < worker_threads_.size()) |
idle_worker_threads_stack_cv_for_testing_->Wait(); |
} |
@@ -245,15 +268,26 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( |
SchedulerWorkerThread* worker_thread, |
PriorityQueue* single_threaded_priority_queue, |
bool* is_single_threaded_sequence) { |
- // TODO(fdoray): Return a Sequence from |single_threaded_priority_queue| when |
- // appropriate. |
+ DCHECK(worker_thread); |
+ DCHECK(single_threaded_priority_queue); |
+ DCHECK(is_single_threaded_sequence); |
- std::unique_ptr<PriorityQueue::Transaction> transaction( |
+ *is_single_threaded_sequence = false; |
+ |
+ std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
outer_->shared_priority_queue_.BeginTransaction()); |
- const auto sequence_and_sort_key = transaction->Peek(); |
+ const auto shared_sequence_and_sort_key = shared_transaction->Peek(); |
+ |
+ std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
+ single_threaded_priority_queue->BeginTransaction()); |
+ const auto single_threaded_sequence_and_sort_key = |
+ single_threaded_transaction->Peek(); |
+ |
+ if (shared_sequence_and_sort_key.is_null() && |
+ single_threaded_sequence_and_sort_key.is_null()) { |
+ single_threaded_transaction.reset(); |
- if (sequence_and_sort_key.is_null()) { |
- // |transaction| is kept alive while |worker_thread| is added to |
+ // |shared_transaction| is kept alive while |worker_thread| is added to |
// |idle_worker_threads_stack_| to avoid this race: |
// 1. This thread creates a Transaction, finds |shared_priority_queue_| |
// empty and ends the Transaction. |
@@ -269,8 +303,26 @@ SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( |
return nullptr; |
} |
- transaction->Pop(); |
- return sequence_and_sort_key.sequence; |
+ scoped_refptr<Sequence> sequence; |
+ |
+ if (single_threaded_sequence_and_sort_key.is_null() || |
+ (!shared_sequence_and_sort_key.is_null() && |
+ single_threaded_sequence_and_sort_key.sort_key < |
+ shared_sequence_and_sort_key.sort_key)) { |
+ shared_transaction->Pop(); |
+ sequence = std::move(shared_sequence_and_sort_key.sequence); |
+ } else { |
+ DCHECK(!single_threaded_sequence_and_sort_key.is_null()); |
+ single_threaded_transaction->Pop(); |
+ sequence = std::move(single_threaded_sequence_and_sort_key.sequence); |
+ *is_single_threaded_sequence = true; |
+ } |
+ |
+ single_threaded_transaction.reset(); |
+ shared_transaction.reset(); |
+ outer_->RemoveFromIdleWorkerThreadsStack(worker_thread); |
+ |
+ return sequence; |
} |
void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( |
@@ -308,7 +360,7 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
delayed_task_manager_, &shared_priority_queue_); |
if (!worker_thread) |
break; |
- idle_worker_threads_stack_.push(worker_thread.get()); |
+ idle_worker_threads_stack_.Push(worker_thread.get()); |
worker_threads_.push_back(std::move(worker_thread)); |
} |
@@ -316,7 +368,13 @@ bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
} |
void SchedulerThreadPool::WakeUpOneThread() { |
- SchedulerWorkerThread* worker_thread = PopOneIdleWorkerThread(); |
+ SchedulerWorkerThread* worker_thread = nullptr; |
+ { |
+ AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
+ if (!idle_worker_threads_stack_.Empty()) |
+ worker_thread = idle_worker_threads_stack_.Pop(); |
+ } |
+ |
if (worker_thread) |
worker_thread->WakeUp(); |
} |
@@ -324,22 +382,17 @@ void SchedulerThreadPool::WakeUpOneThread() { |
void SchedulerThreadPool::AddToIdleWorkerThreadsStack( |
SchedulerWorkerThread* worker_thread) { |
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- idle_worker_threads_stack_.push(worker_thread); |
- DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); |
+ idle_worker_threads_stack_.Push(worker_thread); |
+ DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size()); |
- if (idle_worker_threads_stack_.size() == worker_threads_.size()) |
+ if (idle_worker_threads_stack_.Size() == worker_threads_.size()) |
idle_worker_threads_stack_cv_for_testing_->Broadcast(); |
} |
-SchedulerWorkerThread* SchedulerThreadPool::PopOneIdleWorkerThread() { |
+void SchedulerThreadPool::RemoveFromIdleWorkerThreadsStack( |
+ SchedulerWorkerThread* worker_thread) { |
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- |
- if (idle_worker_threads_stack_.empty()) |
- return nullptr; |
- |
- auto worker_thread = idle_worker_threads_stack_.top(); |
- idle_worker_threads_stack_.pop(); |
- return worker_thread; |
+ idle_worker_threads_stack_.Remove(worker_thread); |
} |
} // namespace internal |