OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/atomicops.h" | |
12 #include "base/bind.h" | 13 #include "base/bind.h" |
13 #include "base/bind_helpers.h" | 14 #include "base/bind_helpers.h" |
14 #include "base/lazy_instance.h" | 15 #include "base/lazy_instance.h" |
15 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
16 #include "base/sequenced_task_runner.h" | 17 #include "base/sequenced_task_runner.h" |
17 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
18 #include "base/strings/stringprintf.h" | 19 #include "base/strings/stringprintf.h" |
19 #include "base/task_scheduler/delayed_task_manager.h" | 20 #include "base/task_scheduler/delayed_task_manager.h" |
20 #include "base/task_scheduler/task_tracker.h" | 21 #include "base/task_scheduler/task_tracker.h" |
21 #include "base/threading/platform_thread.h" | 22 #include "base/threading/platform_thread.h" |
22 #include "base/threading/thread_local.h" | 23 #include "base/threading/thread_local.h" |
23 #include "base/threading/thread_restrictions.h" | 24 #include "base/threading/thread_restrictions.h" |
25 #include "base/time/time.h" | |
24 | 26 |
25 namespace base { | 27 namespace base { |
26 namespace internal { | 28 namespace internal { |
27 | 29 |
28 namespace { | 30 namespace { |
29 | 31 |
30 // SchedulerWorker that owns the current thread, if any. | 32 // SchedulerWorker that owns the current thread, if any. |
31 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky | 33 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky |
32 tls_current_worker = LAZY_INSTANCE_INITIALIZER; | 34 tls_current_worker = LAZY_INSTANCE_INITIALIZER; |
33 | 35 |
34 // SchedulerWorkerPool that owns the current thread, if any. | 36 // SchedulerWorkerPool that owns the current thread, if any. |
35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky | 37 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky |
36 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; | 38 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; |
37 | 39 |
38 // A task runner that runs tasks with the PARALLEL ExecutionMode. | 40 // A task runner that runs tasks with the PARALLEL ExecutionMode. |
39 class SchedulerParallelTaskRunner : public TaskRunner { | 41 class SchedulerParallelTaskRunner : public TaskRunner { |
40 public: | 42 public: |
41 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 43 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
42 // long as |worker_pool| is alive. | 44 // long as |worker_pool| is alive. |
43 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 45 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
44 SchedulerParallelTaskRunner(const TaskTraits& traits, | 46 SchedulerParallelTaskRunner(const TaskTraits& traits, |
45 SchedulerWorkerPool* worker_pool) | 47 SchedulerWorkerPool* worker_pool) |
46 : traits_(traits), worker_pool_(worker_pool) {} | 48 : traits_(traits), worker_pool_(worker_pool) { |
49 DCHECK(worker_pool_); | |
50 } | |
47 | 51 |
48 // TaskRunner: | 52 // TaskRunner: |
49 bool PostDelayedTask(const tracked_objects::Location& from_here, | 53 bool PostDelayedTask(const tracked_objects::Location& from_here, |
50 const Closure& closure, | 54 const Closure& closure, |
51 TimeDelta delay) override { | 55 TimeDelta delay) override { |
52 // Post the task as part of a one-off single-task Sequence. | 56 // Post the task as part of a one-off single-task Sequence. |
53 return worker_pool_->PostTaskWithSequence( | 57 return worker_pool_->PostTaskWithSequence( |
54 WrapUnique(new Task(from_here, closure, traits_, delay)), | 58 WrapUnique(new Task(from_here, closure, traits_, delay)), |
55 make_scoped_refptr(new Sequence), nullptr); | 59 make_scoped_refptr(new Sequence), nullptr); |
56 } | 60 } |
(...skipping 12 matching lines...) Expand all Loading... | |
69 }; | 73 }; |
70 | 74 |
71 // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 75 // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
72 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 76 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
73 public: | 77 public: |
74 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks | 78 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks |
75 // so long as |worker_pool| is alive. | 79 // so long as |worker_pool| is alive. |
76 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 80 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
77 SchedulerSequencedTaskRunner(const TaskTraits& traits, | 81 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
78 SchedulerWorkerPool* worker_pool) | 82 SchedulerWorkerPool* worker_pool) |
79 : traits_(traits), worker_pool_(worker_pool) {} | 83 : traits_(traits), worker_pool_(worker_pool) { |
84 DCHECK(worker_pool_); | |
85 } | |
80 | 86 |
81 // SequencedTaskRunner: | 87 // SequencedTaskRunner: |
82 bool PostDelayedTask(const tracked_objects::Location& from_here, | 88 bool PostDelayedTask(const tracked_objects::Location& from_here, |
83 const Closure& closure, | 89 const Closure& closure, |
84 TimeDelta delay) override { | 90 TimeDelta delay) override { |
85 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 91 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
86 task->sequenced_task_runner_ref = this; | 92 task->sequenced_task_runner_ref = this; |
87 | 93 |
88 // Post the task as part of |sequence_|. | 94 // Post the task as part of |sequence_|. |
89 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 95 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
(...skipping 24 matching lines...) Expand all Loading... | |
114 }; | 120 }; |
115 | 121 |
116 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. | 122 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
117 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | 123 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
118 public: | 124 public: |
119 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | 125 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
120 // tasks so long as |worker_pool| and |worker| are alive. | 126 // tasks so long as |worker_pool| and |worker| are alive. |
121 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| | 127 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
122 // and |worker|. | 128 // and |worker|. |
123 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | 129 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
124 SchedulerWorkerPool* worker_pool, | 130 SchedulerWorkerPoolImpl* worker_pool, |
125 SchedulerWorker* worker) | 131 SchedulerWorker* worker) |
126 : traits_(traits), | 132 : traits_(traits), |
127 worker_pool_(worker_pool), | 133 worker_pool_(worker_pool), |
128 worker_(worker) {} | 134 worker_(worker) { |
135 DCHECK(worker_pool_); | |
136 DCHECK(worker_); | |
137 worker_pool_->RegisterSingleThreadTaskRunner(worker_); | |
138 } | |
129 | 139 |
130 // SingleThreadTaskRunner: | 140 // SingleThreadTaskRunner: |
131 bool PostDelayedTask(const tracked_objects::Location& from_here, | 141 bool PostDelayedTask(const tracked_objects::Location& from_here, |
132 const Closure& closure, | 142 const Closure& closure, |
133 TimeDelta delay) override { | 143 TimeDelta delay) override { |
134 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 144 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
135 task->single_thread_task_runner_ref = this; | 145 task->single_thread_task_runner_ref = this; |
136 | 146 |
137 // Post the task to be executed by |worker_| as part of |sequence_|. | 147 // Post the task to be executed by |worker_| as part of |sequence_|. |
138 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 148 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
139 worker_); | 149 worker_); |
140 } | 150 } |
141 | 151 |
142 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 152 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
143 const Closure& closure, | 153 const Closure& closure, |
144 base::TimeDelta delay) override { | 154 base::TimeDelta delay) override { |
145 // Tasks are never nested within the task scheduler. | 155 // Tasks are never nested within the task scheduler. |
146 return PostDelayedTask(from_here, closure, delay); | 156 return PostDelayedTask(from_here, closure, delay); |
147 } | 157 } |
148 | 158 |
149 bool RunsTasksOnCurrentThread() const override { | 159 bool RunsTasksOnCurrentThread() const override { |
150 return tls_current_worker.Get().Get() == worker_; | 160 return tls_current_worker.Get().Get() == worker_; |
151 } | 161 } |
152 | 162 |
153 private: | 163 private: |
154 ~SchedulerSingleThreadTaskRunner() override = default; | 164 ~SchedulerSingleThreadTaskRunner() override { |
165 worker_pool_->UnregisterSingleThreadTaskRunner(worker_); | |
166 } | |
155 | 167 |
156 // Sequence for all Tasks posted through this TaskRunner. | 168 // Sequence for all Tasks posted through this TaskRunner. |
157 const scoped_refptr<Sequence> sequence_ = new Sequence; | 169 const scoped_refptr<Sequence> sequence_ = new Sequence; |
158 | 170 |
159 const TaskTraits traits_; | 171 const TaskTraits traits_; |
160 SchedulerWorkerPool* const worker_pool_; | 172 SchedulerWorkerPoolImpl* const worker_pool_; |
161 SchedulerWorker* const worker_; | 173 SchedulerWorker* const worker_; |
162 | 174 |
163 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | 175 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
164 }; | 176 }; |
165 | 177 |
166 // Only used in DCHECKs. | 178 // Only used in DCHECKs. |
167 bool ContainsWorker( | 179 bool ContainsWorker( |
168 const std::vector<std::unique_ptr<SchedulerWorker>>& workers, | 180 const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
169 const SchedulerWorker* worker) { | 181 const SchedulerWorker* worker) { |
170 auto it = std::find_if(workers.begin(), workers.end(), | 182 auto it = std::find_if(workers.begin(), workers.end(), |
171 [worker](const std::unique_ptr<SchedulerWorker>& i) { | 183 [worker](const std::unique_ptr<SchedulerWorker>& i) { |
172 return i.get() == worker; | 184 return i.get() == worker; |
173 }); | 185 }); |
174 return it != workers.end(); | 186 return it != workers.end(); |
175 } | 187 } |
176 | 188 |
177 } // namespace | 189 } // namespace |
178 | 190 |
179 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 191 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
180 : public SchedulerWorker::Delegate { | 192 : public SchedulerWorker::Delegate { |
181 public: | 193 public: |
182 // |outer| owns the worker for which this delegate is constructed. | 194 // |outer| owns the worker for which this delegate is constructed. |
183 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 195 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
184 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 196 // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
185 // PriorityQueue whose transactions may overlap with the worker's | 197 // PriorityQueue whose transactions may overlap with the worker's |
186 // single-threaded PriorityQueue's transactions. |index| will be appended to | 198 // single-threaded PriorityQueue's transactions. |index| will be appended to |
187 // the pool name to label the underlying worker threads. | 199 // the pool name to label the underlying worker threads. |
188 SchedulerWorkerDelegateImpl( | 200 SchedulerWorkerDelegateImpl( |
189 SchedulerWorkerPoolImpl* outer, | 201 SchedulerWorkerPoolImpl* outer, |
202 const TimeDelta& suggested_reclaim_time, | |
190 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 203 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
191 const PriorityQueue* shared_priority_queue, | 204 const PriorityQueue* shared_priority_queue, |
192 int index); | 205 int index); |
193 ~SchedulerWorkerDelegateImpl() override; | 206 ~SchedulerWorkerDelegateImpl() override; |
194 | 207 |
195 PriorityQueue* single_threaded_priority_queue() { | 208 PriorityQueue* single_threaded_priority_queue() { |
196 return &single_threaded_priority_queue_; | 209 return &single_threaded_priority_queue_; |
197 } | 210 } |
198 | 211 |
199 // SchedulerWorker::Delegate: | 212 // SchedulerWorker::Delegate: |
200 void OnMainEntry(SchedulerWorker* worker) override; | 213 void OnMainEntry(SchedulerWorker* worker) override; |
201 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 214 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
202 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 215 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
203 TimeDelta GetSleepTimeout() override; | 216 TimeDelta GetSleepTimeout() override; |
204 bool CanDetach(SchedulerWorker* worker) override; | 217 bool CanDetach(SchedulerWorker* worker) override; |
205 | 218 |
219 void RegisterSingleThreadTaskRunner() { | |
220 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); | |
221 } | |
222 | |
223 void UnregisterSingleThreadTaskRunner() { | |
224 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); | |
225 } | |
226 | |
206 private: | 227 private: |
207 SchedulerWorkerPoolImpl* outer_; | 228 SchedulerWorkerPoolImpl* outer_; |
229 const TimeDelta suggested_reclaim_time_; | |
208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 230 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
209 | 231 |
210 // Single-threaded PriorityQueue for the worker. | 232 // Single-threaded PriorityQueue for the worker. |
211 PriorityQueue single_threaded_priority_queue_; | 233 PriorityQueue single_threaded_priority_queue_; |
212 | 234 |
213 // True if the last Sequence returned by GetWork() was extracted from | 235 // True if the last Sequence returned by GetWork() was extracted from |
214 // |single_threaded_priority_queue_|. | 236 // |single_threaded_priority_queue_|. |
215 bool last_sequence_is_single_threaded_ = false; | 237 bool last_sequence_is_single_threaded_ = false; |
216 | 238 |
239 subtle::Atomic32 num_single_threaded_runners_ = 0; | |
240 | |
217 const int index_; | 241 const int index_; |
218 | 242 |
219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 243 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
220 }; | 244 }; |
221 | 245 |
222 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 246 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
223 // SchedulerWorkerPool should never be deleted in production unless its | 247 // SchedulerWorkerPool should never be deleted in production unless its |
224 // initialization failed. | 248 // initialization failed. |
225 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 249 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
226 } | 250 } |
227 | 251 |
228 // static | 252 // static |
229 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( | 253 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
230 StringPiece name, | 254 StringPiece name, |
231 ThreadPriority thread_priority, | 255 ThreadPriority thread_priority, |
232 size_t max_threads, | 256 size_t max_threads, |
233 IORestriction io_restriction, | 257 IORestriction io_restriction, |
258 const TimeDelta& suggested_reclaim_time, | |
234 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 259 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
235 TaskTracker* task_tracker, | 260 TaskTracker* task_tracker, |
236 DelayedTaskManager* delayed_task_manager) { | 261 DelayedTaskManager* delayed_task_manager) { |
237 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( | 262 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
238 new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, | 263 new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, |
239 delayed_task_manager)); | 264 delayed_task_manager)); |
240 if (worker_pool->Initialize(thread_priority, max_threads, | 265 if (worker_pool->Initialize(thread_priority, max_threads, |
266 suggested_reclaim_time, | |
241 re_enqueue_sequence_callback)) { | 267 re_enqueue_sequence_callback)) { |
242 return worker_pool; | 268 return worker_pool; |
243 } | 269 } |
244 return nullptr; | 270 return nullptr; |
245 } | 271 } |
246 | 272 |
247 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 273 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
248 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 274 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
249 while (idle_workers_stack_.Size() < workers_.size()) | 275 while (idle_workers_stack_.Size() < workers_.size()) |
250 idle_workers_stack_cv_for_testing_->Wait(); | 276 idle_workers_stack_cv_for_testing_->Wait(); |
251 } | 277 } |
252 | 278 |
253 void SchedulerWorkerPoolImpl::JoinForTesting() { | 279 void SchedulerWorkerPoolImpl::JoinForTesting() { |
280 { | |
281 AutoSchedulerLock auto_lock(join_for_testing_called_lock_); | |
282 join_for_testing_called_ = true; | |
283 } | |
254 for (const auto& worker : workers_) | 284 for (const auto& worker : workers_) |
255 worker->JoinForTesting(); | 285 worker->JoinForTesting(); |
256 | 286 |
257 DCHECK(!join_for_testing_returned_.IsSignaled()); | 287 DCHECK(!join_for_testing_returned_.IsSignaled()); |
258 join_for_testing_returned_.Signal(); | 288 join_for_testing_returned_.Signal(); |
259 } | 289 } |
260 | 290 |
261 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 291 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
262 const TaskTraits& traits, | 292 const TaskTraits& traits, |
263 ExecutionMode execution_mode) { | 293 ExecutionMode execution_mode) { |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
361 sequence_sort_key); | 391 sequence_sort_key); |
362 | 392 |
363 // Wake up a worker to process |sequence|. | 393 // Wake up a worker to process |sequence|. |
364 if (worker) | 394 if (worker) |
365 worker->WakeUp(); | 395 worker->WakeUp(); |
366 else | 396 else |
367 WakeUpOneWorker(); | 397 WakeUpOneWorker(); |
368 } | 398 } |
369 } | 399 } |
370 | 400 |
401 void SchedulerWorkerPoolImpl::RegisterSingleThreadTaskRunner( | |
402 SchedulerWorker* worker) { | |
403 auto delegate = | |
404 static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()); | |
405 delegate->RegisterSingleThreadTaskRunner(); | |
406 } | |
407 | |
408 void SchedulerWorkerPoolImpl::UnregisterSingleThreadTaskRunner( | |
409 SchedulerWorker* worker) { | |
410 auto delegate = | |
411 static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()); | |
412 delegate->UnregisterSingleThreadTaskRunner(); | |
413 } | |
414 | |
371 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 415 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
372 SchedulerWorkerDelegateImpl( | 416 SchedulerWorkerDelegateImpl( |
373 SchedulerWorkerPoolImpl* outer, | 417 SchedulerWorkerPoolImpl* outer, |
418 const TimeDelta& suggested_reclaim_time, | |
374 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 419 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
375 const PriorityQueue* shared_priority_queue, | 420 const PriorityQueue* shared_priority_queue, |
376 int index) | 421 int index) |
377 : outer_(outer), | 422 : outer_(outer), |
423 suggested_reclaim_time_(suggested_reclaim_time), | |
378 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 424 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
379 single_threaded_priority_queue_(shared_priority_queue), | 425 single_threaded_priority_queue_(shared_priority_queue), |
380 index_(index) {} | 426 index_(index) {} |
381 | 427 |
382 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 428 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
383 ~SchedulerWorkerDelegateImpl() = default; | 429 ~SchedulerWorkerDelegateImpl() = default; |
384 | 430 |
385 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 431 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
386 SchedulerWorker* worker) { | 432 SchedulerWorker* worker) { |
387 #if DCHECK_IS_ON() | 433 #if DCHECK_IS_ON() |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
469 std::move(sequence), sequence_sort_key); | 515 std::move(sequence), sequence_sort_key); |
470 } else { | 516 } else { |
471 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 517 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
472 // |sequence| must be enqueued. | 518 // |sequence| must be enqueued. |
473 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 519 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
474 } | 520 } |
475 } | 521 } |
476 | 522 |
477 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 523 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
478 GetSleepTimeout() { | 524 GetSleepTimeout() { |
479 return TimeDelta::Max(); | 525 return suggested_reclaim_time_; |
fdoray
2016/07/04 19:41:32
From scheduler_worker.cc:
while (...) {
scoped_
robliao
2016/07/07 17:49:16
Nice catch! Nothing at the moment. Fixed with an i
| |
480 } | 526 } |
481 | 527 |
482 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 528 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
483 SchedulerWorker* worker) { | 529 SchedulerWorker* worker) { |
484 return false; | 530 return worker != outer_->PeekAtIdleWorkersStack() && |
531 !subtle::Release_Load(&num_single_threaded_runners_) && | |
fdoray
2016/07/04 19:41:32
Add comment explaining why it doesn't matter if |n
gab
2016/07/07 15:58:30
Why do you need Release_Load() here?
robliao
2016/07/07 17:49:16
Done.
gab
2016/07/13 18:36:31
Hmmm, I don't think this matters, idle thread acco
robliao
2016/07/13 20:19:46
In either case, it's the safest thing to do here.
gab
2016/07/13 21:07:55
I disagree. It's not safer. It's equivalent (plus
robliao
2016/07/19 22:03:47
In light of the atomic ops discussion, any strong
gab
2016/07/20 01:45:21
From the conclusion we derived, I think the inc/de
robliao
2016/07/20 19:44:00
I've my understanding is right, the atomic ops are
gab
2016/07/20 20:26:31
We have different understandings on a few things (
robliao
2016/07/20 22:18:03
data types (the whole point of atomic RMW is to ma
gab
2016/07/21 13:43:21
I'm not convinced this is true. The RMW will add a
robliao
2016/07/21 18:44:23
Digging into this deeper, the RMW will happen atom
gab
2016/07/21 21:24:24
Refcount is different, no one ever wants to just "
robliao
2016/07/22 16:44:06
Fenced the load.
| |
532 !outer_->HasJoinedForTesting(); | |
485 } | 533 } |
486 | 534 |
487 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 535 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
488 StringPiece name, | 536 StringPiece name, |
489 IORestriction io_restriction, | 537 IORestriction io_restriction, |
490 TaskTracker* task_tracker, | 538 TaskTracker* task_tracker, |
491 DelayedTaskManager* delayed_task_manager) | 539 DelayedTaskManager* delayed_task_manager) |
492 : name_(name.as_string()), | 540 : name_(name.as_string()), |
493 io_restriction_(io_restriction), | 541 io_restriction_(io_restriction), |
494 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 542 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
495 idle_workers_stack_cv_for_testing_( | 543 idle_workers_stack_cv_for_testing_( |
496 idle_workers_stack_lock_.CreateConditionVariable()), | 544 idle_workers_stack_lock_.CreateConditionVariable()), |
497 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 545 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
498 WaitableEvent::InitialState::NOT_SIGNALED), | 546 WaitableEvent::InitialState::NOT_SIGNALED), |
547 join_for_testing_called_(false), | |
499 #if DCHECK_IS_ON() | 548 #if DCHECK_IS_ON() |
500 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | 549 workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
501 WaitableEvent::InitialState::NOT_SIGNALED), | 550 WaitableEvent::InitialState::NOT_SIGNALED), |
502 #endif | 551 #endif |
503 task_tracker_(task_tracker), | 552 task_tracker_(task_tracker), |
504 delayed_task_manager_(delayed_task_manager) { | 553 delayed_task_manager_(delayed_task_manager) { |
505 DCHECK(task_tracker_); | 554 DCHECK(task_tracker_); |
506 DCHECK(delayed_task_manager_); | 555 DCHECK(delayed_task_manager_); |
507 } | 556 } |
508 | 557 |
509 bool SchedulerWorkerPoolImpl::Initialize( | 558 bool SchedulerWorkerPoolImpl::Initialize( |
510 ThreadPriority thread_priority, | 559 ThreadPriority thread_priority, |
511 size_t max_threads, | 560 size_t max_threads, |
561 const TimeDelta& suggested_reclaim_time, | |
512 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | 562 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
513 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 563 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
514 | 564 |
515 DCHECK(workers_.empty()); | 565 DCHECK(workers_.empty()); |
516 | 566 |
517 for (size_t i = 0; i < max_threads; ++i) { | 567 for (size_t i = 0; i < max_threads; ++i) { |
518 std::unique_ptr<SchedulerWorker> worker = | 568 std::unique_ptr<SchedulerWorker> worker = |
519 SchedulerWorker::Create( | 569 SchedulerWorker::Create( |
520 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( | 570 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
521 this, re_enqueue_sequence_callback, | 571 this, suggested_reclaim_time, |
572 re_enqueue_sequence_callback, | |
522 &shared_priority_queue_, static_cast<int>(i))), | 573 &shared_priority_queue_, static_cast<int>(i))), |
523 task_tracker_, | 574 task_tracker_, |
524 SchedulerWorker::InitialState::ALIVE); | 575 i == 0 |
576 ? SchedulerWorker::InitialState::ALIVE | |
577 : SchedulerWorker::InitialState::DETACHED); | |
525 if (!worker) | 578 if (!worker) |
526 break; | 579 break; |
527 idle_workers_stack_.Push(worker.get()); | 580 idle_workers_stack_.Push(worker.get()); |
528 workers_.push_back(std::move(worker)); | 581 workers_.push_back(std::move(worker)); |
529 } | 582 } |
530 | 583 |
531 #if DCHECK_IS_ON() | 584 #if DCHECK_IS_ON() |
532 workers_created_.Signal(); | 585 workers_created_.Signal(); |
533 #endif | 586 #endif |
534 | 587 |
535 return !workers_.empty(); | 588 return !workers_.empty(); |
536 } | 589 } |
537 | 590 |
538 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 591 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
539 SchedulerWorker* worker; | 592 SchedulerWorker* worker; |
540 { | 593 { |
541 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 594 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
542 worker = idle_workers_stack_.Pop(); | 595 worker = idle_workers_stack_.Pop(); |
543 } | 596 } |
544 if (worker) | 597 if (worker) |
545 worker->WakeUp(); | 598 worker->WakeUp(); |
546 } | 599 } |
547 | 600 |
548 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( | 601 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
549 SchedulerWorker* worker) { | 602 SchedulerWorker* worker) { |
550 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 603 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
551 idle_workers_stack_.Push(worker); | 604 // Detachment may cause multiple attempts to add because the delegate cannot |
605 // determine who woke it up. As a result, when it wakes up, it may conclude | |
606 // there's no work to be done and attempt to add itself to the idle stack | |
607 // again. | |
608 if (!idle_workers_stack_.Contains(worker)) | |
fdoray
2016/07/04 19:41:32
A detached worker is only re-created when its Wake
robliao
2016/07/07 17:49:16
Prior to this change, GetWork had two precondition
fdoray
2016/07/07 20:45:56
You're right. Thanks!
| |
609 idle_workers_stack_.Push(worker); | |
610 | |
552 DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); | 611 DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
553 | 612 |
554 if (idle_workers_stack_.Size() == workers_.size()) | 613 if (idle_workers_stack_.Size() == workers_.size()) |
555 idle_workers_stack_cv_for_testing_->Broadcast(); | 614 idle_workers_stack_cv_for_testing_->Broadcast(); |
556 } | 615 } |
557 | 616 |
617 const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { | |
618 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | |
619 return idle_workers_stack_.Peek(); | |
620 } | |
621 | |
558 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( | 622 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
559 SchedulerWorker* worker) { | 623 SchedulerWorker* worker) { |
560 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 624 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
561 idle_workers_stack_.Remove(worker); | 625 idle_workers_stack_.Remove(worker); |
562 } | 626 } |
563 | 627 |
628 bool SchedulerWorkerPoolImpl::HasJoinedForTesting() { | |
629 AutoSchedulerLock auto_lock(join_for_testing_called_lock_); | |
630 return join_for_testing_called_; | |
631 } | |
632 | |
564 } // namespace internal | 633 } // namespace internal |
565 } // namespace base | 634 } // namespace base |
OLD | NEW |