Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(296)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2801673002: Separate the create and start phases in SchedulerWorkerPoolImpl. (Closed)
Patch Set: self-review Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after
185 185
186 // Number of tasks executed since the last time the 186 // Number of tasks executed since the last time the
187 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. 187 // TaskScheduler.NumTasksBeforeDetach histogram was recorded.
188 size_t num_tasks_since_last_detach_ = 0; 188 size_t num_tasks_since_last_detach_ = 0;
189 189
190 const int index_; 190 const int index_;
191 191
192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); 192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
193 }; 193 };
194 194
195 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
196 const std::string& name,
197 ThreadPriority priority_hint,
198 ReEnqueueSequenceCallback re_enqueue_sequence_callback,
199 TaskTracker* task_tracker,
200 DelayedTaskManager* delayed_task_manager)
201 : name_(name),
202 priority_hint_(priority_hint),
203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)),
204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
205 idle_workers_stack_cv_for_testing_(
206 idle_workers_stack_lock_.CreateConditionVariable()),
207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
208 WaitableEvent::InitialState::NOT_SIGNALED),
209 #if DCHECK_IS_ON()
210 workers_created_(WaitableEvent::ResetPolicy::MANUAL,
211 WaitableEvent::InitialState::NOT_SIGNALED),
212 #endif
213 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
214 detach_duration_histogram_(Histogram::FactoryTimeGet(
215 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix,
216 TimeDelta::FromMilliseconds(1),
217 TimeDelta::FromHours(1),
218 50,
219 HistogramBase::kUmaTargetedHistogramFlag)),
220 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more
221 // than 1000 tasks before detaching, there is no need to know the exact
222 // number of tasks that ran.
223 num_tasks_before_detach_histogram_(Histogram::FactoryGet(
224 kNumTasksBeforeDetachHistogramPrefix + name_ + kPoolNameSuffix,
225 1,
226 1000,
227 50,
228 HistogramBase::kUmaTargetedHistogramFlag)),
229 // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is
230 // expected to run between zero and a few tens of tasks between waits.
231 // When it runs more than 100 tasks, there is no need to know the exact
232 // number of tasks that ran.
233 num_tasks_between_waits_histogram_(Histogram::FactoryGet(
234 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix,
235 1,
236 100,
237 50,
238 HistogramBase::kUmaTargetedHistogramFlag)),
239 task_tracker_(task_tracker),
240 delayed_task_manager_(delayed_task_manager) {
241 DCHECK(task_tracker_);
242 DCHECK(delayed_task_manager_);
243 }
244
245 void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) {
246 suggested_reclaim_time_ = params.suggested_reclaim_time();
247
248 std::vector<SchedulerWorker*> workers_to_wake_up;
249
250 {
251 // Get the number of enqueued Sequences to know how many workers to wake up
252 // at the end of this method. Keep the transaction alive to prevent new
253 // Sequences from being enqueued in the meantime.
254 auto priority_queue_transaction = shared_priority_queue_.BeginTransaction();
255 const size_t num_workers_to_wake_up = priority_queue_transaction->Size();
256
257 // The number of workers created alive is |num_workers_to_wake_up|, plus one
258 // if the standby thread policy is ONE.
259 const size_t num_alive_workers =
robliao 2017/04/05 23:29:23 Can do this computation at the end to avoid holdin
fdoray 2017/04/06 15:07:12 PQ lock is a predecessor of |idle_workers_stack_lo
260 num_workers_to_wake_up +
261 (params.standby_thread_policy() ==
262 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE
263 ? 1
264 : 0);
265
266 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
267
268 #if DCHECK_IS_ON()
269 DCHECK(!workers_created_.IsSignaled());
270 #endif
271
272 DCHECK(workers_.empty());
273 workers_.resize(params.max_threads());
274
275 // Create workers in reverse order of index so that the worker with the
276 // highest index is at the bottom of the idle stack.
277 for (int index = params.max_threads() - 1; index >= 0; --index) {
278 const SchedulerWorker::InitialState initial_state =
279 static_cast<size_t>(index) < num_alive_workers
280 ? SchedulerWorker::InitialState::ALIVE
281 : SchedulerWorker::InitialState::DETACHED;
282 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create(
283 params.priority_hint(),
284 MakeUnique<SchedulerWorkerDelegateImpl>(
285 this, re_enqueue_sequence_callback_, index),
286 task_tracker_, initial_state, params.backward_compatibility());
287 if (!worker)
288 break;
289
290 if (static_cast<size_t>(index) < num_workers_to_wake_up)
291 workers_to_wake_up.push_back(worker.get());
292 else
293 idle_workers_stack_.Push(worker.get());
294
295 workers_[index] = std::move(worker);
296 }
297
298 #if DCHECK_IS_ON()
299 workers_created_.Signal();
300 #endif
301
302 CHECK(!workers_.empty());
303 }
304
305 for (SchedulerWorker* worker : workers_to_wake_up)
306 worker->WakeUp();
307 }
308
195 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { 309 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
196 // SchedulerWorkerPool should never be deleted in production unless its 310 // SchedulerWorkerPool should never be deleted in production unless its
197 // initialization failed. 311 // initialization failed.
198 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); 312 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
199 } 313 }
200 314
201 // static
202 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
203 const SchedulerWorkerPoolParams& params,
204 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
205 TaskTracker* task_tracker,
206 DelayedTaskManager* delayed_task_manager) {
207 auto worker_pool = WrapUnique(
208 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager));
209 if (worker_pool->Initialize(params, re_enqueue_sequence_callback))
210 return worker_pool;
211 return nullptr;
212 }
213
214 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( 315 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
215 const TaskTraits& traits) { 316 const TaskTraits& traits) {
216 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); 317 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
217 } 318 }
218 319
219 scoped_refptr<SequencedTaskRunner> 320 scoped_refptr<SequencedTaskRunner>
220 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( 321 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits(
221 const TaskTraits& traits) { 322 const TaskTraits& traits) {
222 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); 323 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
223 } 324 }
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
293 } 394 }
294 } 395 }
295 396
296 void SchedulerWorkerPoolImpl::GetHistograms( 397 void SchedulerWorkerPoolImpl::GetHistograms(
297 std::vector<const HistogramBase*>* histograms) const { 398 std::vector<const HistogramBase*>* histograms) const {
298 histograms->push_back(detach_duration_histogram_); 399 histograms->push_back(detach_duration_histogram_);
299 histograms->push_back(num_tasks_between_waits_histogram_); 400 histograms->push_back(num_tasks_between_waits_histogram_);
300 } 401 }
301 402
302 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { 403 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const {
404 #if DCHECK_IS_ON()
405 DCHECK(workers_created_.IsSignaled());
406 #endif
303 return workers_.size(); 407 return workers_.size();
304 } 408 }
305 409
306 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { 410 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
411 #if DCHECK_IS_ON()
412 DCHECK(workers_created_.IsSignaled());
413 #endif
307 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 414 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
308 while (idle_workers_stack_.Size() < workers_.size()) 415 while (idle_workers_stack_.Size() < workers_.size())
309 idle_workers_stack_cv_for_testing_->Wait(); 416 idle_workers_stack_cv_for_testing_->Wait();
310 } 417 }
311 418
312 void SchedulerWorkerPoolImpl::JoinForTesting() { 419 void SchedulerWorkerPoolImpl::JoinForTesting() {
420 #if DCHECK_IS_ON()
421 DCHECK(workers_created_.IsSignaled());
422 #endif
313 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) 423 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max())
314 << "Workers can detach during join."; 424 << "Workers can detach during join.";
315 for (const auto& worker : workers_) 425 for (const auto& worker : workers_)
316 worker->JoinForTesting(); 426 worker->JoinForTesting();
317 427
318 DCHECK(!join_for_testing_returned_.IsSignaled()); 428 DCHECK(!join_for_testing_returned_.IsSignaled());
319 join_for_testing_returned_.Signal(); 429 join_for_testing_returned_.Signal();
320 } 430 }
321 431
322 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { 432 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() {
323 worker_detachment_disallowed_.Set(); 433 worker_detachment_disallowed_.Set();
324 } 434 }
325 435
326 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { 436 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
437 #if DCHECK_IS_ON()
438 DCHECK(workers_created_.IsSignaled());
439 #endif
327 size_t num_alive_workers = 0; 440 size_t num_alive_workers = 0;
328 for (const auto& worker : workers_) { 441 for (const auto& worker : workers_) {
329 if (worker->ThreadAliveForTesting()) 442 if (worker->ThreadAliveForTesting())
330 ++num_alive_workers; 443 ++num_alive_workers;
331 } 444 }
332 return num_alive_workers; 445 return num_alive_workers;
333 } 446 }
334 447
335 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 448 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
336 SchedulerWorkerDelegateImpl( 449 SchedulerWorkerDelegateImpl(
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
458 } 571 }
459 572
460 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { 573 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() {
461 DCHECK(!did_detach_since_last_get_work_); 574 DCHECK(!did_detach_since_last_get_work_);
462 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); 575 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_);
463 num_tasks_since_last_detach_ = 0; 576 num_tasks_since_last_detach_ = 0;
464 did_detach_since_last_get_work_ = true; 577 did_detach_since_last_get_work_ = true;
465 last_detach_time_ = TimeTicks::Now(); 578 last_detach_time_ = TimeTicks::Now();
466 } 579 }
467 580
468 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
469 const SchedulerWorkerPoolParams& params,
470 TaskTracker* task_tracker,
471 DelayedTaskManager* delayed_task_manager)
472 : name_(params.name()),
473 suggested_reclaim_time_(params.suggested_reclaim_time()),
474 idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
475 idle_workers_stack_cv_for_testing_(
476 idle_workers_stack_lock_.CreateConditionVariable()),
477 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
478 WaitableEvent::InitialState::NOT_SIGNALED),
479 #if DCHECK_IS_ON()
480 workers_created_(WaitableEvent::ResetPolicy::MANUAL,
481 WaitableEvent::InitialState::NOT_SIGNALED),
482 #endif
483 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
484 detach_duration_histogram_(Histogram::FactoryTimeGet(
485 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix,
486 TimeDelta::FromMilliseconds(1),
487 TimeDelta::FromHours(1),
488 50,
489 HistogramBase::kUmaTargetedHistogramFlag)),
490 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more
491 // than 1000 tasks before detaching, there is no need to know the exact
492 // number of tasks that ran.
493 num_tasks_before_detach_histogram_(Histogram::FactoryGet(
494 kNumTasksBeforeDetachHistogramPrefix + name_ + kPoolNameSuffix,
495 1,
496 1000,
497 50,
498 HistogramBase::kUmaTargetedHistogramFlag)),
499 // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is
500 // expected to run between zero and a few tens of tasks between waits.
501 // When it runs more than 100 tasks, there is no need to know the exact
502 // number of tasks that ran.
503 num_tasks_between_waits_histogram_(Histogram::FactoryGet(
504 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix,
505 1,
506 100,
507 50,
508 HistogramBase::kUmaTargetedHistogramFlag)),
509 task_tracker_(task_tracker),
510 delayed_task_manager_(delayed_task_manager) {
511 DCHECK(task_tracker_);
512 DCHECK(delayed_task_manager_);
513 }
514
515 bool SchedulerWorkerPoolImpl::Initialize(
516 const SchedulerWorkerPoolParams& params,
517 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
518 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
519
520 DCHECK(workers_.empty());
521 workers_.resize(params.max_threads());
522
523 // Create workers and push them to the idle stack in reverse order of index.
524 // This ensures that they are woken up in order of index and that the ALIVE
525 // worker is on top of the stack.
526 for (int index = params.max_threads() - 1; index >= 0; --index) {
527 const bool is_standby_lazy =
528 params.standby_thread_policy() ==
529 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY;
530 const SchedulerWorker::InitialState initial_state =
531 (index == 0 && !is_standby_lazy)
532 ? SchedulerWorker::InitialState::ALIVE
533 : SchedulerWorker::InitialState::DETACHED;
534 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create(
535 params.priority_hint(),
536 MakeUnique<SchedulerWorkerDelegateImpl>(
537 this, re_enqueue_sequence_callback, index),
538 task_tracker_, initial_state, params.backward_compatibility());
539 if (!worker)
540 break;
541 idle_workers_stack_.Push(worker.get());
542 workers_[index] = std::move(worker);
543 }
544
545 #if DCHECK_IS_ON()
546 workers_created_.Signal();
547 #endif
548
549 return !workers_.empty();
550 }
551
552 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { 581 void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
553 SchedulerWorker* worker; 582 SchedulerWorker* worker;
554 { 583 {
555 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 584 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
556 worker = idle_workers_stack_.Pop(); 585 worker = idle_workers_stack_.Pop();
557 } 586 }
558 if (worker) 587 if (worker)
559 worker->WakeUp(); 588 worker->WakeUp();
560 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding 589 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding
561 // hysteresis to the CanDetach check. See https://crbug.com/666041. 590 // hysteresis to the CanDetach check. See https://crbug.com/666041.
(...skipping 25 matching lines...) Expand all
587 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 616 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
588 idle_workers_stack_.Remove(worker); 617 idle_workers_stack_.Remove(worker);
589 } 618 }
590 619
591 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { 620 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
592 return !worker_detachment_disallowed_.IsSet(); 621 return !worker_detachment_disallowed_.IsSet();
593 } 622 }
594 623
595 } // namespace internal 624 } // namespace internal
596 } // namespace base 625 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698