Chromium Code Reviews — side-by-side diff view (OLD | NEW)
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 185 | 185 |
| 186 // Number of tasks executed since the last time the | 186 // Number of tasks executed since the last time the |
| 187 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. | 187 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. |
| 188 size_t num_tasks_since_last_detach_ = 0; | 188 size_t num_tasks_since_last_detach_ = 0; |
| 189 | 189 |
| 190 const int index_; | 190 const int index_; |
| 191 | 191 |
| 192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| 193 }; | 193 }; |
| 194 | 194 |
| 195 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | |
| 196 const std::string& name, | |
| 197 ThreadPriority priority_hint, | |
| 198 ReEnqueueSequenceCallback re_enqueue_sequence_callback, | |
| 199 TaskTracker* task_tracker, | |
| 200 DelayedTaskManager* delayed_task_manager) | |
| 201 : name_(name), | |
| 202 priority_hint_(priority_hint), | |
| 203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), | |
| 204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | |
| 205 idle_workers_stack_cv_for_testing_( | |
| 206 idle_workers_stack_lock_.CreateConditionVariable()), | |
| 207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | |
| 208 WaitableEvent::InitialState::NOT_SIGNALED), | |
| 209 #if DCHECK_IS_ON() | |
| 210 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | |
| 211 WaitableEvent::InitialState::NOT_SIGNALED), | |
| 212 #endif | |
| 213 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | |
| 214 detach_duration_histogram_(Histogram::FactoryTimeGet( | |
| 215 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | |
| 216 TimeDelta::FromMilliseconds(1), | |
| 217 TimeDelta::FromHours(1), | |
| 218 50, | |
| 219 HistogramBase::kUmaTargetedHistogramFlag)), | |
| 220 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more | |
| 221 // than 1000 tasks before detaching, there is no need to know the exact | |
| 222 // number of tasks that ran. | |
| 223 num_tasks_before_detach_histogram_(Histogram::FactoryGet( | |
| 224 kNumTasksBeforeDetachHistogramPrefix + name_ + kPoolNameSuffix, | |
| 225 1, | |
| 226 1000, | |
| 227 50, | |
| 228 HistogramBase::kUmaTargetedHistogramFlag)), | |
| 229 // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is | |
| 230 // expected to run between zero and a few tens of tasks between waits. | |
| 231 // When it runs more than 100 tasks, there is no need to know the exact | |
| 232 // number of tasks that ran. | |
| 233 num_tasks_between_waits_histogram_(Histogram::FactoryGet( | |
| 234 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, | |
| 235 1, | |
| 236 100, | |
| 237 50, | |
| 238 HistogramBase::kUmaTargetedHistogramFlag)), | |
| 239 task_tracker_(task_tracker), | |
| 240 delayed_task_manager_(delayed_task_manager) { | |
| 241 DCHECK(task_tracker_); | |
| 242 DCHECK(delayed_task_manager_); | |
| 243 } | |
| 244 | |
| 245 void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { | |
| 246 suggested_reclaim_time_ = params.suggested_reclaim_time(); | |
| 247 | |
| 248 std::vector<SchedulerWorker*> workers_to_wake_up; | |
| 249 | |
| 250 { | |
| 251 // Get the number of enqueued Sequences to know how many workers to wake up | |
| 252 // at the end of this method. Keep the transaction alive to prevent new | |
| 253 // Sequences from being enqueued in the meantime. | |
| 254 auto priority_queue_transaction = shared_priority_queue_.BeginTransaction(); | |
| 255 const size_t num_workers_to_wake_up = priority_queue_transaction->Size(); | |
| 256 | |
| 257 // The number of workers created alive is |num_workers_to_wake_up|, plus one | |
| 258 // if the standby thread policy is ONE. | |
| 259 const size_t num_alive_workers = | |
|
robliao
2017/04/05 23:29:23
Can do this computation at the end to avoid holdin
fdoray
2017/04/06 15:07:12
PQ lock is a predecessor of |idle_workers_stack_lo
| |
| 260 num_workers_to_wake_up + | |
| 261 (params.standby_thread_policy() == | |
| 262 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE | |
| 263 ? 1 | |
| 264 : 0); | |
| 265 | |
| 266 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | |
| 267 | |
| 268 #if DCHECK_IS_ON() | |
| 269 DCHECK(!workers_created_.IsSignaled()); | |
| 270 #endif | |
| 271 | |
| 272 DCHECK(workers_.empty()); | |
| 273 workers_.resize(params.max_threads()); | |
| 274 | |
| 275 // Create workers in reverse order of index so that the worker with the | |
| 276 // highest index is at the bottom of the idle stack. | |
| 277 for (int index = params.max_threads() - 1; index >= 0; --index) { | |
| 278 const SchedulerWorker::InitialState initial_state = | |
| 279 static_cast<size_t>(index) < num_alive_workers | |
| 280 ? SchedulerWorker::InitialState::ALIVE | |
| 281 : SchedulerWorker::InitialState::DETACHED; | |
| 282 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | |
| 283 params.priority_hint(), | |
| 284 MakeUnique<SchedulerWorkerDelegateImpl>( | |
| 285 this, re_enqueue_sequence_callback_, index), | |
| 286 task_tracker_, initial_state, params.backward_compatibility()); | |
| 287 if (!worker) | |
| 288 break; | |
| 289 | |
| 290 if (static_cast<size_t>(index) < num_workers_to_wake_up) | |
| 291 workers_to_wake_up.push_back(worker.get()); | |
| 292 else | |
| 293 idle_workers_stack_.Push(worker.get()); | |
| 294 | |
| 295 workers_[index] = std::move(worker); | |
| 296 } | |
| 297 | |
| 298 #if DCHECK_IS_ON() | |
| 299 workers_created_.Signal(); | |
| 300 #endif | |
| 301 | |
| 302 CHECK(!workers_.empty()); | |
| 303 } | |
| 304 | |
| 305 for (SchedulerWorker* worker : workers_to_wake_up) | |
| 306 worker->WakeUp(); | |
| 307 } | |
| 308 | |
| 195 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 309 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
| 196 // SchedulerWorkerPool should never be deleted in production unless its | 310 // SchedulerWorkerPool should never be deleted in production unless its |
| 197 // initialization failed. | 311 // initialization failed. |
| 198 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 312 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
| 199 } | 313 } |
| 200 | 314 |
| 201 // static | |
| 202 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( | |
| 203 const SchedulerWorkerPoolParams& params, | |
| 204 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
| 205 TaskTracker* task_tracker, | |
| 206 DelayedTaskManager* delayed_task_manager) { | |
| 207 auto worker_pool = WrapUnique( | |
| 208 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager)); | |
| 209 if (worker_pool->Initialize(params, re_enqueue_sequence_callback)) | |
| 210 return worker_pool; | |
| 211 return nullptr; | |
| 212 } | |
| 213 | |
| 214 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 315 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| 215 const TaskTraits& traits) { | 316 const TaskTraits& traits) { |
| 216 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 317 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
| 217 } | 318 } |
| 218 | 319 |
| 219 scoped_refptr<SequencedTaskRunner> | 320 scoped_refptr<SequencedTaskRunner> |
| 220 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 321 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
| 221 const TaskTraits& traits) { | 322 const TaskTraits& traits) { |
| 222 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 323 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| 223 } | 324 } |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 293 } | 394 } |
| 294 } | 395 } |
| 295 | 396 |
| 296 void SchedulerWorkerPoolImpl::GetHistograms( | 397 void SchedulerWorkerPoolImpl::GetHistograms( |
| 297 std::vector<const HistogramBase*>* histograms) const { | 398 std::vector<const HistogramBase*>* histograms) const { |
| 298 histograms->push_back(detach_duration_histogram_); | 399 histograms->push_back(detach_duration_histogram_); |
| 299 histograms->push_back(num_tasks_between_waits_histogram_); | 400 histograms->push_back(num_tasks_between_waits_histogram_); |
| 300 } | 401 } |
| 301 | 402 |
| 302 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { | 403 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { |
| 404 #if DCHECK_IS_ON() | |
| 405 DCHECK(workers_created_.IsSignaled()); | |
| 406 #endif | |
| 303 return workers_.size(); | 407 return workers_.size(); |
| 304 } | 408 } |
| 305 | 409 |
| 306 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 410 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| 411 #if DCHECK_IS_ON() | |
| 412 DCHECK(workers_created_.IsSignaled()); | |
| 413 #endif | |
| 307 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 414 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 308 while (idle_workers_stack_.Size() < workers_.size()) | 415 while (idle_workers_stack_.Size() < workers_.size()) |
| 309 idle_workers_stack_cv_for_testing_->Wait(); | 416 idle_workers_stack_cv_for_testing_->Wait(); |
| 310 } | 417 } |
| 311 | 418 |
| 312 void SchedulerWorkerPoolImpl::JoinForTesting() { | 419 void SchedulerWorkerPoolImpl::JoinForTesting() { |
| 420 #if DCHECK_IS_ON() | |
| 421 DCHECK(workers_created_.IsSignaled()); | |
| 422 #endif | |
| 313 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) | 423 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) |
| 314 << "Workers can detach during join."; | 424 << "Workers can detach during join."; |
| 315 for (const auto& worker : workers_) | 425 for (const auto& worker : workers_) |
| 316 worker->JoinForTesting(); | 426 worker->JoinForTesting(); |
| 317 | 427 |
| 318 DCHECK(!join_for_testing_returned_.IsSignaled()); | 428 DCHECK(!join_for_testing_returned_.IsSignaled()); |
| 319 join_for_testing_returned_.Signal(); | 429 join_for_testing_returned_.Signal(); |
| 320 } | 430 } |
| 321 | 431 |
| 322 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { | 432 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
| 323 worker_detachment_disallowed_.Set(); | 433 worker_detachment_disallowed_.Set(); |
| 324 } | 434 } |
| 325 | 435 |
| 326 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { | 436 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { |
| 437 #if DCHECK_IS_ON() | |
| 438 DCHECK(workers_created_.IsSignaled()); | |
| 439 #endif | |
| 327 size_t num_alive_workers = 0; | 440 size_t num_alive_workers = 0; |
| 328 for (const auto& worker : workers_) { | 441 for (const auto& worker : workers_) { |
| 329 if (worker->ThreadAliveForTesting()) | 442 if (worker->ThreadAliveForTesting()) |
| 330 ++num_alive_workers; | 443 ++num_alive_workers; |
| 331 } | 444 } |
| 332 return num_alive_workers; | 445 return num_alive_workers; |
| 333 } | 446 } |
| 334 | 447 |
| 335 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 448 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 336 SchedulerWorkerDelegateImpl( | 449 SchedulerWorkerDelegateImpl( |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 458 } | 571 } |
| 459 | 572 |
| 460 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { | 573 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { |
| 461 DCHECK(!did_detach_since_last_get_work_); | 574 DCHECK(!did_detach_since_last_get_work_); |
| 462 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); | 575 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); |
| 463 num_tasks_since_last_detach_ = 0; | 576 num_tasks_since_last_detach_ = 0; |
| 464 did_detach_since_last_get_work_ = true; | 577 did_detach_since_last_get_work_ = true; |
| 465 last_detach_time_ = TimeTicks::Now(); | 578 last_detach_time_ = TimeTicks::Now(); |
| 466 } | 579 } |
| 467 | 580 |
| 468 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | |
| 469 const SchedulerWorkerPoolParams& params, | |
| 470 TaskTracker* task_tracker, | |
| 471 DelayedTaskManager* delayed_task_manager) | |
| 472 : name_(params.name()), | |
| 473 suggested_reclaim_time_(params.suggested_reclaim_time()), | |
| 474 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | |
| 475 idle_workers_stack_cv_for_testing_( | |
| 476 idle_workers_stack_lock_.CreateConditionVariable()), | |
| 477 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | |
| 478 WaitableEvent::InitialState::NOT_SIGNALED), | |
| 479 #if DCHECK_IS_ON() | |
| 480 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | |
| 481 WaitableEvent::InitialState::NOT_SIGNALED), | |
| 482 #endif | |
| 483 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | |
| 484 detach_duration_histogram_(Histogram::FactoryTimeGet( | |
| 485 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | |
| 486 TimeDelta::FromMilliseconds(1), | |
| 487 TimeDelta::FromHours(1), | |
| 488 50, | |
| 489 HistogramBase::kUmaTargetedHistogramFlag)), | |
| 490 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more | |
| 491 // than 1000 tasks before detaching, there is no need to know the exact | |
| 492 // number of tasks that ran. | |
| 493 num_tasks_before_detach_histogram_(Histogram::FactoryGet( | |
| 494 kNumTasksBeforeDetachHistogramPrefix + name_ + kPoolNameSuffix, | |
| 495 1, | |
| 496 1000, | |
| 497 50, | |
| 498 HistogramBase::kUmaTargetedHistogramFlag)), | |
| 499 // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is | |
| 500 // expected to run between zero and a few tens of tasks between waits. | |
| 501 // When it runs more than 100 tasks, there is no need to know the exact | |
| 502 // number of tasks that ran. | |
| 503 num_tasks_between_waits_histogram_(Histogram::FactoryGet( | |
| 504 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, | |
| 505 1, | |
| 506 100, | |
| 507 50, | |
| 508 HistogramBase::kUmaTargetedHistogramFlag)), | |
| 509 task_tracker_(task_tracker), | |
| 510 delayed_task_manager_(delayed_task_manager) { | |
| 511 DCHECK(task_tracker_); | |
| 512 DCHECK(delayed_task_manager_); | |
| 513 } | |
| 514 | |
| 515 bool SchedulerWorkerPoolImpl::Initialize( | |
| 516 const SchedulerWorkerPoolParams& params, | |
| 517 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | |
| 518 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | |
| 519 | |
| 520 DCHECK(workers_.empty()); | |
| 521 workers_.resize(params.max_threads()); | |
| 522 | |
| 523 // Create workers and push them to the idle stack in reverse order of index. | |
| 524 // This ensures that they are woken up in order of index and that the ALIVE | |
| 525 // worker is on top of the stack. | |
| 526 for (int index = params.max_threads() - 1; index >= 0; --index) { | |
| 527 const bool is_standby_lazy = | |
| 528 params.standby_thread_policy() == | |
| 529 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY; | |
| 530 const SchedulerWorker::InitialState initial_state = | |
| 531 (index == 0 && !is_standby_lazy) | |
| 532 ? SchedulerWorker::InitialState::ALIVE | |
| 533 : SchedulerWorker::InitialState::DETACHED; | |
| 534 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | |
| 535 params.priority_hint(), | |
| 536 MakeUnique<SchedulerWorkerDelegateImpl>( | |
| 537 this, re_enqueue_sequence_callback, index), | |
| 538 task_tracker_, initial_state, params.backward_compatibility()); | |
| 539 if (!worker) | |
| 540 break; | |
| 541 idle_workers_stack_.Push(worker.get()); | |
| 542 workers_[index] = std::move(worker); | |
| 543 } | |
| 544 | |
| 545 #if DCHECK_IS_ON() | |
| 546 workers_created_.Signal(); | |
| 547 #endif | |
| 548 | |
| 549 return !workers_.empty(); | |
| 550 } | |
| 551 | |
| 552 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 581 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| 553 SchedulerWorker* worker; | 582 SchedulerWorker* worker; |
| 554 { | 583 { |
| 555 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 584 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 556 worker = idle_workers_stack_.Pop(); | 585 worker = idle_workers_stack_.Pop(); |
| 557 } | 586 } |
| 558 if (worker) | 587 if (worker) |
| 559 worker->WakeUp(); | 588 worker->WakeUp(); |
| 560 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding | 589 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding |
| 561 // hysteresis to the CanDetach check. See https://crbug.com/666041. | 590 // hysteresis to the CanDetach check. See https://crbug.com/666041. |
| (...skipping 25 matching lines...) Expand all Loading... | |
| 587 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 616 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 588 idle_workers_stack_.Remove(worker); | 617 idle_workers_stack_.Remove(worker); |
| 589 } | 618 } |
| 590 | 619 |
| 591 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 620 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| 592 return !worker_detachment_disallowed_.IsSet(); | 621 return !worker_detachment_disallowed_.IsSet(); |
| 593 } | 622 } |
| 594 | 623 |
| 595 } // namespace internal | 624 } // namespace internal |
| 596 } // namespace base | 625 } // namespace base |
| OLD | NEW |