OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_thread_pool.h" | 5 #include "base/task_scheduler/scheduler_thread_pool.h" |
6 | 6 |
7 #include <utility> | 7 #include <utility> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/bind_helpers.h" | 10 #include "base/bind_helpers.h" |
(...skipping 12 matching lines...) Expand all Loading... | |
23 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads | 23 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads |
24 // that don't belong to a SchedulerThreadPool. | 24 // that don't belong to a SchedulerThreadPool. |
25 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky | 25 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky |
26 tls_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER; | 26 tls_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER; |
27 | 27 |
28 // A task runner that runs tasks with the PARALLEL ExecutionMode. | 28 // A task runner that runs tasks with the PARALLEL ExecutionMode. |
29 class SchedulerParallelTaskRunner : public TaskRunner { | 29 class SchedulerParallelTaskRunner : public TaskRunner { |
30 public: | 30 public: |
31 SchedulerParallelTaskRunner(const TaskTraits& traits, | 31 SchedulerParallelTaskRunner(const TaskTraits& traits, |
32 PriorityQueue* priority_queue, | 32 PriorityQueue* priority_queue, |
33 TaskTracker* task_tracker) | 33 TaskTracker* task_tracker, |
34 DelayedTaskManager* delayed_task_manager) | |
34 : traits_(traits), | 35 : traits_(traits), |
35 priority_queue_(priority_queue), | 36 priority_queue_(priority_queue), |
36 task_tracker_(task_tracker) {} | 37 task_tracker_(task_tracker), |
38 delayed_task_manager_(delayed_task_manager) {} | |
37 | 39 |
38 // TaskRunner: | 40 // TaskRunner: |
39 bool PostDelayedTask(const tracked_objects::Location& from_here, | 41 bool PostDelayedTask(const tracked_objects::Location& from_here, |
40 const Closure& closure, | 42 const Closure& closure, |
41 TimeDelta delay) override { | 43 TimeDelta delay) override { |
42 // TODO(fdoray): Support delayed tasks. | 44 // TODO(fdoray): Support delayed tasks. |
gab
2016/04/11 18:28:30
Remove TODO
fdoray
2016/04/11 19:57:06
Delayed tasks are not supported until we have the
gab
2016/04/12 13:35:47
Ah right, but from this component's POV it *is* po
fdoray
2016/04/12 14:43:56
Done. Removed the DCHECK here and put it above Pos
| |
43 DCHECK(delay.is_zero()); | 45 DCHECK(delay.is_zero()); |
44 | 46 |
45 // Post the task as part of a one-off single-task Sequence. | 47 // Post the task as part of a one-off single-task Sequence. |
46 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), | 48 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), |
47 make_scoped_refptr(new Sequence), priority_queue_, | 49 make_scoped_refptr(new Sequence), priority_queue_, |
48 task_tracker_); | 50 task_tracker_, delayed_task_manager_); |
49 } | 51 } |
50 | 52 |
51 bool RunsTasksOnCurrentThread() const override { | 53 bool RunsTasksOnCurrentThread() const override { |
52 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; | 54 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; |
53 } | 55 } |
54 | 56 |
55 private: | 57 private: |
56 ~SchedulerParallelTaskRunner() override = default; | 58 ~SchedulerParallelTaskRunner() override = default; |
57 | 59 |
58 const TaskTraits traits_; | 60 const TaskTraits traits_; |
59 PriorityQueue* const priority_queue_; | 61 PriorityQueue* const priority_queue_; |
60 TaskTracker* const task_tracker_; | 62 TaskTracker* const task_tracker_; |
63 DelayedTaskManager* const delayed_task_manager_; | |
61 | 64 |
62 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 65 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
63 }; | 66 }; |
64 | 67 |
65 // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 68 // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
66 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 69 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
67 public: | 70 public: |
68 SchedulerSequencedTaskRunner(const TaskTraits& traits, | 71 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
69 PriorityQueue* priority_queue, | 72 PriorityQueue* priority_queue, |
70 TaskTracker* task_tracker) | 73 TaskTracker* task_tracker, |
74 DelayedTaskManager* delayed_task_manager) | |
71 : traits_(traits), | 75 : traits_(traits), |
72 priority_queue_(priority_queue), | 76 priority_queue_(priority_queue), |
73 task_tracker_(task_tracker) {} | 77 task_tracker_(task_tracker), |
78 delayed_task_manager_(delayed_task_manager) {} | |
74 | 79 |
75 // SequencedTaskRunner: | 80 // SequencedTaskRunner: |
76 bool PostDelayedTask(const tracked_objects::Location& from_here, | 81 bool PostDelayedTask(const tracked_objects::Location& from_here, |
77 const Closure& closure, | 82 const Closure& closure, |
78 TimeDelta delay) override { | 83 TimeDelta delay) override { |
79 // TODO(fdoray): Support delayed tasks. | 84 // TODO(fdoray): Support delayed tasks. |
80 DCHECK(delay.is_zero()); | 85 DCHECK(delay.is_zero()); |
81 | 86 |
82 // Post the task as part of |sequence_|. | 87 // Post the task as part of |sequence_|. |
83 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), | 88 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), |
84 sequence_, priority_queue_, task_tracker_); | 89 sequence_, priority_queue_, task_tracker_, |
90 delayed_task_manager_); | |
85 } | 91 } |
86 | 92 |
87 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 93 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
88 const Closure& closure, | 94 const Closure& closure, |
89 base::TimeDelta delay) override { | 95 base::TimeDelta delay) override { |
90 // Tasks are never nested within the task scheduler. | 96 // Tasks are never nested within the task scheduler. |
91 return PostDelayedTask(from_here, closure, delay); | 97 return PostDelayedTask(from_here, closure, delay); |
92 } | 98 } |
93 | 99 |
94 bool RunsTasksOnCurrentThread() const override { | 100 bool RunsTasksOnCurrentThread() const override { |
95 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; | 101 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; |
96 } | 102 } |
97 | 103 |
98 private: | 104 private: |
99 ~SchedulerSequencedTaskRunner() override = default; | 105 ~SchedulerSequencedTaskRunner() override = default; |
100 | 106 |
101 // Sequence in which all Tasks posted through this TaskRunner are inserted. | 107 // Sequence in which all Tasks posted through this TaskRunner are inserted. |
102 const scoped_refptr<Sequence> sequence_ = new Sequence; | 108 const scoped_refptr<Sequence> sequence_ = new Sequence; |
103 | 109 |
104 const TaskTraits traits_; | 110 const TaskTraits traits_; |
105 PriorityQueue* const priority_queue_; | 111 PriorityQueue* const priority_queue_; |
106 TaskTracker* const task_tracker_; | 112 TaskTracker* const task_tracker_; |
113 DelayedTaskManager* const delayed_task_manager_; | |
107 | 114 |
108 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 115 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
109 }; | 116 }; |
110 | 117 |
111 } // namespace | 118 } // namespace |
112 | 119 |
113 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl | 120 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl |
114 : public SchedulerWorkerThread::Delegate { | 121 : public SchedulerWorkerThread::Delegate { |
115 public: | 122 public: |
116 SchedulerWorkerThreadDelegateImpl( | 123 SchedulerWorkerThreadDelegateImpl( |
(...skipping 11 matching lines...) Expand all Loading... | |
128 SchedulerThreadPool* outer_; | 135 SchedulerThreadPool* outer_; |
129 const EnqueueSequenceCallback enqueue_sequence_callback_; | 136 const EnqueueSequenceCallback enqueue_sequence_callback_; |
130 | 137 |
131 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); | 138 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
132 }; | 139 }; |
133 | 140 |
134 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( | 141 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( |
135 ThreadPriority thread_priority, | 142 ThreadPriority thread_priority, |
136 size_t max_threads, | 143 size_t max_threads, |
137 const EnqueueSequenceCallback& enqueue_sequence_callback, | 144 const EnqueueSequenceCallback& enqueue_sequence_callback, |
138 TaskTracker* task_tracker) { | 145 TaskTracker* task_tracker, |
139 std::unique_ptr<SchedulerThreadPool> thread_pool( | 146 DelayedTaskManager* delayed_task_manager) { |
140 new SchedulerThreadPool(enqueue_sequence_callback, task_tracker)); | 147 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool( |
148 enqueue_sequence_callback, task_tracker, delayed_task_manager)); | |
141 if (thread_pool->Initialize(thread_priority, max_threads)) | 149 if (thread_pool->Initialize(thread_priority, max_threads)) |
142 return thread_pool; | 150 return thread_pool; |
143 return nullptr; | 151 return nullptr; |
144 } | 152 } |
145 | 153 |
146 SchedulerThreadPool::~SchedulerThreadPool() { | 154 SchedulerThreadPool::~SchedulerThreadPool() { |
147 #if DCHECK_IS_ON() | 155 #if DCHECK_IS_ON() |
148 // SchedulerThreadPool should never be deleted in production unless its | 156 // SchedulerThreadPool should never be deleted in production unless its |
149 // initialization failed. | 157 // initialization failed. |
150 AutoSchedulerLock auto_lock(worker_threads_lock_); | 158 AutoSchedulerLock auto_lock(worker_threads_lock_); |
151 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); | 159 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
152 #endif // DCHECK_IS_ON() | 160 #endif // DCHECK_IS_ON() |
153 } | 161 } |
154 | 162 |
155 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( | 163 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
156 const TaskTraits& traits, | 164 const TaskTraits& traits, |
157 ExecutionMode execution_mode) { | 165 ExecutionMode execution_mode) { |
158 switch (execution_mode) { | 166 switch (execution_mode) { |
159 case ExecutionMode::PARALLEL: | 167 case ExecutionMode::PARALLEL: |
160 return make_scoped_refptr(new SchedulerParallelTaskRunner( | 168 return make_scoped_refptr(new SchedulerParallelTaskRunner( |
161 traits, &shared_priority_queue_, task_tracker_)); | 169 traits, &shared_priority_queue_, task_tracker_, |
170 delayed_task_manager_)); | |
162 | 171 |
163 case ExecutionMode::SEQUENCED: | 172 case ExecutionMode::SEQUENCED: |
164 return make_scoped_refptr(new SchedulerSequencedTaskRunner( | 173 return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
165 traits, &shared_priority_queue_, task_tracker_)); | 174 traits, &shared_priority_queue_, task_tracker_, |
175 delayed_task_manager_)); | |
166 | 176 |
167 case ExecutionMode::SINGLE_THREADED: | 177 case ExecutionMode::SINGLE_THREADED: |
168 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. | 178 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
169 NOTREACHED(); | 179 NOTREACHED(); |
170 return nullptr; | 180 return nullptr; |
171 } | 181 } |
172 | 182 |
173 NOTREACHED(); | 183 NOTREACHED(); |
174 return nullptr; | 184 return nullptr; |
175 } | 185 } |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
251 return sequence_and_sort_key.sequence; | 261 return sequence_and_sort_key.sequence; |
252 } | 262 } |
253 | 263 |
254 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( | 264 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( |
255 scoped_refptr<Sequence> sequence) { | 265 scoped_refptr<Sequence> sequence) { |
256 enqueue_sequence_callback_.Run(std::move(sequence)); | 266 enqueue_sequence_callback_.Run(std::move(sequence)); |
257 } | 267 } |
258 | 268 |
259 SchedulerThreadPool::SchedulerThreadPool( | 269 SchedulerThreadPool::SchedulerThreadPool( |
260 const EnqueueSequenceCallback& enqueue_sequence_callback, | 270 const EnqueueSequenceCallback& enqueue_sequence_callback, |
261 TaskTracker* task_tracker) | 271 TaskTracker* task_tracker, |
272 DelayedTaskManager* delayed_task_manager) | |
262 : shared_priority_queue_( | 273 : shared_priority_queue_( |
263 Bind(&SchedulerThreadPool::WakeUpOneThread, Unretained(this))), | 274 Bind(&SchedulerThreadPool::WakeUpOneThread, Unretained(this))), |
264 worker_threads_lock_(shared_priority_queue_.container_lock()), | 275 worker_threads_lock_(shared_priority_queue_.container_lock()), |
265 idle_worker_threads_stack_cv_for_testing_( | 276 idle_worker_threads_stack_cv_for_testing_( |
266 worker_threads_lock_.CreateConditionVariable()), | 277 worker_threads_lock_.CreateConditionVariable()), |
267 join_for_testing_returned_(true, false), | 278 join_for_testing_returned_(true, false), |
268 worker_thread_delegate_( | 279 worker_thread_delegate_( |
269 new SchedulerWorkerThreadDelegateImpl(this, | 280 new SchedulerWorkerThreadDelegateImpl(this, |
270 enqueue_sequence_callback)), | 281 enqueue_sequence_callback)), |
271 task_tracker_(task_tracker) { | 282 task_tracker_(task_tracker), |
283 delayed_task_manager_(delayed_task_manager) { | |
272 DCHECK(task_tracker_); | 284 DCHECK(task_tracker_); |
285 DCHECK(delayed_task_manager_); | |
273 } | 286 } |
274 | 287 |
275 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, | 288 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
276 size_t max_threads) { | 289 size_t max_threads) { |
277 AutoSchedulerLock auto_lock(worker_threads_lock_); | 290 AutoSchedulerLock auto_lock(worker_threads_lock_); |
278 | 291 |
279 DCHECK(worker_threads_.empty()); | 292 DCHECK(worker_threads_.empty()); |
280 | 293 |
281 for (size_t i = 0; i < max_threads; ++i) { | 294 for (size_t i = 0; i < max_threads; ++i) { |
282 std::unique_ptr<SchedulerWorkerThread> worker_thread = | 295 std::unique_ptr<SchedulerWorkerThread> worker_thread = |
(...skipping 27 matching lines...) Expand all Loading... | |
310 AutoSchedulerLock auto_lock(worker_threads_lock_); | 323 AutoSchedulerLock auto_lock(worker_threads_lock_); |
311 idle_worker_threads_stack_.push(worker_thread); | 324 idle_worker_threads_stack_.push(worker_thread); |
312 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); | 325 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); |
313 | 326 |
314 if (idle_worker_threads_stack_.size() == worker_threads_.size()) | 327 if (idle_worker_threads_stack_.size() == worker_threads_.size()) |
315 idle_worker_threads_stack_cv_for_testing_->Broadcast(); | 328 idle_worker_threads_stack_cv_for_testing_->Broadcast(); |
316 } | 329 } |
317 | 330 |
318 } // namespace internal | 331 } // namespace internal |
319 } // namespace base | 332 } // namespace base |
OLD | NEW |