Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "base/task_scheduler/scheduler_thread_pool.h" | |
| 6 | |
| 7 #include <utility> | |
| 8 | |
| 9 #include "base/bind.h" | |
| 10 #include "base/bind_helpers.h" | |
| 11 #include "base/lazy_instance.h" | |
| 12 #include "base/logging.h" | |
| 13 #include "base/memory/ptr_util.h" | |
| 14 #include "base/task_scheduler/task_tracker.h" | |
| 15 | |
| 16 namespace base { | |
| 17 namespace internal { | |
| 18 | |
| 19 namespace { | |
| 20 | |
| 21 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads | |
| 22 // that don't belong to a SchedulerThreadPool. | |
| 23 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky | |
| 24 g_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER; | |
| 25 | |
| 26 // A task runner that runs tasks with the PARALLEL ExecutionMode. | |
| 27 class SchedulerParallelTaskRunner : public TaskRunner { | |
| 28 public: | |
| 29 SchedulerParallelTaskRunner(const TaskTraits& traits, | |
| 30 PriorityQueue* priority_queue, | |
| 31 TaskTracker* task_tracker) | |
| 32 : traits_(traits), | |
| 33 priority_queue_(priority_queue), | |
| 34 task_tracker_(task_tracker) {} | |
| 35 | |
| 36 // TaskRunner: | |
| 37 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
| 38 const Closure& closure, | |
| 39 TimeDelta delay) override { | |
| 40 // TODO(fdoray): Support delayed tasks. | |
| 41 DCHECK(delay.is_zero()); | |
| 42 PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), | |
| 43 make_scoped_refptr(new Sequence), priority_queue_, | |
| 44 task_tracker_); | |
| 45 return true; | |
| 46 } | |
| 47 | |
| 48 bool RunsTasksOnCurrentThread() const override { | |
| 49 return g_current_shared_priority_queue.Get().Get() == priority_queue_; | |
| 50 } | |
| 51 | |
| 52 private: | |
| 53 ~SchedulerParallelTaskRunner() override = default; | |
| 54 | |
| 55 const TaskTraits traits_; | |
| 56 PriorityQueue* const priority_queue_; | |
| 57 TaskTracker* const task_tracker_; | |
| 58 | |
| 59 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | |
| 60 }; | |
| 61 | |
| 62 void PostTaskCallback(scoped_refptr<Sequence> sequence, | |
| 63 PriorityQueue* priority_queue, | |
| 64 std::unique_ptr<Task> task) { | |
| 65 DCHECK(sequence); | |
| 66 DCHECK(priority_queue); | |
| 67 DCHECK(task); | |
| 68 | |
| 69 if (sequence->PushTask(std::move(task))) { | |
| 70 // |sequence| must be inserted into |priority_queue| because it was empty | |
| 71 // before |task| was inserted into it. | |
| 72 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); | |
| 73 priority_queue->BeginTransaction()->Push( | |
| 74 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), | |
| 75 sequence_sort_key))); | |
| 76 } | |
| 77 } | |
| 78 | |
| 79 } // namespace | |
| 80 | |
| 81 SchedulerThreadPool::~SchedulerThreadPool() { | |
| 82 AutoSchedulerLock auto_lock(join_for_testing_returned_lock_); | |
| 83 DCHECK(join_for_testing_returned_); | |
| 84 } | |
| 85 | |
| 86 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( | |
| 87 ThreadPriority thread_priority, | |
| 88 size_t max_threads, | |
| 89 const SchedulerWorkerThread::RanTaskFromSequenceCallback& | |
| 90 ran_task_from_sequence_callback, | |
| 91 TaskTracker* task_tracker) { | |
| 92 std::unique_ptr<SchedulerThreadPool> thread_pool( | |
| 93 new SchedulerThreadPool(thread_priority, max_threads, | |
| 94 ran_task_from_sequence_callback, task_tracker)); | |
| 95 | |
| 96 if (thread_pool->worker_threads_.empty()) | |
| 97 return nullptr; | |
| 98 return thread_pool; | |
| 99 } | |
| 100 | |
| 101 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( | |
| 102 const TaskTraits& traits, | |
| 103 ExecutionMode execution_mode) { | |
| 104 switch (execution_mode) { | |
| 105 case ExecutionMode::PARALLEL: | |
| 106 return make_scoped_refptr(new SchedulerParallelTaskRunner( | |
| 107 traits, &shared_priority_queue_, task_tracker_)); | |
| 108 | |
| 109 case ExecutionMode::SEQUENCED: | |
| 110 case ExecutionMode::SINGLE_THREADED: | |
| 111 NOTIMPLEMENTED(); | |
| 112 return nullptr; | |
| 113 } | |
| 114 | |
| 115 NOTREACHED(); | |
| 116 return nullptr; | |
| 117 } | |
| 118 | |
| 119 void SchedulerThreadPool::ReinsertSequence( | |
| 120 scoped_refptr<Sequence> sequence, | |
| 121 const SequenceSortKey& sequence_sort_key) { | |
| 122 DCHECK(!g_current_shared_priority_queue.Get().Get()); | |
| 123 | |
| 124 // If |worker_thread| belongs to this thread pool, set a flag to avoid waking | |
|
robliao
2016/04/01 21:05:51
Update this comment.
fdoray
2016/04/01 21:45:30
Done.
| |
| 125 // up a SchedulerWorkerThread when |sequence| is reinserted in | |
| 126 // |shared_priority_queue_|. In such cases, |worker_thread| will soon get a | |
| 127 // Sequence from |shared_priority_queue_| via GetWorkCallback() and there is | |
| 128 // no need to wake up another SchedulerWorkerThread to do so. | |
| 129 if (g_current_shared_priority_queue.Get().Get() == &shared_priority_queue_) | |
| 130 no_wake_up_on_sequence_insertion_.Set(true); | |
| 131 | |
| 132 shared_priority_queue_.BeginTransaction()->Push( | |
| 133 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), | |
| 134 sequence_sort_key))); | |
| 135 no_wake_up_on_sequence_insertion_.Set(false); | |
| 136 } | |
| 137 | |
| 138 void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() { | |
| 139 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | |
| 140 while (idle_worker_threads_stack_.size() < worker_threads_.size()) | |
| 141 idle_worker_threads_stack_cv_->Wait(); | |
| 142 } | |
| 143 | |
| 144 void SchedulerThreadPool::JoinForTesting() { | |
| 145 for (const auto& worker_thread : worker_threads_) | |
| 146 worker_thread->JoinForTesting(); | |
| 147 | |
| 148 AutoSchedulerLock auto_lock(join_for_testing_returned_lock_); | |
| 149 DCHECK(!join_for_testing_returned_); | |
| 150 join_for_testing_returned_ = true; | |
| 151 } | |
| 152 | |
| 153 SchedulerThreadPool::SchedulerThreadPool( | |
| 154 ThreadPriority thread_priority, | |
| 155 size_t max_threads, | |
| 156 const SchedulerWorkerThread::RanTaskFromSequenceCallback& | |
| 157 ran_task_from_sequence_callback, | |
| 158 TaskTracker* task_tracker) | |
| 159 : shared_priority_queue_(Bind( | |
| 160 &SchedulerThreadPool::SequenceInsertedInSharedPriorityQueueCallback, | |
| 161 Unretained(this))), | |
| 162 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), | |
| 163 idle_worker_threads_stack_cv_( | |
| 164 idle_worker_threads_stack_lock_.CreateConditionVariable()), | |
| 165 task_tracker_(task_tracker) { | |
| 166 DCHECK_GT(max_threads, 0U); | |
| 167 DCHECK(!ran_task_from_sequence_callback.is_null()); | |
| 168 DCHECK(task_tracker_); | |
| 169 | |
| 170 // |this| always outlives the worker threads to which these callbacks are | |
| 171 // passed. | |
| 172 const Closure main_entry_callback( | |
| 173 Bind(&SchedulerThreadPool::MainEntryCallback, Unretained(this))); | |
| 174 const SchedulerWorkerThread::GetWorkCallback get_work_callback( | |
| 175 Bind(&SchedulerThreadPool::GetWorkCallback, Unretained(this))); | |
| 176 | |
| 177 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | |
| 178 | |
| 179 for (size_t i = 0; i < max_threads; ++i) { | |
| 180 std::unique_ptr<SchedulerWorkerThread> worker_thread = | |
| 181 SchedulerWorkerThread::CreateSchedulerWorkerThread( | |
| 182 thread_priority, main_entry_callback, get_work_callback, | |
| 183 ran_task_from_sequence_callback, task_tracker); | |
| 184 if (!worker_thread) | |
| 185 break; | |
| 186 idle_worker_threads_stack_.push(worker_thread.get()); | |
| 187 worker_threads_.push_back(std::move(worker_thread)); | |
| 188 } | |
| 189 } | |
| 190 | |
| 191 void SchedulerThreadPool::WakeUpOneThread() { | |
| 192 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | |
| 193 | |
| 194 if (idle_worker_threads_stack_.empty()) | |
| 195 return; | |
| 196 | |
| 197 SchedulerWorkerThread* worker_thread = idle_worker_threads_stack_.top(); | |
| 198 idle_worker_threads_stack_.pop(); | |
| 199 worker_thread->WakeUp(); | |
| 200 } | |
| 201 | |
| 202 void SchedulerThreadPool::AddToIdleSchedulerWorkerThreadsStack( | |
| 203 SchedulerWorkerThread* worker_thread) { | |
| 204 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | |
| 205 idle_worker_threads_stack_.push(worker_thread); | |
| 206 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); | |
| 207 | |
| 208 if (idle_worker_threads_stack_.size() == worker_threads_.size()) | |
| 209 idle_worker_threads_stack_cv_->Signal(); | |
| 210 } | |
| 211 | |
| 212 void SchedulerThreadPool::SequenceInsertedInSharedPriorityQueueCallback() { | |
| 213 if (!no_wake_up_on_sequence_insertion_.Get()) | |
| 214 WakeUpOneThread(); | |
| 215 } | |
| 216 | |
| 217 void SchedulerThreadPool::MainEntryCallback() const { | |
| 218 DCHECK(!g_current_shared_priority_queue.Get().Get()); | |
| 219 g_current_shared_priority_queue.Get().Set(&shared_priority_queue_); | |
| 220 } | |
| 221 | |
| 222 scoped_refptr<Sequence> SchedulerThreadPool::GetWorkCallback( | |
| 223 SchedulerWorkerThread* worker_thread) { | |
| 224 std::unique_ptr<PriorityQueue::Transaction> transaction( | |
| 225 shared_priority_queue_.BeginTransaction()); | |
| 226 const PriorityQueue::SequenceAndSortKey sequence_and_sort_key( | |
| 227 transaction->Peek()); | |
| 228 | |
| 229 if (sequence_and_sort_key.is_null()) { | |
| 230 // |transaction| is kept alive while |worker_thread| is added to | |
| 231 // |idle_worker_threads_stack_| to avoid this race: | |
| 232 // 1. This thread creates a Transaction, finds |shared_priority_queue_| | |
| 233 // empty and ends the Transaction. | |
| 234 // 2. Other thread creates a Transaction, inserts a Sequence into | |
| 235 // |shared_priority_queue_| and ends the Transaction. This can't happen | |
| 236 // if the Transaction of step 1 is still active. | |
| 237 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because | |
| 238 // |idle_worker_threads_stack_| is empty. | |
| 239 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to | |
| 240 // sleep. No thread runs the Sequence inserted in step 2. | |
| 241 AddToIdleSchedulerWorkerThreadsStack(worker_thread); | |
| 242 return nullptr; | |
| 243 } | |
| 244 | |
| 245 transaction->Pop(); | |
| 246 return sequence_and_sort_key.sequence; | |
| 247 } | |
| 248 | |
| 249 void PostTaskHelper(std::unique_ptr<Task> task, | |
| 250 scoped_refptr<Sequence> sequence, | |
| 251 PriorityQueue* priority_queue, | |
| 252 TaskTracker* task_tracker) { | |
| 253 DCHECK(task); | |
| 254 DCHECK(sequence); | |
| 255 DCHECK(priority_queue); | |
| 256 DCHECK(task_tracker); | |
| 257 | |
| 258 task_tracker->PostTask( | |
| 259 Bind(&PostTaskCallback, std::move(sequence), priority_queue), | |
| 260 std::move(task)); | |
| 261 } | |
| 262 | |
| 263 } // namespace internal | |
| 264 } // namespace base | |
| OLD | NEW |