| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 188 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 199 TaskTracker* task_tracker, | 199 TaskTracker* task_tracker, |
| 200 DelayedTaskManager* delayed_task_manager) | 200 DelayedTaskManager* delayed_task_manager) |
| 201 : name_(name), | 201 : name_(name), |
| 202 priority_hint_(priority_hint), | 202 priority_hint_(priority_hint), |
| 203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), | 203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), |
| 204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| 205 idle_workers_stack_cv_for_testing_( | 205 idle_workers_stack_cv_for_testing_( |
| 206 idle_workers_stack_lock_.CreateConditionVariable()), | 206 idle_workers_stack_lock_.CreateConditionVariable()), |
| 207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| 208 WaitableEvent::InitialState::NOT_SIGNALED), | 208 WaitableEvent::InitialState::NOT_SIGNALED), |
| 209 #if DCHECK_IS_ON() | |
| 210 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | |
| 211 WaitableEvent::InitialState::NOT_SIGNALED), | |
| 212 #endif | |
| 213 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | 209 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. |
| 214 detach_duration_histogram_(Histogram::FactoryTimeGet( | 210 detach_duration_histogram_(Histogram::FactoryTimeGet( |
| 215 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | 211 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, |
| 216 TimeDelta::FromMilliseconds(1), | 212 TimeDelta::FromMilliseconds(1), |
| 217 TimeDelta::FromHours(1), | 213 TimeDelta::FromHours(1), |
| 218 50, | 214 50, |
| 219 HistogramBase::kUmaTargetedHistogramFlag)), | 215 HistogramBase::kUmaTargetedHistogramFlag)), |
| 220 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more | 216 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more |
| 221 // than 1000 tasks before detaching, there is no need to know the exact | 217 // than 1000 tasks before detaching, there is no need to know the exact |
| 222 // number of tasks that ran. | 218 // number of tasks that ran. |
| (...skipping 15 matching lines...) Expand all Loading... |
| 238 HistogramBase::kUmaTargetedHistogramFlag)), | 234 HistogramBase::kUmaTargetedHistogramFlag)), |
| 239 task_tracker_(task_tracker), | 235 task_tracker_(task_tracker), |
| 240 delayed_task_manager_(delayed_task_manager) { | 236 delayed_task_manager_(delayed_task_manager) { |
| 241 DCHECK(task_tracker_); | 237 DCHECK(task_tracker_); |
| 242 DCHECK(delayed_task_manager_); | 238 DCHECK(delayed_task_manager_); |
| 243 } | 239 } |
| 244 | 240 |
| 245 void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { | 241 void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { |
| 246 suggested_reclaim_time_ = params.suggested_reclaim_time(); | 242 suggested_reclaim_time_ = params.suggested_reclaim_time(); |
| 247 | 243 |
| 248 std::vector<SchedulerWorker*> workers_to_wake_up; | |
| 249 | |
| 250 { | 244 { |
| 251 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 245 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 252 | 246 |
| 253 #if DCHECK_IS_ON() | 247 #if DCHECK_IS_ON() |
| 254 DCHECK(!workers_created_.IsSignaled()); | 248 DCHECK(!workers_created_.IsSet()); |
| 255 #endif | 249 #endif |
| 256 | 250 |
| 257 DCHECK(workers_.empty()); | 251 DCHECK(workers_.empty()); |
| 258 workers_.resize(params.max_threads()); | 252 workers_.resize(params.max_threads()); |
| 259 | 253 |
| 260 // The number of workers created alive is |num_wake_ups_before_start_|, plus | 254 // The number of workers created alive is |num_wake_ups_before_start_|, plus |
| 261 // one if the standby thread policy is ONE (in order to start with one alive | 255 // one if the standby thread policy is ONE (in order to start with one alive |
| 262 // idle worker). | 256 // idle worker). |
| 263 const int num_alive_workers = | 257 const int num_alive_workers = |
| 264 num_wake_ups_before_start_ + | 258 num_wake_ups_before_start_ + |
| 265 (params.standby_thread_policy() == | 259 (params.standby_thread_policy() == |
| 266 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE | 260 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE |
| 267 ? 1 | 261 ? 1 |
| 268 : 0); | 262 : 0); |
| 269 | 263 |
| 270 // Create workers in reverse order of index so that the worker with the | 264 // Create workers in reverse order of index so that the worker with the |
| 271 // highest index is at the bottom of the idle stack. | 265 // highest index is at the bottom of the idle stack. |
| 272 for (int index = params.max_threads() - 1; index >= 0; --index) { | 266 for (int index = params.max_threads() - 1; index >= 0; --index) { |
| 273 const SchedulerWorker::InitialState initial_state = | 267 workers_[index] = make_scoped_refptr(new SchedulerWorker( |
| 274 index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE | |
| 275 : SchedulerWorker::InitialState::DETACHED; | |
| 276 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | |
| 277 params.priority_hint(), | 268 params.priority_hint(), |
| 278 MakeUnique<SchedulerWorkerDelegateImpl>( | 269 MakeUnique<SchedulerWorkerDelegateImpl>( |
| 279 this, re_enqueue_sequence_callback_, index), | 270 this, re_enqueue_sequence_callback_, index), |
| 280 task_tracker_, initial_state, params.backward_compatibility()); | 271 task_tracker_, params.backward_compatibility(), |
| 281 if (!worker) | 272 index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE |
| 282 break; | 273 : SchedulerWorker::InitialState::DETACHED)); |
| 283 | 274 |
| 284 if (index < num_wake_ups_before_start_) | 275 // Put workers that won't be woken up at the end of this method on the |
| 285 workers_to_wake_up.push_back(worker.get()); | 276 // idle stack. |
| 286 else | 277 if (index >= num_wake_ups_before_start_) |
| 287 idle_workers_stack_.Push(worker.get()); | 278 idle_workers_stack_.Push(workers_[index].get()); |
| 288 | |
| 289 workers_[index] = std::move(worker); | |
| 290 } | 279 } |
| 291 | 280 |
| 292 #if DCHECK_IS_ON() | 281 #if DCHECK_IS_ON() |
| 293 workers_created_.Signal(); | 282 workers_created_.Set(); |
| 294 #endif | 283 #endif |
| 295 | |
| 296 CHECK(!workers_.empty()); | |
| 297 } | 284 } |
| 298 | 285 |
| 299 for (SchedulerWorker* worker : workers_to_wake_up) | 286 // Start all workers. CHECK that the first worker can be started (assume that |
| 300 worker->WakeUp(); | 287 // failure means that threads can't be created on this machine). Wake up one |
| 288 // worker for each wake up that occurred before Start(). |
| 289 for (size_t index = 0; index < workers_.size(); ++index) { |
| 290 const bool start_success = workers_[index]->Start(); |
| 291 CHECK(start_success || index > 0); |
| 292 if (static_cast<int>(index) < num_wake_ups_before_start_) |
| 293 workers_[index]->WakeUp(); |
| 294 } |
| 301 } | 295 } |
| 302 | 296 |
| 303 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 297 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
| 304 // SchedulerWorkerPool should never be deleted in production unless its | 298 // SchedulerWorkerPool should never be deleted in production unless its |
| 305 // initialization failed. | 299 // initialization failed. |
| 306 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 300 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
| 307 } | 301 } |
| 308 | 302 |
| 309 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 303 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| 310 const TaskTraits& traits) { | 304 const TaskTraits& traits) { |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 391 } | 385 } |
| 392 | 386 |
| 393 void SchedulerWorkerPoolImpl::GetHistograms( | 387 void SchedulerWorkerPoolImpl::GetHistograms( |
| 394 std::vector<const HistogramBase*>* histograms) const { | 388 std::vector<const HistogramBase*>* histograms) const { |
| 395 histograms->push_back(detach_duration_histogram_); | 389 histograms->push_back(detach_duration_histogram_); |
| 396 histograms->push_back(num_tasks_between_waits_histogram_); | 390 histograms->push_back(num_tasks_between_waits_histogram_); |
| 397 } | 391 } |
| 398 | 392 |
| 399 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { | 393 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { |
| 400 #if DCHECK_IS_ON() | 394 #if DCHECK_IS_ON() |
| 401 DCHECK(workers_created_.IsSignaled()); | 395 DCHECK(workers_created_.IsSet()); |
| 402 #endif | 396 #endif |
| 403 return workers_.size(); | 397 return workers_.size(); |
| 404 } | 398 } |
| 405 | 399 |
| 406 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 400 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| 407 #if DCHECK_IS_ON() | 401 #if DCHECK_IS_ON() |
| 408 DCHECK(workers_created_.IsSignaled()); | 402 DCHECK(workers_created_.IsSet()); |
| 409 #endif | 403 #endif |
| 410 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 404 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 411 while (idle_workers_stack_.Size() < workers_.size()) | 405 while (idle_workers_stack_.Size() < workers_.size()) |
| 412 idle_workers_stack_cv_for_testing_->Wait(); | 406 idle_workers_stack_cv_for_testing_->Wait(); |
| 413 } | 407 } |
| 414 | 408 |
| 415 void SchedulerWorkerPoolImpl::JoinForTesting() { | 409 void SchedulerWorkerPoolImpl::JoinForTesting() { |
| 416 #if DCHECK_IS_ON() | 410 #if DCHECK_IS_ON() |
| 417 DCHECK(workers_created_.IsSignaled()); | 411 DCHECK(workers_created_.IsSet()); |
| 418 #endif | 412 #endif |
| 419 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) | 413 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) |
| 420 << "Workers can detach during join."; | 414 << "Workers can detach during join."; |
| 421 for (const auto& worker : workers_) | 415 for (const auto& worker : workers_) |
| 422 worker->JoinForTesting(); | 416 worker->JoinForTesting(); |
| 423 | 417 |
| 424 DCHECK(!join_for_testing_returned_.IsSignaled()); | 418 DCHECK(!join_for_testing_returned_.IsSignaled()); |
| 425 join_for_testing_returned_.Signal(); | 419 join_for_testing_returned_.Signal(); |
| 426 } | 420 } |
| 427 | 421 |
| (...skipping 18 matching lines...) Expand all Loading... |
| 446 : outer_(outer), | 440 : outer_(outer), |
| 447 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 441 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| 448 index_(index) {} | 442 index_(index) {} |
| 449 | 443 |
| 450 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 444 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 451 ~SchedulerWorkerDelegateImpl() = default; | 445 ~SchedulerWorkerDelegateImpl() = default; |
| 452 | 446 |
| 453 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 447 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| 454 SchedulerWorker* worker) { | 448 SchedulerWorker* worker) { |
| 455 #if DCHECK_IS_ON() | 449 #if DCHECK_IS_ON() |
| 456 // Wait for |outer_->workers_created_| to avoid traversing | 450 DCHECK(outer_->workers_created_.IsSet()); |
| 457 // |outer_->workers_| while it is being filled by Initialize(). | |
| 458 outer_->workers_created_.Wait(); | |
| 459 DCHECK(ContainsWorker(outer_->workers_, worker)); | 451 DCHECK(ContainsWorker(outer_->workers_, worker)); |
| 460 #endif | 452 #endif |
| 461 | |
| 462 DCHECK_EQ(num_tasks_since_last_wait_, 0U); | 453 DCHECK_EQ(num_tasks_since_last_wait_, 0U); |
| 463 | 454 |
| 464 if (!last_detach_time_.is_null()) { | 455 if (!last_detach_time_.is_null()) { |
| 465 outer_->detach_duration_histogram_->AddTime(TimeTicks::Now() - | 456 outer_->detach_duration_histogram_->AddTime(TimeTicks::Now() - |
| 466 last_detach_time_); | 457 last_detach_time_); |
| 467 } | 458 } |
| 468 | 459 |
| 469 PlatformThread::SetName( | 460 PlatformThread::SetName( |
| 470 StringPrintf("TaskScheduler%sWorker%d", outer_->name_.c_str(), index_)); | 461 StringPrintf("TaskScheduler%sWorker%d", outer_->name_.c_str(), index_)); |
| 471 | 462 |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 570 did_detach_since_last_get_work_ = true; | 561 did_detach_since_last_get_work_ = true; |
| 571 last_detach_time_ = TimeTicks::Now(); | 562 last_detach_time_ = TimeTicks::Now(); |
| 572 } | 563 } |
| 573 | 564 |
| 574 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 565 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| 575 SchedulerWorker* worker = nullptr; | 566 SchedulerWorker* worker = nullptr; |
| 576 { | 567 { |
| 577 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 568 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 578 | 569 |
| 579 #if DCHECK_IS_ON() | 570 #if DCHECK_IS_ON() |
| 580 DCHECK_EQ(workers_.empty(), !workers_created_.IsSignaled()); | 571 DCHECK_EQ(workers_.empty(), !workers_created_.IsSet()); |
| 581 #endif | 572 #endif |
| 582 | 573 |
| 583 if (workers_.empty()) | 574 if (workers_.empty()) |
| 584 ++num_wake_ups_before_start_; | 575 ++num_wake_ups_before_start_; |
| 585 else | 576 else |
| 586 worker = idle_workers_stack_.Pop(); | 577 worker = idle_workers_stack_.Pop(); |
| 587 } | 578 } |
| 588 | 579 |
| 589 if (worker) | 580 if (worker) |
| 590 worker->WakeUp(); | 581 worker->WakeUp(); |
| (...skipping 27 matching lines...) Expand all Loading... |
| 618 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 609 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 619 idle_workers_stack_.Remove(worker); | 610 idle_workers_stack_.Remove(worker); |
| 620 } | 611 } |
| 621 | 612 |
| 622 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 613 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| 623 return !worker_detachment_disallowed_.IsSet(); | 614 return !worker_detachment_disallowed_.IsSet(); |
| 624 } | 615 } |
| 625 | 616 |
| 626 } // namespace internal | 617 } // namespace internal |
| 627 } // namespace base | 618 } // namespace base |
| OLD | NEW |