Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(850)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/atomicops.h"
12 #include "base/bind.h" 13 #include "base/bind.h"
13 #include "base/bind_helpers.h" 14 #include "base/bind_helpers.h"
14 #include "base/lazy_instance.h" 15 #include "base/lazy_instance.h"
15 #include "base/memory/ptr_util.h" 16 #include "base/memory/ptr_util.h"
16 #include "base/sequenced_task_runner.h" 17 #include "base/sequenced_task_runner.h"
17 #include "base/single_thread_task_runner.h" 18 #include "base/single_thread_task_runner.h"
18 #include "base/strings/stringprintf.h" 19 #include "base/strings/stringprintf.h"
19 #include "base/task_scheduler/delayed_task_manager.h" 20 #include "base/task_scheduler/delayed_task_manager.h"
20 #include "base/task_scheduler/task_tracker.h" 21 #include "base/task_scheduler/task_tracker.h"
21 #include "base/threading/platform_thread.h" 22 #include "base/threading/platform_thread.h"
22 #include "base/threading/thread_local.h" 23 #include "base/threading/thread_local.h"
23 #include "base/threading/thread_restrictions.h" 24 #include "base/threading/thread_restrictions.h"
25 #include "base/time/time.h"
24 26
25 namespace base { 27 namespace base {
26 namespace internal { 28 namespace internal {
27 29
28 namespace { 30 namespace {
29 31
30 // SchedulerWorker that owns the current thread, if any. 32 // SchedulerWorker that owns the current thread, if any.
31 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky 33 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky
32 tls_current_worker = LAZY_INSTANCE_INITIALIZER; 34 tls_current_worker = LAZY_INSTANCE_INITIALIZER;
33 35
34 // SchedulerWorkerPool that owns the current thread, if any. 36 // SchedulerWorkerPool that owns the current thread, if any.
35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky 37 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
36 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; 38 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;
37 39
38 // A task runner that runs tasks with the PARALLEL ExecutionMode. 40 // A task runner that runs tasks with the PARALLEL ExecutionMode.
39 class SchedulerParallelTaskRunner : public TaskRunner { 41 class SchedulerParallelTaskRunner : public TaskRunner {
40 public: 42 public:
41 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 43 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
42 // long as |worker_pool| is alive. 44 // long as |worker_pool| is alive.
43 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. 45 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
44 SchedulerParallelTaskRunner(const TaskTraits& traits, 46 SchedulerParallelTaskRunner(const TaskTraits& traits,
45 SchedulerWorkerPool* worker_pool) 47 SchedulerWorkerPool* worker_pool)
46 : traits_(traits), worker_pool_(worker_pool) {} 48 : traits_(traits), worker_pool_(worker_pool) {
49 DCHECK(worker_pool_);
50 }
47 51
48 // TaskRunner: 52 // TaskRunner:
49 bool PostDelayedTask(const tracked_objects::Location& from_here, 53 bool PostDelayedTask(const tracked_objects::Location& from_here,
50 const Closure& closure, 54 const Closure& closure,
51 TimeDelta delay) override { 55 TimeDelta delay) override {
52 // Post the task as part of a one-off single-task Sequence. 56 // Post the task as part of a one-off single-task Sequence.
53 return worker_pool_->PostTaskWithSequence( 57 return worker_pool_->PostTaskWithSequence(
54 WrapUnique(new Task(from_here, closure, traits_, delay)), 58 WrapUnique(new Task(from_here, closure, traits_, delay)),
55 make_scoped_refptr(new Sequence), nullptr); 59 make_scoped_refptr(new Sequence), nullptr);
56 } 60 }
(...skipping 12 matching lines...) Expand all
69 }; 73 };
70 74
71 // A task runner that runs tasks with the SEQUENCED ExecutionMode. 75 // A task runner that runs tasks with the SEQUENCED ExecutionMode.
72 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { 76 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
73 public: 77 public:
74 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks 78 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
75 // so long as |worker_pool| is alive. 79 // so long as |worker_pool| is alive.
76 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. 80 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
77 SchedulerSequencedTaskRunner(const TaskTraits& traits, 81 SchedulerSequencedTaskRunner(const TaskTraits& traits,
78 SchedulerWorkerPool* worker_pool) 82 SchedulerWorkerPool* worker_pool)
79 : traits_(traits), worker_pool_(worker_pool) {} 83 : traits_(traits), worker_pool_(worker_pool) {
84 DCHECK(worker_pool_);
85 }
80 86
81 // SequencedTaskRunner: 87 // SequencedTaskRunner:
82 bool PostDelayedTask(const tracked_objects::Location& from_here, 88 bool PostDelayedTask(const tracked_objects::Location& from_here,
83 const Closure& closure, 89 const Closure& closure,
84 TimeDelta delay) override { 90 TimeDelta delay) override {
85 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); 91 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
86 task->sequenced_task_runner_ref = this; 92 task->sequenced_task_runner_ref = this;
87 93
88 // Post the task as part of |sequence_|. 94 // Post the task as part of |sequence_|.
89 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, 95 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
(...skipping 24 matching lines...) Expand all
114 }; 120 };
115 121
116 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. 122 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
117 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { 123 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
118 public: 124 public:
119 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post 125 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
120 // tasks so long as |worker_pool| and |worker| are alive. 126 // tasks so long as |worker_pool| and |worker| are alive.
121 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| 127 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
122 // and |worker|. 128 // and |worker|.
123 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, 129 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
124 SchedulerWorkerPool* worker_pool, 130 SchedulerWorkerPoolImpl* worker_pool,
125 SchedulerWorker* worker) 131 SchedulerWorker* worker)
126 : traits_(traits), 132 : traits_(traits),
127 worker_pool_(worker_pool), 133 worker_pool_(worker_pool),
128 worker_(worker) {} 134 worker_(worker) {
135 DCHECK(worker_pool_);
136 DCHECK(worker_);
137 worker_pool_->RegisterSingleThreadTaskRunner(worker_);
138 }
129 139
130 // SingleThreadTaskRunner: 140 // SingleThreadTaskRunner:
131 bool PostDelayedTask(const tracked_objects::Location& from_here, 141 bool PostDelayedTask(const tracked_objects::Location& from_here,
132 const Closure& closure, 142 const Closure& closure,
133 TimeDelta delay) override { 143 TimeDelta delay) override {
134 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); 144 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
135 task->single_thread_task_runner_ref = this; 145 task->single_thread_task_runner_ref = this;
136 146
137 // Post the task to be executed by |worker_| as part of |sequence_|. 147 // Post the task to be executed by |worker_| as part of |sequence_|.
138 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, 148 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
139 worker_); 149 worker_);
140 } 150 }
141 151
142 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 152 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
143 const Closure& closure, 153 const Closure& closure,
144 base::TimeDelta delay) override { 154 base::TimeDelta delay) override {
145 // Tasks are never nested within the task scheduler. 155 // Tasks are never nested within the task scheduler.
146 return PostDelayedTask(from_here, closure, delay); 156 return PostDelayedTask(from_here, closure, delay);
147 } 157 }
148 158
149 bool RunsTasksOnCurrentThread() const override { 159 bool RunsTasksOnCurrentThread() const override {
150 return tls_current_worker.Get().Get() == worker_; 160 return tls_current_worker.Get().Get() == worker_;
151 } 161 }
152 162
153 private: 163 private:
154 ~SchedulerSingleThreadTaskRunner() override = default; 164 ~SchedulerSingleThreadTaskRunner() override {
165 worker_pool_->UnregisterSingleThreadTaskRunner(worker_);
166 }
155 167
156 // Sequence for all Tasks posted through this TaskRunner. 168 // Sequence for all Tasks posted through this TaskRunner.
157 const scoped_refptr<Sequence> sequence_ = new Sequence; 169 const scoped_refptr<Sequence> sequence_ = new Sequence;
158 170
159 const TaskTraits traits_; 171 const TaskTraits traits_;
160 SchedulerWorkerPool* const worker_pool_; 172 SchedulerWorkerPoolImpl* const worker_pool_;
161 SchedulerWorker* const worker_; 173 SchedulerWorker* const worker_;
162 174
163 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); 175 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
164 }; 176 };
165 177
166 // Only used in DCHECKs. 178 // Only used in DCHECKs.
167 bool ContainsWorker( 179 bool ContainsWorker(
168 const std::vector<std::unique_ptr<SchedulerWorker>>& workers, 180 const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
169 const SchedulerWorker* worker) { 181 const SchedulerWorker* worker) {
170 auto it = std::find_if(workers.begin(), workers.end(), 182 auto it = std::find_if(workers.begin(), workers.end(),
171 [worker](const std::unique_ptr<SchedulerWorker>& i) { 183 [worker](const std::unique_ptr<SchedulerWorker>& i) {
172 return i.get() == worker; 184 return i.get() == worker;
173 }); 185 });
174 return it != workers.end(); 186 return it != workers.end();
175 } 187 }
176 188
177 } // namespace 189 } // namespace
178 190
179 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl 191 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
180 : public SchedulerWorker::Delegate { 192 : public SchedulerWorker::Delegate {
181 public: 193 public:
182 // |outer| owns the worker for which this delegate is constructed. 194 // |outer| owns the worker for which this delegate is constructed.
183 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is 195 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
184 // called with a non-single-threaded Sequence. |shared_priority_queue| is a 196 // called with a non-single-threaded Sequence. |shared_priority_queue| is a
185 // PriorityQueue whose transactions may overlap with the worker's 197 // PriorityQueue whose transactions may overlap with the worker's
186 // single-threaded PriorityQueue's transactions. |index| will be appended to 198 // single-threaded PriorityQueue's transactions. |index| will be appended to
187 // the pool name to label the underlying worker threads. 199 // the pool name to label the underlying worker threads.
188 SchedulerWorkerDelegateImpl( 200 SchedulerWorkerDelegateImpl(
189 SchedulerWorkerPoolImpl* outer, 201 SchedulerWorkerPoolImpl* outer,
202 const TimeDelta& suggested_reclaim_time,
190 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 203 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
191 const PriorityQueue* shared_priority_queue, 204 const PriorityQueue* shared_priority_queue,
192 int index); 205 int index);
193 ~SchedulerWorkerDelegateImpl() override; 206 ~SchedulerWorkerDelegateImpl() override;
194 207
195 PriorityQueue* single_threaded_priority_queue() { 208 PriorityQueue* single_threaded_priority_queue() {
196 return &single_threaded_priority_queue_; 209 return &single_threaded_priority_queue_;
197 } 210 }
198 211
199 // SchedulerWorker::Delegate: 212 // SchedulerWorker::Delegate:
200 void OnMainEntry(SchedulerWorker* worker) override; 213 void OnMainEntry(SchedulerWorker* worker) override;
201 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; 214 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
202 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; 215 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
203 TimeDelta GetSleepTimeout() override; 216 TimeDelta GetSleepTimeout() override;
204 bool CanDetach(SchedulerWorker* worker) override; 217 bool CanDetach(SchedulerWorker* worker) override;
205 218
219 void RegisterSingleThreadTaskRunner() {
220 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1);
221 }
222
223 void UnregisterSingleThreadTaskRunner() {
224 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1);
225 }
226
206 private: 227 private:
207 SchedulerWorkerPoolImpl* outer_; 228 SchedulerWorkerPoolImpl* outer_;
229 const TimeDelta suggested_reclaim_time_;
208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; 230 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
209 231
210 // Single-threaded PriorityQueue for the worker. 232 // Single-threaded PriorityQueue for the worker.
211 PriorityQueue single_threaded_priority_queue_; 233 PriorityQueue single_threaded_priority_queue_;
212 234
213 // True if the last Sequence returned by GetWork() was extracted from 235 // True if the last Sequence returned by GetWork() was extracted from
214 // |single_threaded_priority_queue_|. 236 // |single_threaded_priority_queue_|.
215 bool last_sequence_is_single_threaded_ = false; 237 bool last_sequence_is_single_threaded_ = false;
216 238
239 subtle::Atomic32 num_single_threaded_runners_ = 0;
240
217 const int index_; 241 const int index_;
218 242
219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); 243 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
220 }; 244 };
221 245
222 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { 246 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
223 // SchedulerWorkerPool should never be deleted in production unless its 247 // SchedulerWorkerPool should never be deleted in production unless its
224 // initialization failed. 248 // initialization failed.
225 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); 249 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
226 } 250 }
227 251
228 // static 252 // static
229 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( 253 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
230 StringPiece name, 254 StringPiece name,
231 ThreadPriority thread_priority, 255 ThreadPriority thread_priority,
232 size_t max_threads, 256 size_t max_threads,
233 IORestriction io_restriction, 257 IORestriction io_restriction,
258 const TimeDelta& suggested_reclaim_time,
234 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 259 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
235 TaskTracker* task_tracker, 260 TaskTracker* task_tracker,
236 DelayedTaskManager* delayed_task_manager) { 261 DelayedTaskManager* delayed_task_manager) {
237 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( 262 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool(
238 new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, 263 new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker,
239 delayed_task_manager)); 264 delayed_task_manager));
240 if (worker_pool->Initialize(thread_priority, max_threads, 265 if (worker_pool->Initialize(thread_priority, max_threads,
266 suggested_reclaim_time,
241 re_enqueue_sequence_callback)) { 267 re_enqueue_sequence_callback)) {
242 return worker_pool; 268 return worker_pool;
243 } 269 }
244 return nullptr; 270 return nullptr;
245 } 271 }
246 272
247 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { 273 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
248 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 274 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
249 while (idle_workers_stack_.Size() < workers_.size()) 275 while (idle_workers_stack_.Size() < workers_.size())
250 idle_workers_stack_cv_for_testing_->Wait(); 276 idle_workers_stack_cv_for_testing_->Wait();
251 } 277 }
252 278
253 void SchedulerWorkerPoolImpl::JoinForTesting() { 279 void SchedulerWorkerPoolImpl::JoinForTesting() {
280 {
281 AutoSchedulerLock auto_lock(join_for_testing_called_lock_);
282 join_for_testing_called_ = true;
283 }
254 for (const auto& worker : workers_) 284 for (const auto& worker : workers_)
255 worker->JoinForTesting(); 285 worker->JoinForTesting();
256 286
257 DCHECK(!join_for_testing_returned_.IsSignaled()); 287 DCHECK(!join_for_testing_returned_.IsSignaled());
258 join_for_testing_returned_.Signal(); 288 join_for_testing_returned_.Signal();
259 } 289 }
260 290
261 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( 291 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
262 const TaskTraits& traits, 292 const TaskTraits& traits,
263 ExecutionMode execution_mode) { 293 ExecutionMode execution_mode) {
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
361 sequence_sort_key); 391 sequence_sort_key);
362 392
363 // Wake up a worker to process |sequence|. 393 // Wake up a worker to process |sequence|.
364 if (worker) 394 if (worker)
365 worker->WakeUp(); 395 worker->WakeUp();
366 else 396 else
367 WakeUpOneWorker(); 397 WakeUpOneWorker();
368 } 398 }
369 } 399 }
370 400
401 void SchedulerWorkerPoolImpl::RegisterSingleThreadTaskRunner(
402 SchedulerWorker* worker) {
403 auto delegate =
404 static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate());
405 delegate->RegisterSingleThreadTaskRunner();
406 }
407
408 void SchedulerWorkerPoolImpl::UnregisterSingleThreadTaskRunner(
409 SchedulerWorker* worker) {
410 auto delegate =
411 static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate());
412 delegate->UnregisterSingleThreadTaskRunner();
413 }
414
371 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 415 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
372 SchedulerWorkerDelegateImpl( 416 SchedulerWorkerDelegateImpl(
373 SchedulerWorkerPoolImpl* outer, 417 SchedulerWorkerPoolImpl* outer,
418 const TimeDelta& suggested_reclaim_time,
374 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 419 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
375 const PriorityQueue* shared_priority_queue, 420 const PriorityQueue* shared_priority_queue,
376 int index) 421 int index)
377 : outer_(outer), 422 : outer_(outer),
423 suggested_reclaim_time_(suggested_reclaim_time),
378 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), 424 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
379 single_threaded_priority_queue_(shared_priority_queue), 425 single_threaded_priority_queue_(shared_priority_queue),
380 index_(index) {} 426 index_(index) {}
381 427
382 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 428 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
383 ~SchedulerWorkerDelegateImpl() = default; 429 ~SchedulerWorkerDelegateImpl() = default;
384 430
385 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( 431 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
386 SchedulerWorker* worker) { 432 SchedulerWorker* worker) {
387 #if DCHECK_IS_ON() 433 #if DCHECK_IS_ON()
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
469 std::move(sequence), sequence_sort_key); 515 std::move(sequence), sequence_sort_key);
470 } else { 516 } else {
471 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue 517 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
472 // |sequence| must be enqueued. 518 // |sequence| must be enqueued.
473 re_enqueue_sequence_callback_.Run(std::move(sequence)); 519 re_enqueue_sequence_callback_.Run(std::move(sequence));
474 } 520 }
475 } 521 }
476 522
477 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 523 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
478 GetSleepTimeout() { 524 GetSleepTimeout() {
479 return TimeDelta::Max(); 525 return suggested_reclaim_time_;
fdoray 2016/07/04 19:41:32 From scheduler_worker.cc: while (...) { scoped_
robliao 2016/07/07 17:49:16 Nice catch! Nothing at the moment. Fixed with an i
480 } 526 }
481 527
482 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( 528 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
483 SchedulerWorker* worker) { 529 SchedulerWorker* worker) {
484 return false; 530 return worker != outer_->PeekAtIdleWorkersStack() &&
531 !subtle::Release_Load(&num_single_threaded_runners_) &&
fdoray 2016/07/04 19:41:32 Add comment explaining why it doesn't matter if |n
gab 2016/07/07 15:58:30 Why do you need Release_Load() here?
robliao 2016/07/07 17:49:16 Done.
gab 2016/07/13 18:36:31 Hmmm, I don't think this matters, idle thread acco
robliao 2016/07/13 20:19:46 In either case, it's the safest thing to do here.
gab 2016/07/13 21:07:55 I disagree. It's not safer. It's equivalent (plus
robliao 2016/07/19 22:03:47 In light of the atomic ops discussion, any strong
gab 2016/07/20 01:45:21 From the conclusion we derived, I think the inc/de
robliao 2016/07/20 19:44:00 I've my understanding is right, the atomic ops are
gab 2016/07/20 20:26:31 We have different understandings on a few things (
robliao 2016/07/20 22:18:03 data types (the whole point of atomic RMW is to ma
gab 2016/07/21 13:43:21 I'm not convinced this is true. The RMW will add a
robliao 2016/07/21 18:44:23 Digging into this deeper, the RMW will happen atom
gab 2016/07/21 21:24:24 Refcount is different, no one ever wants to just "
robliao 2016/07/22 16:44:06 Fenced the load.
532 !outer_->HasJoinedForTesting();
485 } 533 }
486 534
487 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( 535 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
488 StringPiece name, 536 StringPiece name,
489 IORestriction io_restriction, 537 IORestriction io_restriction,
490 TaskTracker* task_tracker, 538 TaskTracker* task_tracker,
491 DelayedTaskManager* delayed_task_manager) 539 DelayedTaskManager* delayed_task_manager)
492 : name_(name.as_string()), 540 : name_(name.as_string()),
493 io_restriction_(io_restriction), 541 io_restriction_(io_restriction),
494 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), 542 idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
495 idle_workers_stack_cv_for_testing_( 543 idle_workers_stack_cv_for_testing_(
496 idle_workers_stack_lock_.CreateConditionVariable()), 544 idle_workers_stack_lock_.CreateConditionVariable()),
497 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, 545 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
498 WaitableEvent::InitialState::NOT_SIGNALED), 546 WaitableEvent::InitialState::NOT_SIGNALED),
547 join_for_testing_called_(false),
499 #if DCHECK_IS_ON() 548 #if DCHECK_IS_ON()
500 workers_created_(WaitableEvent::ResetPolicy::MANUAL, 549 workers_created_(WaitableEvent::ResetPolicy::MANUAL,
501 WaitableEvent::InitialState::NOT_SIGNALED), 550 WaitableEvent::InitialState::NOT_SIGNALED),
502 #endif 551 #endif
503 task_tracker_(task_tracker), 552 task_tracker_(task_tracker),
504 delayed_task_manager_(delayed_task_manager) { 553 delayed_task_manager_(delayed_task_manager) {
505 DCHECK(task_tracker_); 554 DCHECK(task_tracker_);
506 DCHECK(delayed_task_manager_); 555 DCHECK(delayed_task_manager_);
507 } 556 }
508 557
509 bool SchedulerWorkerPoolImpl::Initialize( 558 bool SchedulerWorkerPoolImpl::Initialize(
510 ThreadPriority thread_priority, 559 ThreadPriority thread_priority,
511 size_t max_threads, 560 size_t max_threads,
561 const TimeDelta& suggested_reclaim_time,
512 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { 562 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
513 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 563 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
514 564
515 DCHECK(workers_.empty()); 565 DCHECK(workers_.empty());
516 566
517 for (size_t i = 0; i < max_threads; ++i) { 567 for (size_t i = 0; i < max_threads; ++i) {
518 std::unique_ptr<SchedulerWorker> worker = 568 std::unique_ptr<SchedulerWorker> worker =
519 SchedulerWorker::Create( 569 SchedulerWorker::Create(
520 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( 570 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl(
521 this, re_enqueue_sequence_callback, 571 this, suggested_reclaim_time,
572 re_enqueue_sequence_callback,
522 &shared_priority_queue_, static_cast<int>(i))), 573 &shared_priority_queue_, static_cast<int>(i))),
523 task_tracker_, 574 task_tracker_,
524 SchedulerWorker::InitialState::ALIVE); 575 i == 0
576 ? SchedulerWorker::InitialState::ALIVE
577 : SchedulerWorker::InitialState::DETACHED);
525 if (!worker) 578 if (!worker)
526 break; 579 break;
527 idle_workers_stack_.Push(worker.get()); 580 idle_workers_stack_.Push(worker.get());
528 workers_.push_back(std::move(worker)); 581 workers_.push_back(std::move(worker));
529 } 582 }
530 583
531 #if DCHECK_IS_ON() 584 #if DCHECK_IS_ON()
532 workers_created_.Signal(); 585 workers_created_.Signal();
533 #endif 586 #endif
534 587
535 return !workers_.empty(); 588 return !workers_.empty();
536 } 589 }
537 590
538 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { 591 void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
539 SchedulerWorker* worker; 592 SchedulerWorker* worker;
540 { 593 {
541 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 594 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
542 worker = idle_workers_stack_.Pop(); 595 worker = idle_workers_stack_.Pop();
543 } 596 }
544 if (worker) 597 if (worker)
545 worker->WakeUp(); 598 worker->WakeUp();
546 } 599 }
547 600
548 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( 601 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
549 SchedulerWorker* worker) { 602 SchedulerWorker* worker) {
550 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 603 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
551 idle_workers_stack_.Push(worker); 604 // Detachment may cause multiple attempts to add because the delegate cannot
605 // determine who woke it up. As a result, when it wakes up, it may conclude
606 // there's no work to be done and attempt to add itself to the idle stack
607 // again.
608 if (!idle_workers_stack_.Contains(worker))
fdoray 2016/07/04 19:41:32 A detached worker is only re-created when its Wake
robliao 2016/07/07 17:49:16 Prior to this change, GetWork had two precondition
fdoray 2016/07/07 20:45:56 You're right. Thanks!
609 idle_workers_stack_.Push(worker);
610
552 DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); 611 DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
553 612
554 if (idle_workers_stack_.Size() == workers_.size()) 613 if (idle_workers_stack_.Size() == workers_.size())
555 idle_workers_stack_cv_for_testing_->Broadcast(); 614 idle_workers_stack_cv_for_testing_->Broadcast();
556 } 615 }
557 616
617 const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
618 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
619 return idle_workers_stack_.Peek();
620 }
621
558 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( 622 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
559 SchedulerWorker* worker) { 623 SchedulerWorker* worker) {
560 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 624 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
561 idle_workers_stack_.Remove(worker); 625 idle_workers_stack_.Remove(worker);
562 } 626 }
563 627
628 bool SchedulerWorkerPoolImpl::HasJoinedForTesting() {
629 AutoSchedulerLock auto_lock(join_for_testing_called_lock_);
630 return join_for_testing_called_;
631 }
632
564 } // namespace internal 633 } // namespace internal
565 } // namespace base 634 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698