Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_thread_pool.h" | 5 #include "base/task_scheduler/scheduler_thread_pool.h" |
| 6 | 6 |
| 7 #include <utility> | 7 #include <utility> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/bind_helpers.h" | 10 #include "base/bind_helpers.h" |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 23 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads | 23 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads |
| 24 // that don't belong to a SchedulerThreadPool. | 24 // that don't belong to a SchedulerThreadPool. |
| 25 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky | 25 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky |
| 26 tls_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER; | 26 tls_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER; |
| 27 | 27 |
| 28 // A task runner that runs tasks with the PARALLEL ExecutionMode. | 28 // A task runner that runs tasks with the PARALLEL ExecutionMode. |
| 29 class SchedulerParallelTaskRunner : public TaskRunner { | 29 class SchedulerParallelTaskRunner : public TaskRunner { |
| 30 public: | 30 public: |
| 31 SchedulerParallelTaskRunner(const TaskTraits& traits, | 31 SchedulerParallelTaskRunner(const TaskTraits& traits, |
| 32 PriorityQueue* priority_queue, | 32 PriorityQueue* priority_queue, |
| 33 TaskTracker* task_tracker) | 33 TaskTracker* task_tracker, |
| 34 DelayedTaskManager* delayed_task_manager) | |
| 34 : traits_(traits), | 35 : traits_(traits), |
| 35 priority_queue_(priority_queue), | 36 priority_queue_(priority_queue), |
| 36 task_tracker_(task_tracker) {} | 37 task_tracker_(task_tracker), |
| 38 delayed_task_manager_(delayed_task_manager) {} | |
| 37 | 39 |
| 38 // TaskRunner: | 40 // TaskRunner: |
| 39 bool PostDelayedTask(const tracked_objects::Location& from_here, | 41 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 40 const Closure& closure, | 42 const Closure& closure, |
| 41 TimeDelta delay) override { | 43 TimeDelta delay) override { |
| 42 // TODO(fdoray): Support delayed tasks. | 44 // TODO(fdoray): Support delayed tasks. |
|
gab
2016/04/11 18:28:30
Remove TODO
fdoray
2016/04/11 19:57:06
Delayed tasks are not supported until we have the
gab
2016/04/12 13:35:47
Ah right, but from this component's POV it *is* po
fdoray
2016/04/12 14:43:56
Done. Removed the DCHECK here and put it above Pos
| |
| 43 DCHECK(delay.is_zero()); | 45 DCHECK(delay.is_zero()); |
| 44 | 46 |
| 45 // Post the task as part of a one-off single-task Sequence. | 47 // Post the task as part of a one-off single-task Sequence. |
| 46 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), | 48 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), |
| 47 make_scoped_refptr(new Sequence), priority_queue_, | 49 make_scoped_refptr(new Sequence), priority_queue_, |
| 48 task_tracker_); | 50 task_tracker_, delayed_task_manager_); |
| 49 } | 51 } |
| 50 | 52 |
| 51 bool RunsTasksOnCurrentThread() const override { | 53 bool RunsTasksOnCurrentThread() const override { |
| 52 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; | 54 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; |
| 53 } | 55 } |
| 54 | 56 |
| 55 private: | 57 private: |
| 56 ~SchedulerParallelTaskRunner() override = default; | 58 ~SchedulerParallelTaskRunner() override = default; |
| 57 | 59 |
| 58 const TaskTraits traits_; | 60 const TaskTraits traits_; |
| 59 PriorityQueue* const priority_queue_; | 61 PriorityQueue* const priority_queue_; |
| 60 TaskTracker* const task_tracker_; | 62 TaskTracker* const task_tracker_; |
| 63 DelayedTaskManager* const delayed_task_manager_; | |
| 61 | 64 |
| 62 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 65 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
| 63 }; | 66 }; |
| 64 | 67 |
| 65 // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 68 // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
| 66 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 69 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| 67 public: | 70 public: |
| 68 SchedulerSequencedTaskRunner(const TaskTraits& traits, | 71 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| 69 PriorityQueue* priority_queue, | 72 PriorityQueue* priority_queue, |
| 70 TaskTracker* task_tracker) | 73 TaskTracker* task_tracker, |
| 74 DelayedTaskManager* delayed_task_manager) | |
| 71 : traits_(traits), | 75 : traits_(traits), |
| 72 priority_queue_(priority_queue), | 76 priority_queue_(priority_queue), |
| 73 task_tracker_(task_tracker) {} | 77 task_tracker_(task_tracker), |
| 78 delayed_task_manager_(delayed_task_manager) {} | |
| 74 | 79 |
| 75 // SequencedTaskRunner: | 80 // SequencedTaskRunner: |
| 76 bool PostDelayedTask(const tracked_objects::Location& from_here, | 81 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 77 const Closure& closure, | 82 const Closure& closure, |
| 78 TimeDelta delay) override { | 83 TimeDelta delay) override { |
| 79 // TODO(fdoray): Support delayed tasks. | 84 // TODO(fdoray): Support delayed tasks. |
| 80 DCHECK(delay.is_zero()); | 85 DCHECK(delay.is_zero()); |
| 81 | 86 |
| 82 // Post the task as part of |sequence_|. | 87 // Post the task as part of |sequence_|. |
| 83 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), | 88 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), |
| 84 sequence_, priority_queue_, task_tracker_); | 89 sequence_, priority_queue_, task_tracker_, |
| 90 delayed_task_manager_); | |
| 85 } | 91 } |
| 86 | 92 |
| 87 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 93 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 88 const Closure& closure, | 94 const Closure& closure, |
| 89 base::TimeDelta delay) override { | 95 base::TimeDelta delay) override { |
| 90 // Tasks are never nested within the task scheduler. | 96 // Tasks are never nested within the task scheduler. |
| 91 return PostDelayedTask(from_here, closure, delay); | 97 return PostDelayedTask(from_here, closure, delay); |
| 92 } | 98 } |
| 93 | 99 |
| 94 bool RunsTasksOnCurrentThread() const override { | 100 bool RunsTasksOnCurrentThread() const override { |
| 95 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; | 101 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; |
| 96 } | 102 } |
| 97 | 103 |
| 98 private: | 104 private: |
| 99 ~SchedulerSequencedTaskRunner() override = default; | 105 ~SchedulerSequencedTaskRunner() override = default; |
| 100 | 106 |
| 101 // Sequence in which all Tasks posted through this TaskRunner are inserted. | 107 // Sequence in which all Tasks posted through this TaskRunner are inserted. |
| 102 const scoped_refptr<Sequence> sequence_ = new Sequence; | 108 const scoped_refptr<Sequence> sequence_ = new Sequence; |
| 103 | 109 |
| 104 const TaskTraits traits_; | 110 const TaskTraits traits_; |
| 105 PriorityQueue* const priority_queue_; | 111 PriorityQueue* const priority_queue_; |
| 106 TaskTracker* const task_tracker_; | 112 TaskTracker* const task_tracker_; |
| 113 DelayedTaskManager* const delayed_task_manager_; | |
| 107 | 114 |
| 108 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 115 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| 109 }; | 116 }; |
| 110 | 117 |
| 111 } // namespace | 118 } // namespace |
| 112 | 119 |
| 113 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl | 120 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl |
| 114 : public SchedulerWorkerThread::Delegate { | 121 : public SchedulerWorkerThread::Delegate { |
| 115 public: | 122 public: |
| 116 SchedulerWorkerThreadDelegateImpl( | 123 SchedulerWorkerThreadDelegateImpl( |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 128 SchedulerThreadPool* outer_; | 135 SchedulerThreadPool* outer_; |
| 129 const EnqueueSequenceCallback enqueue_sequence_callback_; | 136 const EnqueueSequenceCallback enqueue_sequence_callback_; |
| 130 | 137 |
| 131 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); | 138 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
| 132 }; | 139 }; |
| 133 | 140 |
| 134 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( | 141 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( |
| 135 ThreadPriority thread_priority, | 142 ThreadPriority thread_priority, |
| 136 size_t max_threads, | 143 size_t max_threads, |
| 137 const EnqueueSequenceCallback& enqueue_sequence_callback, | 144 const EnqueueSequenceCallback& enqueue_sequence_callback, |
| 138 TaskTracker* task_tracker) { | 145 TaskTracker* task_tracker, |
| 139 std::unique_ptr<SchedulerThreadPool> thread_pool( | 146 DelayedTaskManager* delayed_task_manager) { |
| 140 new SchedulerThreadPool(enqueue_sequence_callback, task_tracker)); | 147 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool( |
| 148 enqueue_sequence_callback, task_tracker, delayed_task_manager)); | |
| 141 if (thread_pool->Initialize(thread_priority, max_threads)) | 149 if (thread_pool->Initialize(thread_priority, max_threads)) |
| 142 return thread_pool; | 150 return thread_pool; |
| 143 return nullptr; | 151 return nullptr; |
| 144 } | 152 } |
| 145 | 153 |
| 146 SchedulerThreadPool::~SchedulerThreadPool() { | 154 SchedulerThreadPool::~SchedulerThreadPool() { |
| 147 #if DCHECK_IS_ON() | 155 #if DCHECK_IS_ON() |
| 148 // SchedulerThreadPool should never be deleted in production unless its | 156 // SchedulerThreadPool should never be deleted in production unless its |
| 149 // initialization failed. | 157 // initialization failed. |
| 150 AutoSchedulerLock auto_lock(worker_threads_lock_); | 158 AutoSchedulerLock auto_lock(worker_threads_lock_); |
| 151 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); | 159 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
| 152 #endif // DCHECK_IS_ON() | 160 #endif // DCHECK_IS_ON() |
| 153 } | 161 } |
| 154 | 162 |
| 155 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( | 163 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
| 156 const TaskTraits& traits, | 164 const TaskTraits& traits, |
| 157 ExecutionMode execution_mode) { | 165 ExecutionMode execution_mode) { |
| 158 switch (execution_mode) { | 166 switch (execution_mode) { |
| 159 case ExecutionMode::PARALLEL: | 167 case ExecutionMode::PARALLEL: |
| 160 return make_scoped_refptr(new SchedulerParallelTaskRunner( | 168 return make_scoped_refptr(new SchedulerParallelTaskRunner( |
| 161 traits, &shared_priority_queue_, task_tracker_)); | 169 traits, &shared_priority_queue_, task_tracker_, |
| 170 delayed_task_manager_)); | |
| 162 | 171 |
| 163 case ExecutionMode::SEQUENCED: | 172 case ExecutionMode::SEQUENCED: |
| 164 return make_scoped_refptr(new SchedulerSequencedTaskRunner( | 173 return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
| 165 traits, &shared_priority_queue_, task_tracker_)); | 174 traits, &shared_priority_queue_, task_tracker_, |
| 175 delayed_task_manager_)); | |
| 166 | 176 |
| 167 case ExecutionMode::SINGLE_THREADED: | 177 case ExecutionMode::SINGLE_THREADED: |
| 168 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. | 178 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
| 169 NOTREACHED(); | 179 NOTREACHED(); |
| 170 return nullptr; | 180 return nullptr; |
| 171 } | 181 } |
| 172 | 182 |
| 173 NOTREACHED(); | 183 NOTREACHED(); |
| 174 return nullptr; | 184 return nullptr; |
| 175 } | 185 } |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 251 return sequence_and_sort_key.sequence; | 261 return sequence_and_sort_key.sequence; |
| 252 } | 262 } |
| 253 | 263 |
| 254 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( | 264 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( |
| 255 scoped_refptr<Sequence> sequence) { | 265 scoped_refptr<Sequence> sequence) { |
| 256 enqueue_sequence_callback_.Run(std::move(sequence)); | 266 enqueue_sequence_callback_.Run(std::move(sequence)); |
| 257 } | 267 } |
| 258 | 268 |
| 259 SchedulerThreadPool::SchedulerThreadPool( | 269 SchedulerThreadPool::SchedulerThreadPool( |
| 260 const EnqueueSequenceCallback& enqueue_sequence_callback, | 270 const EnqueueSequenceCallback& enqueue_sequence_callback, |
| 261 TaskTracker* task_tracker) | 271 TaskTracker* task_tracker, |
| 272 DelayedTaskManager* delayed_task_manager) | |
| 262 : shared_priority_queue_( | 273 : shared_priority_queue_( |
| 263 Bind(&SchedulerThreadPool::WakeUpOneThread, Unretained(this))), | 274 Bind(&SchedulerThreadPool::WakeUpOneThread, Unretained(this))), |
| 264 worker_threads_lock_(shared_priority_queue_.container_lock()), | 275 worker_threads_lock_(shared_priority_queue_.container_lock()), |
| 265 idle_worker_threads_stack_cv_for_testing_( | 276 idle_worker_threads_stack_cv_for_testing_( |
| 266 worker_threads_lock_.CreateConditionVariable()), | 277 worker_threads_lock_.CreateConditionVariable()), |
| 267 join_for_testing_returned_(true, false), | 278 join_for_testing_returned_(true, false), |
| 268 worker_thread_delegate_( | 279 worker_thread_delegate_( |
| 269 new SchedulerWorkerThreadDelegateImpl(this, | 280 new SchedulerWorkerThreadDelegateImpl(this, |
| 270 enqueue_sequence_callback)), | 281 enqueue_sequence_callback)), |
| 271 task_tracker_(task_tracker) { | 282 task_tracker_(task_tracker), |
| 283 delayed_task_manager_(delayed_task_manager) { | |
| 272 DCHECK(task_tracker_); | 284 DCHECK(task_tracker_); |
| 285 DCHECK(delayed_task_manager_); | |
| 273 } | 286 } |
| 274 | 287 |
| 275 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, | 288 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
| 276 size_t max_threads) { | 289 size_t max_threads) { |
| 277 AutoSchedulerLock auto_lock(worker_threads_lock_); | 290 AutoSchedulerLock auto_lock(worker_threads_lock_); |
| 278 | 291 |
| 279 DCHECK(worker_threads_.empty()); | 292 DCHECK(worker_threads_.empty()); |
| 280 | 293 |
| 281 for (size_t i = 0; i < max_threads; ++i) { | 294 for (size_t i = 0; i < max_threads; ++i) { |
| 282 std::unique_ptr<SchedulerWorkerThread> worker_thread = | 295 std::unique_ptr<SchedulerWorkerThread> worker_thread = |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 310 AutoSchedulerLock auto_lock(worker_threads_lock_); | 323 AutoSchedulerLock auto_lock(worker_threads_lock_); |
| 311 idle_worker_threads_stack_.push(worker_thread); | 324 idle_worker_threads_stack_.push(worker_thread); |
| 312 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); | 325 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); |
| 313 | 326 |
| 314 if (idle_worker_threads_stack_.size() == worker_threads_.size()) | 327 if (idle_worker_threads_stack_.size() == worker_threads_.size()) |
| 315 idle_worker_threads_stack_cv_for_testing_->Broadcast(); | 328 idle_worker_threads_stack_cv_for_testing_->Broadcast(); |
| 316 } | 329 } |
| 317 | 330 |
| 318 } // namespace internal | 331 } // namespace internal |
| 319 } // namespace base | 332 } // namespace base |
| OLD | NEW |