OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "base/task_scheduler/thread_pool.h" |
| 6 |
| 7 #include <utility> |
| 8 |
| 9 #include "base/bind.h" |
| 10 #include "base/logging.h" |
| 11 #include "base/task_scheduler/utils.h" |
| 12 |
| 13 namespace base { |
| 14 namespace internal { |
| 15 |
| 16 namespace { |
| 17 |
| 18 bool WorkerThreadIsInVector( |
| 19 const WorkerThread* worker_thread, |
| 20 const std::vector<scoped_ptr<WorkerThread>>& worker_threads) { |
| 21 for (const auto& current_worker_thread : worker_threads) { |
| 22 if (current_worker_thread.get() == worker_thread) |
| 23 return true; |
| 24 } |
| 25 return false; |
| 26 } |
| 27 |
| 28 // A task runner that runs tasks with the PARALLEL strategy. |
| 29 class SchedulerParallelTaskRunner : public TaskRunner { |
| 30 public: |
| 31 // Tasks posted through this task runner have |traits| and are inserted in |
| 32 // |shared_priority_queue|. |delayed_task_manager| is used to post delayed |
| 33 // tasks. |shutdown_manager| is notified when a task is posted. |
| 34 SchedulerParallelTaskRunner(const TaskTraits& traits, |
| 35 PriorityQueue* priority_queue, |
| 36 DelayedTaskManager* delayed_task_manager, |
| 37 ShutdownManager* shutdown_manager); |
| 38 |
| 39 // TaskRunner: |
| 40 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 41 const Closure& closure, |
| 42 TimeDelta delay) override; |
| 43 bool RunsTasksOnCurrentThread() const override; |
| 44 |
| 45 private: |
| 46 ~SchedulerParallelTaskRunner() override; |
| 47 |
| 48 TaskTraits traits_; |
| 49 PriorityQueue* priority_queue_; |
| 50 DelayedTaskManager* delayed_task_manager_; |
| 51 ShutdownManager* shutdown_manager_; |
| 52 |
| 53 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
| 54 }; |
| 55 |
| 56 SchedulerParallelTaskRunner::SchedulerParallelTaskRunner( |
| 57 const TaskTraits& traits, |
| 58 PriorityQueue* priority_queue, |
| 59 DelayedTaskManager* delayed_task_manager, |
| 60 ShutdownManager* shutdown_manager) |
| 61 : traits_(traits), |
| 62 priority_queue_(priority_queue), |
| 63 delayed_task_manager_(delayed_task_manager), |
| 64 shutdown_manager_(shutdown_manager) {} |
| 65 |
| 66 bool SchedulerParallelTaskRunner::PostDelayedTask( |
| 67 const tracked_objects::Location& from_here, |
| 68 const Closure& closure, |
| 69 TimeDelta delay) { |
| 70 Task task(from_here, closure, traits_, TimeTicks::Now()); |
| 71 if (!delay.is_zero()) |
| 72 task.delayed_run_time = task.post_time + delay; |
| 73 PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_, |
| 74 shutdown_manager_, delayed_task_manager_); |
| 75 return true; |
| 76 } |
| 77 |
| 78 bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const { |
| 79 // TODO(fdoray): Return true only if tasks posted may actually run on the |
| 80 // current thread. It is valid, but not ideal, to always return true. |
| 81 return true; |
| 82 } |
| 83 |
| 84 SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default; |
| 85 |
| 86 // A task runner that runs tasks in with the SEQUENCED strategy. |
| 87 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| 88 public: |
| 89 // Tasks posted through this task runner have |traits| and are inserted in |
| 90 // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|. |
| 91 // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is |
| 92 // notified when a task is posted. |
| 93 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| 94 scoped_refptr<Sequence> sequence, |
| 95 PriorityQueue* priority_queue, |
| 96 DelayedTaskManager* delayed_task_manager, |
| 97 ShutdownManager* shutdown_manager); |
| 98 |
| 99 // SequencedTaskRunner: |
| 100 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 101 const Closure& task, |
| 102 TimeDelta delay) override; |
| 103 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 104 const Closure& closure, |
| 105 TimeDelta delay) override; |
| 106 bool RunsTasksOnCurrentThread() const override; |
| 107 |
| 108 private: |
| 109 ~SchedulerSequencedTaskRunner() override; |
| 110 |
| 111 TaskTraits traits_; |
| 112 scoped_refptr<Sequence> sequence_; |
| 113 PriorityQueue* priority_queue_; |
| 114 DelayedTaskManager* delayed_task_manager_; |
| 115 ShutdownManager* shutdown_manager_; |
| 116 |
| 117 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| 118 }; |
| 119 |
| 120 SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner( |
| 121 const TaskTraits& traits, |
| 122 scoped_refptr<Sequence> sequence, |
| 123 PriorityQueue* priority_queue, |
| 124 DelayedTaskManager* delayed_task_manager, |
| 125 ShutdownManager* shutdown_manager) |
| 126 : traits_(traits), |
| 127 sequence_(sequence), |
| 128 priority_queue_(priority_queue), |
| 129 delayed_task_manager_(delayed_task_manager), |
| 130 shutdown_manager_(shutdown_manager) {} |
| 131 |
| 132 bool SchedulerSequencedTaskRunner::PostDelayedTask( |
| 133 const tracked_objects::Location& from_here, |
| 134 const Closure& closure, |
| 135 TimeDelta delay) { |
| 136 Task task(from_here, closure, traits_, TimeTicks::Now()); |
| 137 if (!delay.is_zero()) |
| 138 task.delayed_run_time = task.post_time + delay; |
| 139 PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_, |
| 140 delayed_task_manager_); |
| 141 return true; |
| 142 } |
| 143 |
| 144 bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const { |
| 145 // TODO(fdoray): Return true only if tasks posted may actually run on the |
| 146 // current thread. It is valid, but not ideal, to always return true. |
| 147 return true; |
| 148 } |
| 149 |
| 150 bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask( |
| 151 const tracked_objects::Location& from_here, |
| 152 const Closure& task, |
| 153 TimeDelta delay) { |
| 154 return PostDelayedTask(from_here, task, delay); |
| 155 } |
| 156 |
| 157 SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default; |
| 158 |
| 159 } // namespace |
| 160 |
| 161 |
| 162 ThreadPool::~ThreadPool() = default; |
| 163 |
| 164 scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool( |
| 165 ThreadPriority thread_priority, |
| 166 size_t num_threads, |
| 167 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, |
| 168 ShutdownManager* shutdown_manager) { |
| 169 scoped_ptr<ThreadPool> thread_pool( |
| 170 new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback, |
| 171 shutdown_manager)); |
| 172 return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool) |
| 173 : scoped_ptr<ThreadPool>(); |
| 174 } |
| 175 |
| 176 size_t ThreadPool::GetNumThreads() const { |
| 177 return worker_threads_.size(); |
| 178 } |
| 179 |
| 180 scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits( |
| 181 const TaskTraits& traits, |
| 182 ExecutionMode execution_mode) { |
| 183 switch (execution_mode) { |
| 184 case ExecutionMode::PARALLEL: { |
| 185 return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner( |
| 186 traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_)); |
| 187 } |
| 188 |
| 189 case ExecutionMode::SEQUENCED: { |
| 190 return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner( |
| 191 traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_, |
| 192 &delayed_task_manager_, shutdown_manager_)); |
| 193 } |
| 194 |
| 195 case ExecutionMode::SINGLE_THREADED: { |
| 196 DCHECK(!worker_threads_.empty()); |
| 197 // TODO(fdoray): Better thread assignment. |
| 198 return scoped_refptr<TaskRunner>( |
| 199 worker_threads_.front() |
| 200 ->CreateTaskRunnerWithTraits(traits, execution_mode) |
| 201 .get()); |
| 202 } |
| 203 |
| 204 #if defined(OS_WIN) |
| 205 case ExecutionMode::SINGLE_THREADED_COM_STA: { |
| 206 // TODO(fdoray): Implement COM. |
| 207 NOTIMPLEMENTED(); |
| 208 return scoped_refptr<TaskRunner>(); |
| 209 } |
| 210 #endif |
| 211 |
| 212 default: { |
| 213 NOTREACHED(); |
| 214 return scoped_refptr<TaskRunner>(); |
| 215 } |
| 216 } |
| 217 } |
| 218 |
| 219 void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence, |
| 220 const SequenceSortKey& sequence_sort_key, |
| 221 const WorkerThread* worker_thread) { |
| 222 DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get()); |
| 223 |
| 224 // Set a flag to avoid waking up a thread when reinserting |sequence| in |
| 225 // |priority_queue_| if the thread doing the reinsertion: |
| 226 // - Can run tasks from |priority_queue_|, and, |
| 227 // - Doesn't have pending single-threaded tasks. |
| 228 // If these conditions are met, the thread doing the reinsertion will soon |
| 229 // pop a sequence from |priority_queue_|. There is no need to wake up a new |
| 230 // thread to do it. |
| 231 if (worker_thread->shared_priority_queue() == &priority_queue_ && |
| 232 !worker_thread->HasSingleThreadedTasks()) { |
| 233 disable_wake_up_thread_on_sequence_insertion_.Set(true); |
| 234 } |
| 235 |
| 236 // Insert the sequence in the priority queue. |
| 237 priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key); |
| 238 |
| 239 disable_wake_up_thread_on_sequence_insertion_.Set(false); |
| 240 } |
| 241 |
| 242 void ThreadPool::ShutdownAndJoinAllThreadsForTesting() { |
| 243 for (const auto& worker_thread : worker_threads_) |
| 244 worker_thread->ShutdownAndJoinForTesting(); |
| 245 } |
| 246 |
| 247 ThreadPool::ThreadPool( |
| 248 ThreadPriority thread_priority, |
| 249 size_t num_threads, |
| 250 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, |
| 251 ShutdownManager* shutdown_manager) |
| 252 : thread_pool_ready_(true, false), |
| 253 priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue, |
| 254 Unretained(this))), |
| 255 shutdown_manager_(shutdown_manager), |
| 256 delayed_task_manager_( |
| 257 Bind(&ThreadPool::WakeUpOneThread, Unretained(this)), |
| 258 shutdown_manager_) { |
| 259 DCHECK_GT(num_threads, 0u); |
| 260 DCHECK(shutdown_manager); |
| 261 |
| 262 // The platform threads reference thread pool data structures and there's |
| 263 // currently no way for us to create them in a suspended state. We'll use the |
| 264 // main entry callback to have the threads wait and then signal them when |
| 265 // we're ready. |
| 266 const WorkerThread::MainEntryCallback main_entry_callback = |
| 267 Bind(&WaitableEvent::Wait, Unretained(&thread_pool_ready_)); |
| 268 |
| 269 const WorkerThread::BecomesIdleCallback becomes_idle_callback = |
| 270 Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this)); |
| 271 worker_threads_.reserve(num_threads); |
| 272 |
| 273 for (size_t i = 0; i < num_threads; ++i) { |
| 274 scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread( |
| 275 thread_priority, &priority_queue_, main_entry_callback, |
| 276 reinsert_sequence_callback, becomes_idle_callback, |
| 277 &delayed_task_manager_, shutdown_manager_); |
| 278 if (worker_thread.get() != nullptr) |
| 279 worker_threads_.push_back(std::move(worker_thread)); |
| 280 } |
| 281 |
| 282 thread_pool_ready_.Signal(); |
| 283 } |
| 284 |
| 285 void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) { |
| 286 DCHECK(WorkerThreadIsInVector(worker_thread, worker_threads_)); |
| 287 |
| 288 AutoSchedulerLock auto_lock_(idle_worker_threads_lock_); |
| 289 |
| 290 if (idle_worker_threads_set_.find(worker_thread) != |
| 291 idle_worker_threads_set_.end()) { |
| 292 // The worker thread is already on the stack of idle threads. |
| 293 return; |
| 294 } |
| 295 |
| 296 // Add the worker thread to the stack of idle threads. |
| 297 idle_worker_threads_stack_.push(worker_thread); |
| 298 idle_worker_threads_set_.insert(worker_thread); |
| 299 } |
| 300 |
| 301 void ThreadPool::WakeUpOneThread() { |
| 302 // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't |
| 303 // have pending or running single-threaded tasks. |
| 304 AutoSchedulerLock auto_lock(idle_worker_threads_lock_); |
| 305 while (!idle_worker_threads_stack_.empty()) { |
| 306 WorkerThread* worker_thread = idle_worker_threads_stack_.top(); |
| 307 |
| 308 idle_worker_threads_stack_.pop(); |
| 309 idle_worker_threads_set_.erase(worker_thread); |
| 310 |
| 311 // HasSingleThreadedTasks() can return stale results. However, when it |
| 312 // returns true below, it is guaranteed that |worker_thread| is either awake |
| 313 // or about to be woken up and that it will not enter |
| 314 // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This |
| 315 // is important because if all threads in |idle_worker_threads_stack_| |
| 316 // report that they have single-threaded tasks, no thread is woken up by |
| 317 // this method. If these threads don't check |priority_queue_| before |
| 318 // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could |
| 319 // end up never being done. The guarantee works because between the moment |
| 320 // HasSingleThreadedTasks() goes from true to false and the moment |
| 321 // |worker_thread| enters WaitUntilWorkIsAvailable(), |
| 322 // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool. |
| 323 // Both WorkerThreadBecomesIdleCallback() and the current method acquire |
| 324 // |idle_worker_threads_lock_|, which synchronizes the value returned by |
| 325 // HasSingleThreadedTasks(). |
| 326 // |
| 327 // TODO(fdoray): A single-threaded task can be posted to |worker_thread| |
| 328 // immediately after HasSingleThreadedTasks() has returned false. Ideally, |
| 329 // when this happens, another worker thread should be woken up. |
| 330 if (!worker_thread->HasSingleThreadedTasks()) { |
| 331 worker_thread->WakeUp(); |
| 332 break; |
| 333 } |
| 334 } |
| 335 } |
| 336 |
| 337 void ThreadPool::OnSequenceInsertedInPriorityQueue() { |
| 338 if (disable_wake_up_thread_on_sequence_insertion_.Get()) |
| 339 return; |
| 340 |
| 341 WakeUpOneThread(); |
| 342 } |
| 343 |
| 344 } // namespace internal |
| 345 } // namespace base |
OLD | NEW |