OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
| 12 #include "base/atomicops.h" |
12 #include "base/bind.h" | 13 #include "base/bind.h" |
13 #include "base/bind_helpers.h" | 14 #include "base/bind_helpers.h" |
14 #include "base/lazy_instance.h" | 15 #include "base/lazy_instance.h" |
15 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
16 #include "base/sequenced_task_runner.h" | 17 #include "base/sequenced_task_runner.h" |
17 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
18 #include "base/strings/stringprintf.h" | 19 #include "base/strings/stringprintf.h" |
19 #include "base/task_scheduler/delayed_task_manager.h" | 20 #include "base/task_scheduler/delayed_task_manager.h" |
20 #include "base/task_scheduler/task_tracker.h" | 21 #include "base/task_scheduler/task_tracker.h" |
21 #include "base/threading/platform_thread.h" | 22 #include "base/threading/platform_thread.h" |
22 #include "base/threading/thread_local.h" | 23 #include "base/threading/thread_local.h" |
23 #include "base/threading/thread_restrictions.h" | 24 #include "base/threading/thread_restrictions.h" |
| 25 #include "base/time/time.h" |
24 | 26 |
25 namespace base { | 27 namespace base { |
26 namespace internal { | 28 namespace internal { |
27 | 29 |
28 namespace { | 30 namespace { |
29 | 31 |
30 // SchedulerWorker that owns the current thread, if any. | 32 // SchedulerWorker that owns the current thread, if any. |
31 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky | 33 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky |
32 tls_current_worker = LAZY_INSTANCE_INITIALIZER; | 34 tls_current_worker = LAZY_INSTANCE_INITIALIZER; |
33 | 35 |
34 // SchedulerWorkerPool that owns the current thread, if any. | 36 // SchedulerWorkerPool that owns the current thread, if any. |
35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky | 37 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky |
36 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; | 38 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; |
37 | 39 |
38 // A task runner that runs tasks with the PARALLEL ExecutionMode. | 40 // A task runner that runs tasks with the PARALLEL ExecutionMode. |
39 class SchedulerParallelTaskRunner : public TaskRunner { | 41 class SchedulerParallelTaskRunner : public TaskRunner { |
40 public: | 42 public: |
41 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 43 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
42 // long as |worker_pool| is alive. | 44 // long as |worker_pool| is alive. |
43 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 45 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
44 SchedulerParallelTaskRunner(const TaskTraits& traits, | 46 SchedulerParallelTaskRunner(const TaskTraits& traits, |
45 SchedulerWorkerPool* worker_pool) | 47 SchedulerWorkerPool* worker_pool) |
46 : traits_(traits), worker_pool_(worker_pool) {} | 48 : traits_(traits), worker_pool_(worker_pool) { |
| 49 DCHECK(worker_pool_); |
| 50 } |
47 | 51 |
48 // TaskRunner: | 52 // TaskRunner: |
49 bool PostDelayedTask(const tracked_objects::Location& from_here, | 53 bool PostDelayedTask(const tracked_objects::Location& from_here, |
50 const Closure& closure, | 54 const Closure& closure, |
51 TimeDelta delay) override { | 55 TimeDelta delay) override { |
52 // Post the task as part of a one-off single-task Sequence. | 56 // Post the task as part of a one-off single-task Sequence. |
53 return worker_pool_->PostTaskWithSequence( | 57 return worker_pool_->PostTaskWithSequence( |
54 WrapUnique(new Task(from_here, closure, traits_, delay)), | 58 WrapUnique(new Task(from_here, closure, traits_, delay)), |
55 make_scoped_refptr(new Sequence), nullptr); | 59 make_scoped_refptr(new Sequence), nullptr); |
56 } | 60 } |
(...skipping 12 matching lines...) Expand all Loading... |
69 }; | 73 }; |
70 | 74 |
71 // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 75 // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
72 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 76 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
73 public: | 77 public: |
74 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks | 78 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks |
75 // so long as |worker_pool| is alive. | 79 // so long as |worker_pool| is alive. |
76 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 80 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
77 SchedulerSequencedTaskRunner(const TaskTraits& traits, | 81 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
78 SchedulerWorkerPool* worker_pool) | 82 SchedulerWorkerPool* worker_pool) |
79 : traits_(traits), worker_pool_(worker_pool) {} | 83 : traits_(traits), worker_pool_(worker_pool) { |
| 84 DCHECK(worker_pool_); |
| 85 } |
80 | 86 |
81 // SequencedTaskRunner: | 87 // SequencedTaskRunner: |
82 bool PostDelayedTask(const tracked_objects::Location& from_here, | 88 bool PostDelayedTask(const tracked_objects::Location& from_here, |
83 const Closure& closure, | 89 const Closure& closure, |
84 TimeDelta delay) override { | 90 TimeDelta delay) override { |
85 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 91 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
86 task->sequenced_task_runner_ref = this; | 92 task->sequenced_task_runner_ref = this; |
87 | 93 |
88 // Post the task as part of |sequence_|. | 94 // Post the task as part of |sequence_|. |
89 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 95 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
(...skipping 16 matching lines...) Expand all Loading... |
106 | 112 |
107 // Sequence for all Tasks posted through this TaskRunner. | 113 // Sequence for all Tasks posted through this TaskRunner. |
108 const scoped_refptr<Sequence> sequence_ = new Sequence; | 114 const scoped_refptr<Sequence> sequence_ = new Sequence; |
109 | 115 |
110 const TaskTraits traits_; | 116 const TaskTraits traits_; |
111 SchedulerWorkerPool* const worker_pool_; | 117 SchedulerWorkerPool* const worker_pool_; |
112 | 118 |
113 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 119 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
114 }; | 120 }; |
115 | 121 |
| 122 // Only used in DCHECKs. |
| 123 bool ContainsWorker( |
| 124 const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| 125 const SchedulerWorker* worker) { |
| 126 auto it = std::find_if(workers.begin(), workers.end(), |
| 127 [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| 128 return i.get() == worker; |
| 129 }); |
| 130 return it != workers.end(); |
| 131 } |
| 132 |
| 133 } // namespace |
| 134 |
116 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. | 135 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
117 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | 136 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| 137 public SingleThreadTaskRunner { |
118 public: | 138 public: |
119 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | 139 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
120 // tasks so long as |worker_pool| and |worker| are alive. | 140 // tasks so long as |worker_pool| and |worker| are alive. |
121 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| | 141 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
122 // and |worker|. | 142 // and |worker|. |
123 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | 143 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
124 SchedulerWorkerPool* worker_pool, | 144 SchedulerWorkerPool* worker_pool, |
125 SchedulerWorker* worker) | 145 SchedulerWorker* worker); |
126 : traits_(traits), | |
127 worker_pool_(worker_pool), | |
128 worker_(worker) {} | |
129 | 146 |
130 // SingleThreadTaskRunner: | 147 // SingleThreadTaskRunner: |
131 bool PostDelayedTask(const tracked_objects::Location& from_here, | 148 bool PostDelayedTask(const tracked_objects::Location& from_here, |
132 const Closure& closure, | 149 const Closure& closure, |
133 TimeDelta delay) override { | 150 TimeDelta delay) override { |
134 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 151 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
135 task->single_thread_task_runner_ref = this; | 152 task->single_thread_task_runner_ref = this; |
136 | 153 |
137 // Post the task to be executed by |worker_| as part of |sequence_|. | 154 // Post the task to be executed by |worker_| as part of |sequence_|. |
138 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 155 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
139 worker_); | 156 worker_); |
140 } | 157 } |
141 | 158 |
142 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 159 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
143 const Closure& closure, | 160 const Closure& closure, |
144 base::TimeDelta delay) override { | 161 base::TimeDelta delay) override { |
145 // Tasks are never nested within the task scheduler. | 162 // Tasks are never nested within the task scheduler. |
146 return PostDelayedTask(from_here, closure, delay); | 163 return PostDelayedTask(from_here, closure, delay); |
147 } | 164 } |
148 | 165 |
149 bool RunsTasksOnCurrentThread() const override { | 166 bool RunsTasksOnCurrentThread() const override { |
150 return tls_current_worker.Get().Get() == worker_; | 167 return tls_current_worker.Get().Get() == worker_; |
151 } | 168 } |
152 | 169 |
153 private: | 170 private: |
154 ~SchedulerSingleThreadTaskRunner() override = default; | 171 ~SchedulerSingleThreadTaskRunner() override; |
155 | 172 |
156 // Sequence for all Tasks posted through this TaskRunner. | 173 // Sequence for all Tasks posted through this TaskRunner. |
157 const scoped_refptr<Sequence> sequence_ = new Sequence; | 174 const scoped_refptr<Sequence> sequence_ = new Sequence; |
158 | 175 |
159 const TaskTraits traits_; | 176 const TaskTraits traits_; |
160 SchedulerWorkerPool* const worker_pool_; | 177 SchedulerWorkerPool* const worker_pool_; |
161 SchedulerWorker* const worker_; | 178 SchedulerWorker* const worker_; |
162 | 179 |
163 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | 180 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
164 }; | 181 }; |
165 | 182 |
166 // Only used in DCHECKs. | |
167 bool ContainsWorker( | |
168 const std::vector<std::unique_ptr<SchedulerWorker>>& workers, | |
169 const SchedulerWorker* worker) { | |
170 auto it = std::find_if(workers.begin(), workers.end(), | |
171 [worker](const std::unique_ptr<SchedulerWorker>& i) { | |
172 return i.get() == worker; | |
173 }); | |
174 return it != workers.end(); | |
175 } | |
176 | |
177 } // namespace | |
178 | |
179 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 183 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
180 : public SchedulerWorker::Delegate { | 184 : public SchedulerWorker::Delegate { |
181 public: | 185 public: |
182 // |outer| owns the worker for which this delegate is constructed. | 186 // |outer| owns the worker for which this delegate is constructed. |
183 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 187 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
184 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 188 // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
185 // PriorityQueue whose transactions may overlap with the worker's | 189 // PriorityQueue whose transactions may overlap with the worker's |
186 // single-threaded PriorityQueue's transactions. |index| will be appended to | 190 // single-threaded PriorityQueue's transactions. |index| will be appended to |
187 // the pool name to label the underlying worker threads. | 191 // the pool name to label the underlying worker threads. |
188 SchedulerWorkerDelegateImpl( | 192 SchedulerWorkerDelegateImpl( |
189 SchedulerWorkerPoolImpl* outer, | 193 SchedulerWorkerPoolImpl* outer, |
190 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 194 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
191 const PriorityQueue* shared_priority_queue, | 195 const PriorityQueue* shared_priority_queue, |
192 int index); | 196 int index); |
193 ~SchedulerWorkerDelegateImpl() override; | 197 ~SchedulerWorkerDelegateImpl() override; |
194 | 198 |
195 PriorityQueue* single_threaded_priority_queue() { | 199 PriorityQueue* single_threaded_priority_queue() { |
196 return &single_threaded_priority_queue_; | 200 return &single_threaded_priority_queue_; |
197 } | 201 } |
198 | 202 |
199 // SchedulerWorker::Delegate: | 203 // SchedulerWorker::Delegate: |
200 void OnMainEntry(SchedulerWorker* worker) override; | 204 void OnMainEntry(SchedulerWorker* worker) override; |
201 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 205 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
202 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 206 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
203 TimeDelta GetSleepTimeout() override; | 207 TimeDelta GetSleepTimeout() override; |
204 bool CanDetach(SchedulerWorker* worker) override; | 208 bool CanDetach(SchedulerWorker* worker) override; |
205 | 209 |
| 210 void RegisterSingleThreadTaskRunner() { |
| 211 subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
| 212 } |
| 213 |
| 214 void UnregisterSingleThreadTaskRunner() { |
| 215 subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| 216 } |
| 217 |
206 private: | 218 private: |
207 SchedulerWorkerPoolImpl* outer_; | 219 SchedulerWorkerPoolImpl* outer_; |
208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 220 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
209 | 221 |
210 // Single-threaded PriorityQueue for the worker. | 222 // Single-threaded PriorityQueue for the worker. |
211 PriorityQueue single_threaded_priority_queue_; | 223 PriorityQueue single_threaded_priority_queue_; |
212 | 224 |
213 // True if the last Sequence returned by GetWork() was extracted from | 225 // True if the last Sequence returned by GetWork() was extracted from |
214 // |single_threaded_priority_queue_|. | 226 // |single_threaded_priority_queue_|. |
215 bool last_sequence_is_single_threaded_ = false; | 227 bool last_sequence_is_single_threaded_ = false; |
216 | 228 |
| 229 // Time when GetWork() first returned nullptr. |
| 230 TimeTicks idle_start_time_; |
| 231 |
| 232 subtle::Atomic32 num_single_threaded_runners_ = 0; |
| 233 |
217 const int index_; | 234 const int index_; |
218 | 235 |
219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 236 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
220 }; | 237 }; |
221 | 238 |
222 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 239 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
223 // SchedulerWorkerPool should never be deleted in production unless its | 240 // SchedulerWorkerPool should never be deleted in production unless its |
224 // initialization failed. | 241 // initialization failed. |
225 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 242 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
226 } | 243 } |
227 | 244 |
228 // static | 245 // static |
229 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( | 246 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
230 const SchedulerWorkerPoolParams& params, | 247 const SchedulerWorkerPoolParams& params, |
231 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 248 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
232 TaskTracker* task_tracker, | 249 TaskTracker* task_tracker, |
233 DelayedTaskManager* delayed_task_manager) { | 250 DelayedTaskManager* delayed_task_manager) { |
234 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( | 251 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
235 new SchedulerWorkerPoolImpl(params.name(), | 252 new SchedulerWorkerPoolImpl(params.name(), |
236 params.io_restriction(), | 253 params.io_restriction(), |
| 254 params.suggested_reclaim_time(), |
237 task_tracker, delayed_task_manager)); | 255 task_tracker, delayed_task_manager)); |
238 if (worker_pool->Initialize(params.thread_priority(), | 256 if (worker_pool->Initialize(params.thread_priority(), |
239 params.max_threads(), | 257 params.max_threads(), |
240 re_enqueue_sequence_callback)) { | 258 re_enqueue_sequence_callback)) { |
241 return worker_pool; | 259 return worker_pool; |
242 } | 260 } |
243 return nullptr; | 261 return nullptr; |
244 } | 262 } |
245 | 263 |
246 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 264 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
247 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 265 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
248 while (idle_workers_stack_.Size() < workers_.size()) | 266 while (idle_workers_stack_.Size() < workers_.size()) |
249 idle_workers_stack_cv_for_testing_->Wait(); | 267 idle_workers_stack_cv_for_testing_->Wait(); |
250 } | 268 } |
251 | 269 |
252 void SchedulerWorkerPoolImpl::JoinForTesting() { | 270 void SchedulerWorkerPoolImpl::JoinForTesting() { |
| 271 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) << |
| 272 "Workers can detach during join."; |
253 for (const auto& worker : workers_) | 273 for (const auto& worker : workers_) |
254 worker->JoinForTesting(); | 274 worker->JoinForTesting(); |
255 | 275 |
256 DCHECK(!join_for_testing_returned_.IsSignaled()); | 276 DCHECK(!join_for_testing_returned_.IsSignaled()); |
257 join_for_testing_returned_.Signal(); | 277 join_for_testing_returned_.Signal(); |
258 } | 278 } |
259 | 279 |
| 280 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
| 281 AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| 282 worker_detachment_allowed_ = false; |
| 283 } |
| 284 |
260 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 285 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
261 const TaskTraits& traits, | 286 const TaskTraits& traits, |
262 ExecutionMode execution_mode) { | 287 ExecutionMode execution_mode) { |
263 switch (execution_mode) { | 288 switch (execution_mode) { |
264 case ExecutionMode::PARALLEL: | 289 case ExecutionMode::PARALLEL: |
265 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 290 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
266 | 291 |
267 case ExecutionMode::SEQUENCED: | 292 case ExecutionMode::SEQUENCED: |
268 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 293 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
269 | 294 |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
360 sequence_sort_key); | 385 sequence_sort_key); |
361 | 386 |
362 // Wake up a worker to process |sequence|. | 387 // Wake up a worker to process |sequence|. |
363 if (worker) | 388 if (worker) |
364 worker->WakeUp(); | 389 worker->WakeUp(); |
365 else | 390 else |
366 WakeUpOneWorker(); | 391 WakeUpOneWorker(); |
367 } | 392 } |
368 } | 393 } |
369 | 394 |
| 395 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| 396 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| 397 SchedulerWorkerPool* worker_pool, |
| 398 SchedulerWorker* worker) |
| 399 : traits_(traits), |
| 400 worker_pool_(worker_pool), |
| 401 worker_(worker) { |
| 402 DCHECK(worker_pool_); |
| 403 DCHECK(worker_); |
| 404 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| 405 RegisterSingleThreadTaskRunner(); |
| 406 } |
| 407 |
| 408 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| 409 ~SchedulerSingleThreadTaskRunner() { |
| 410 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| 411 UnregisterSingleThreadTaskRunner(); |
| 412 } |
| 413 |
370 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 414 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
371 SchedulerWorkerDelegateImpl( | 415 SchedulerWorkerDelegateImpl( |
372 SchedulerWorkerPoolImpl* outer, | 416 SchedulerWorkerPoolImpl* outer, |
373 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 417 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
374 const PriorityQueue* shared_priority_queue, | 418 const PriorityQueue* shared_priority_queue, |
375 int index) | 419 int index) |
376 : outer_(outer), | 420 : outer_(outer), |
377 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 421 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
378 single_threaded_priority_queue_(shared_priority_queue), | 422 single_threaded_priority_queue_(shared_priority_queue), |
379 index_(index) {} | 423 index_(index) {} |
(...skipping 11 matching lines...) Expand all Loading... |
391 #endif | 435 #endif |
392 | 436 |
393 PlatformThread::SetName( | 437 PlatformThread::SetName( |
394 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); | 438 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); |
395 | 439 |
396 DCHECK(!tls_current_worker.Get().Get()); | 440 DCHECK(!tls_current_worker.Get().Get()); |
397 DCHECK(!tls_current_worker_pool.Get().Get()); | 441 DCHECK(!tls_current_worker_pool.Get().Get()); |
398 tls_current_worker.Get().Set(worker); | 442 tls_current_worker.Get().Set(worker); |
399 tls_current_worker_pool.Get().Set(outer_); | 443 tls_current_worker_pool.Get().Set(outer_); |
400 | 444 |
| 445 // New threads haven't run GetWork() yet, so reset the idle_start_time_. |
| 446 idle_start_time_ = TimeTicks(); |
| 447 |
401 ThreadRestrictions::SetIOAllowed( | 448 ThreadRestrictions::SetIOAllowed( |
402 outer_->io_restriction_ == | 449 outer_->io_restriction_ == |
403 SchedulerWorkerPoolParams::IORestriction::ALLOWED); | 450 SchedulerWorkerPoolParams::IORestriction::ALLOWED); |
404 } | 451 } |
405 | 452 |
406 scoped_refptr<Sequence> | 453 scoped_refptr<Sequence> |
407 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( | 454 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
408 SchedulerWorker* worker) { | 455 SchedulerWorker* worker) { |
409 DCHECK(ContainsWorker(outer_->workers_, worker)); | 456 DCHECK(ContainsWorker(outer_->workers_, worker)); |
410 | 457 |
(...skipping 14 matching lines...) Expand all Loading... |
425 // empty and ends the Transaction. | 472 // empty and ends the Transaction. |
426 // 2. Other thread creates a Transaction, inserts a Sequence into | 473 // 2. Other thread creates a Transaction, inserts a Sequence into |
427 // |shared_priority_queue_| and ends the Transaction. This can't happen | 474 // |shared_priority_queue_| and ends the Transaction. This can't happen |
428 // if the Transaction of step 1 is still active because because there | 475 // if the Transaction of step 1 is still active because because there |
429 // can only be one active Transaction per PriorityQueue at a time. | 476 // can only be one active Transaction per PriorityQueue at a time. |
430 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because | 477 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because |
431 // |idle_workers_stack_| is empty. | 478 // |idle_workers_stack_| is empty. |
432 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. | 479 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. |
433 // No thread runs the Sequence inserted in step 2. | 480 // No thread runs the Sequence inserted in step 2. |
434 outer_->AddToIdleWorkersStack(worker); | 481 outer_->AddToIdleWorkersStack(worker); |
| 482 if (idle_start_time_.is_null()) |
| 483 idle_start_time_ = TimeTicks::Now(); |
435 return nullptr; | 484 return nullptr; |
436 } | 485 } |
437 | 486 |
438 // True if both PriorityQueues have Sequences and the Sequence at the top of | 487 // True if both PriorityQueues have Sequences and the Sequence at the top of |
439 // the shared PriorityQueue is more important. | 488 // the shared PriorityQueue is more important. |
440 const bool shared_sequence_is_more_important = | 489 const bool shared_sequence_is_more_important = |
441 !shared_transaction->IsEmpty() && | 490 !shared_transaction->IsEmpty() && |
442 !single_threaded_transaction->IsEmpty() && | 491 !single_threaded_transaction->IsEmpty() && |
443 shared_transaction->PeekSortKey() > | 492 shared_transaction->PeekSortKey() > |
444 single_threaded_transaction->PeekSortKey(); | 493 single_threaded_transaction->PeekSortKey(); |
445 | 494 |
446 if (single_threaded_transaction->IsEmpty() || | 495 if (single_threaded_transaction->IsEmpty() || |
447 shared_sequence_is_more_important) { | 496 shared_sequence_is_more_important) { |
448 sequence = shared_transaction->PopSequence(); | 497 sequence = shared_transaction->PopSequence(); |
449 last_sequence_is_single_threaded_ = false; | 498 last_sequence_is_single_threaded_ = false; |
450 } else { | 499 } else { |
451 DCHECK(!single_threaded_transaction->IsEmpty()); | 500 DCHECK(!single_threaded_transaction->IsEmpty()); |
452 sequence = single_threaded_transaction->PopSequence(); | 501 sequence = single_threaded_transaction->PopSequence(); |
453 last_sequence_is_single_threaded_ = true; | 502 last_sequence_is_single_threaded_ = true; |
454 } | 503 } |
455 } | 504 } |
456 DCHECK(sequence); | 505 DCHECK(sequence); |
457 | 506 |
| 507 idle_start_time_ = TimeTicks(); |
| 508 |
458 outer_->RemoveFromIdleWorkersStack(worker); | 509 outer_->RemoveFromIdleWorkersStack(worker); |
459 return sequence; | 510 return sequence; |
460 } | 511 } |
461 | 512 |
462 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 513 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
463 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 514 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
464 if (last_sequence_is_single_threaded_) { | 515 if (last_sequence_is_single_threaded_) { |
465 // A single-threaded Sequence is always re-enqueued in the single-threaded | 516 // A single-threaded Sequence is always re-enqueued in the single-threaded |
466 // PriorityQueue from which it was extracted. | 517 // PriorityQueue from which it was extracted. |
467 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); | 518 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
468 single_threaded_priority_queue_.BeginTransaction()->Push( | 519 single_threaded_priority_queue_.BeginTransaction()->Push( |
469 std::move(sequence), sequence_sort_key); | 520 std::move(sequence), sequence_sort_key); |
470 } else { | 521 } else { |
471 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 522 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
472 // |sequence| must be enqueued. | 523 // |sequence| must be enqueued. |
473 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 524 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
474 } | 525 } |
475 } | 526 } |
476 | 527 |
477 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 528 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
478 GetSleepTimeout() { | 529 GetSleepTimeout() { |
479 return TimeDelta::Max(); | 530 return outer_->suggested_reclaim_time_; |
480 } | 531 } |
481 | 532 |
482 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 533 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
483 SchedulerWorker* worker) { | 534 SchedulerWorker* worker) { |
484 return false; | 535 // It's not an issue if |num_single_threaded_runners_| is incremented after |
| 536 // this because the newly created TaskRunner (from which no task has run yet) |
| 537 // will simply run all its tasks on the next physical thread created by the |
| 538 // worker. |
| 539 const bool can_detach = |
| 540 !idle_start_time_.is_null() && |
| 541 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
| 542 worker != outer_->PeekAtIdleWorkersStack() && |
| 543 !subtle::Acquire_Load(&num_single_threaded_runners_) && |
| 544 outer_->CanWorkerDetachForTesting(); |
| 545 return can_detach; |
485 } | 546 } |
486 | 547 |
487 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 548 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
488 StringPiece name, | 549 StringPiece name, |
489 SchedulerWorkerPoolParams::IORestriction io_restriction, | 550 SchedulerWorkerPoolParams::IORestriction io_restriction, |
| 551 const TimeDelta& suggested_reclaim_time, |
490 TaskTracker* task_tracker, | 552 TaskTracker* task_tracker, |
491 DelayedTaskManager* delayed_task_manager) | 553 DelayedTaskManager* delayed_task_manager) |
492 : name_(name.as_string()), | 554 : name_(name.as_string()), |
493 io_restriction_(io_restriction), | 555 io_restriction_(io_restriction), |
| 556 suggested_reclaim_time_(suggested_reclaim_time), |
494 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 557 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
495 idle_workers_stack_cv_for_testing_( | 558 idle_workers_stack_cv_for_testing_( |
496 idle_workers_stack_lock_.CreateConditionVariable()), | 559 idle_workers_stack_lock_.CreateConditionVariable()), |
497 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 560 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
498 WaitableEvent::InitialState::NOT_SIGNALED), | 561 WaitableEvent::InitialState::NOT_SIGNALED), |
499 #if DCHECK_IS_ON() | 562 #if DCHECK_IS_ON() |
500 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | 563 workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
501 WaitableEvent::InitialState::NOT_SIGNALED), | 564 WaitableEvent::InitialState::NOT_SIGNALED), |
502 #endif | 565 #endif |
503 task_tracker_(task_tracker), | 566 task_tracker_(task_tracker), |
(...skipping 10 matching lines...) Expand all Loading... |
514 | 577 |
515 DCHECK(workers_.empty()); | 578 DCHECK(workers_.empty()); |
516 | 579 |
517 for (size_t i = 0; i < max_threads; ++i) { | 580 for (size_t i = 0; i < max_threads; ++i) { |
518 std::unique_ptr<SchedulerWorker> worker = | 581 std::unique_ptr<SchedulerWorker> worker = |
519 SchedulerWorker::Create( | 582 SchedulerWorker::Create( |
520 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( | 583 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
521 this, re_enqueue_sequence_callback, | 584 this, re_enqueue_sequence_callback, |
522 &shared_priority_queue_, static_cast<int>(i))), | 585 &shared_priority_queue_, static_cast<int>(i))), |
523 task_tracker_, | 586 task_tracker_, |
524 SchedulerWorker::InitialState::ALIVE); | 587 i == 0 |
| 588 ? SchedulerWorker::InitialState::ALIVE |
| 589 : SchedulerWorker::InitialState::DETACHED); |
525 if (!worker) | 590 if (!worker) |
526 break; | 591 break; |
527 idle_workers_stack_.Push(worker.get()); | 592 idle_workers_stack_.Push(worker.get()); |
528 workers_.push_back(std::move(worker)); | 593 workers_.push_back(std::move(worker)); |
529 } | 594 } |
530 | 595 |
531 #if DCHECK_IS_ON() | 596 #if DCHECK_IS_ON() |
532 workers_created_.Signal(); | 597 workers_created_.Signal(); |
533 #endif | 598 #endif |
534 | 599 |
535 return !workers_.empty(); | 600 return !workers_.empty(); |
536 } | 601 } |
537 | 602 |
538 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 603 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
539 SchedulerWorker* worker; | 604 SchedulerWorker* worker; |
540 { | 605 { |
541 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 606 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
542 worker = idle_workers_stack_.Pop(); | 607 worker = idle_workers_stack_.Pop(); |
543 } | 608 } |
544 if (worker) | 609 if (worker) |
545 worker->WakeUp(); | 610 worker->WakeUp(); |
546 } | 611 } |
547 | 612 |
548 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( | 613 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
549 SchedulerWorker* worker) { | 614 SchedulerWorker* worker) { |
550 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 615 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
551 idle_workers_stack_.Push(worker); | 616 // Detachment may cause multiple attempts to add because the delegate cannot |
| 617 // determine who woke it up. As a result, when it wakes up, it may conclude |
| 618 // there's no work to be done and attempt to add itself to the idle stack |
| 619 // again. |
| 620 if (!idle_workers_stack_.Contains(worker)) |
| 621 idle_workers_stack_.Push(worker); |
| 622 |
552 DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); | 623 DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
553 | 624 |
554 if (idle_workers_stack_.Size() == workers_.size()) | 625 if (idle_workers_stack_.Size() == workers_.size()) |
555 idle_workers_stack_cv_for_testing_->Broadcast(); | 626 idle_workers_stack_cv_for_testing_->Broadcast(); |
556 } | 627 } |
557 | 628 |
| 629 const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
| 630 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 631 return idle_workers_stack_.Peek(); |
| 632 } |
| 633 |
558 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( | 634 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
559 SchedulerWorker* worker) { | 635 SchedulerWorker* worker) { |
560 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 636 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
561 idle_workers_stack_.Remove(worker); | 637 idle_workers_stack_.Remove(worker); |
562 } | 638 } |
563 | 639 |
| 640 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| 641 AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| 642 return worker_detachment_allowed_; |
| 643 } |
| 644 |
564 } // namespace internal | 645 } // namespace internal |
565 } // namespace base | 646 } // namespace base |
OLD | NEW |