OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <utility> | 10 #include <utility> |
(...skipping 188 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
199 TaskTracker* task_tracker, | 199 TaskTracker* task_tracker, |
200 DelayedTaskManager* delayed_task_manager) | 200 DelayedTaskManager* delayed_task_manager) |
201 : name_(name), | 201 : name_(name), |
202 priority_hint_(priority_hint), | 202 priority_hint_(priority_hint), |
203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), | 203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), |
204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
205 idle_workers_stack_cv_for_testing_( | 205 idle_workers_stack_cv_for_testing_( |
206 idle_workers_stack_lock_.CreateConditionVariable()), | 206 idle_workers_stack_lock_.CreateConditionVariable()), |
207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
208 WaitableEvent::InitialState::NOT_SIGNALED), | 208 WaitableEvent::InitialState::NOT_SIGNALED), |
209 #if DCHECK_IS_ON() | |
210 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | |
211 WaitableEvent::InitialState::NOT_SIGNALED), | |
212 #endif | |
213 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | 209 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. |
214 detach_duration_histogram_(Histogram::FactoryTimeGet( | 210 detach_duration_histogram_(Histogram::FactoryTimeGet( |
215 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | 211 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, |
216 TimeDelta::FromMilliseconds(1), | 212 TimeDelta::FromMilliseconds(1), |
217 TimeDelta::FromHours(1), | 213 TimeDelta::FromHours(1), |
218 50, | 214 50, |
219 HistogramBase::kUmaTargetedHistogramFlag)), | 215 HistogramBase::kUmaTargetedHistogramFlag)), |
220 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more | 216 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more |
221 // than 1000 tasks before detaching, there is no need to know the exact | 217 // than 1000 tasks before detaching, there is no need to know the exact |
222 // number of tasks that ran. | 218 // number of tasks that ran. |
(...skipping 15 matching lines...) Expand all Loading... |
238 HistogramBase::kUmaTargetedHistogramFlag)), | 234 HistogramBase::kUmaTargetedHistogramFlag)), |
239 task_tracker_(task_tracker), | 235 task_tracker_(task_tracker), |
240 delayed_task_manager_(delayed_task_manager) { | 236 delayed_task_manager_(delayed_task_manager) { |
241 DCHECK(task_tracker_); | 237 DCHECK(task_tracker_); |
242 DCHECK(delayed_task_manager_); | 238 DCHECK(delayed_task_manager_); |
243 } | 239 } |
244 | 240 |
245 void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { | 241 void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { |
246 suggested_reclaim_time_ = params.suggested_reclaim_time(); | 242 suggested_reclaim_time_ = params.suggested_reclaim_time(); |
247 | 243 |
248 std::vector<SchedulerWorker*> workers_to_wake_up; | |
249 | |
250 { | 244 { |
251 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 245 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
252 | 246 |
253 #if DCHECK_IS_ON() | 247 #if DCHECK_IS_ON() |
254 DCHECK(!workers_created_.IsSignaled()); | 248 DCHECK(!workers_created_.IsSet()); |
255 #endif | 249 #endif |
256 | 250 |
257 DCHECK(workers_.empty()); | 251 DCHECK(workers_.empty()); |
258 workers_.resize(params.max_threads()); | 252 workers_.resize(params.max_threads()); |
259 | 253 |
260 // The number of workers created alive is |num_wake_ups_before_start_|, plus | |
261 // one if the standby thread policy is ONE (in order to start with one alive | |
262 // idle worker). | |
263 const int num_alive_workers = | |
264 num_wake_ups_before_start_ + | |
265 (params.standby_thread_policy() == | |
266 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE | |
267 ? 1 | |
268 : 0); | |
269 | |
270 // Create workers in reverse order of index so that the worker with the | 254 // Create workers in reverse order of index so that the worker with the |
271 // highest index is at the bottom of the idle stack. | 255 // highest index is at the bottom of the idle stack. |
272 for (int index = params.max_threads() - 1; index >= 0; --index) { | 256 for (int index = params.max_threads() - 1; index >= 0; --index) { |
273 const SchedulerWorker::InitialState initial_state = | 257 workers_[index] = make_scoped_refptr( |
274 index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE | 258 new SchedulerWorker(params.priority_hint(), |
275 : SchedulerWorker::InitialState::DETACHED; | 259 MakeUnique<SchedulerWorkerDelegateImpl>( |
276 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | 260 this, re_enqueue_sequence_callback_, index), |
277 params.priority_hint(), | 261 task_tracker_, params.backward_compatibility())); |
278 MakeUnique<SchedulerWorkerDelegateImpl>( | |
279 this, re_enqueue_sequence_callback_, index), | |
280 task_tracker_, initial_state, params.backward_compatibility()); | |
281 if (!worker) | |
282 break; | |
283 | 262 |
284 if (index < num_wake_ups_before_start_) | 263 if (index >= num_wake_ups_before_start_) |
285 workers_to_wake_up.push_back(worker.get()); | 264 idle_workers_stack_.Push(workers_[index].get()); |
286 else | |
287 idle_workers_stack_.Push(worker.get()); | |
288 | |
289 workers_[index] = std::move(worker); | |
290 } | 265 } |
291 | 266 |
292 #if DCHECK_IS_ON() | 267 #if DCHECK_IS_ON() |
293 workers_created_.Signal(); | 268 workers_created_.Set(); |
294 #endif | 269 #endif |
295 | |
296 CHECK(!workers_.empty()); | |
297 } | 270 } |
298 | 271 |
299 for (SchedulerWorker* worker : workers_to_wake_up) | 272 // The number of workers created alive is |num_wake_ups_before_start_|, plus |
300 worker->WakeUp(); | 273 // one if the standby thread policy is ONE (in order to start with one alive |
| 274 // idle worker). |
| 275 const int num_alive_workers = |
| 276 num_wake_ups_before_start_ + |
| 277 (params.standby_thread_policy() == |
| 278 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE |
| 279 ? 1 |
| 280 : 0); |
| 281 |
| 282 // Start workers. |
| 283 for (int index = 0; index < params.max_threads(); ++index) { |
| 284 workers_[index]->Start(index < num_alive_workers |
| 285 ? SchedulerWorker::InitialState::ALIVE |
| 286 : SchedulerWorker::InitialState::DETACHED); |
| 287 if (index < num_wake_ups_before_start_) |
| 288 workers_[index]->WakeUp(); |
| 289 } |
301 } | 290 } |
302 | 291 |
303 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 292 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
304 // SchedulerWorkerPool should never be deleted in production unless its | 293 // SchedulerWorkerPool should never be deleted in production unless its |
305 // initialization failed. | 294 // initialization failed. |
306 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 295 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
307 } | 296 } |
308 | 297 |
309 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 298 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
310 const TaskTraits& traits) { | 299 const TaskTraits& traits) { |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
389 } | 378 } |
390 | 379 |
391 void SchedulerWorkerPoolImpl::GetHistograms( | 380 void SchedulerWorkerPoolImpl::GetHistograms( |
392 std::vector<const HistogramBase*>* histograms) const { | 381 std::vector<const HistogramBase*>* histograms) const { |
393 histograms->push_back(detach_duration_histogram_); | 382 histograms->push_back(detach_duration_histogram_); |
394 histograms->push_back(num_tasks_between_waits_histogram_); | 383 histograms->push_back(num_tasks_between_waits_histogram_); |
395 } | 384 } |
396 | 385 |
397 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { | 386 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { |
398 #if DCHECK_IS_ON() | 387 #if DCHECK_IS_ON() |
399 DCHECK(workers_created_.IsSignaled()); | 388 DCHECK(workers_created_.IsSet()); |
400 #endif | 389 #endif |
401 return workers_.size(); | 390 return workers_.size(); |
402 } | 391 } |
403 | 392 |
404 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 393 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
405 #if DCHECK_IS_ON() | 394 #if DCHECK_IS_ON() |
406 DCHECK(workers_created_.IsSignaled()); | 395 DCHECK(workers_created_.IsSet()); |
407 #endif | 396 #endif |
408 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 397 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
409 while (idle_workers_stack_.Size() < workers_.size()) | 398 while (idle_workers_stack_.Size() < workers_.size()) |
410 idle_workers_stack_cv_for_testing_->Wait(); | 399 idle_workers_stack_cv_for_testing_->Wait(); |
411 } | 400 } |
412 | 401 |
413 void SchedulerWorkerPoolImpl::JoinForTesting() { | 402 void SchedulerWorkerPoolImpl::JoinForTesting() { |
414 #if DCHECK_IS_ON() | 403 #if DCHECK_IS_ON() |
415 DCHECK(workers_created_.IsSignaled()); | 404 DCHECK(workers_created_.IsSet()); |
416 #endif | 405 #endif |
417 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) | 406 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) |
418 << "Workers can detach during join."; | 407 << "Workers can detach during join."; |
419 for (const auto& worker : workers_) | 408 for (const auto& worker : workers_) |
420 worker->JoinForTesting(); | 409 worker->JoinForTesting(); |
421 | 410 |
422 DCHECK(!join_for_testing_returned_.IsSignaled()); | 411 DCHECK(!join_for_testing_returned_.IsSignaled()); |
423 join_for_testing_returned_.Signal(); | 412 join_for_testing_returned_.Signal(); |
424 } | 413 } |
425 | 414 |
(...skipping 18 matching lines...) Expand all Loading... |
444 : outer_(outer), | 433 : outer_(outer), |
445 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 434 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
446 index_(index) {} | 435 index_(index) {} |
447 | 436 |
448 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 437 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
449 ~SchedulerWorkerDelegateImpl() = default; | 438 ~SchedulerWorkerDelegateImpl() = default; |
450 | 439 |
451 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 440 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
452 SchedulerWorker* worker) { | 441 SchedulerWorker* worker) { |
453 #if DCHECK_IS_ON() | 442 #if DCHECK_IS_ON() |
454 // Wait for |outer_->workers_created_| to avoid traversing | 443 DCHECK(outer_->workers_created_.IsSet()); |
455 // |outer_->workers_| while it is being filled by Initialize(). | |
456 outer_->workers_created_.Wait(); | |
457 DCHECK(ContainsWorker(outer_->workers_, worker)); | 444 DCHECK(ContainsWorker(outer_->workers_, worker)); |
458 #endif | 445 #endif |
459 | |
460 DCHECK_EQ(num_tasks_since_last_wait_, 0U); | 446 DCHECK_EQ(num_tasks_since_last_wait_, 0U); |
461 | 447 |
462 if (!last_detach_time_.is_null()) { | 448 if (!last_detach_time_.is_null()) { |
463 outer_->detach_duration_histogram_->AddTime(TimeTicks::Now() - | 449 outer_->detach_duration_histogram_->AddTime(TimeTicks::Now() - |
464 last_detach_time_); | 450 last_detach_time_); |
465 } | 451 } |
466 | 452 |
467 PlatformThread::SetName( | 453 PlatformThread::SetName( |
468 StringPrintf("TaskScheduler%sWorker%d", outer_->name_.c_str(), index_)); | 454 StringPrintf("TaskScheduler%sWorker%d", outer_->name_.c_str(), index_)); |
469 | 455 |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
568 did_detach_since_last_get_work_ = true; | 554 did_detach_since_last_get_work_ = true; |
569 last_detach_time_ = TimeTicks::Now(); | 555 last_detach_time_ = TimeTicks::Now(); |
570 } | 556 } |
571 | 557 |
572 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 558 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
573 SchedulerWorker* worker = nullptr; | 559 SchedulerWorker* worker = nullptr; |
574 { | 560 { |
575 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 561 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
576 | 562 |
577 #if DCHECK_IS_ON() | 563 #if DCHECK_IS_ON() |
578 DCHECK_EQ(workers_.empty(), !workers_created_.IsSignaled()); | 564 DCHECK_EQ(workers_.empty(), !workers_created_.IsSet()); |
579 #endif | 565 #endif |
580 | 566 |
581 if (workers_.empty()) | 567 if (workers_.empty()) |
582 ++num_wake_ups_before_start_; | 568 ++num_wake_ups_before_start_; |
583 else | 569 else |
584 worker = idle_workers_stack_.Pop(); | 570 worker = idle_workers_stack_.Pop(); |
585 } | 571 } |
586 | 572 |
587 if (worker) | 573 if (worker) |
588 worker->WakeUp(); | 574 worker->WakeUp(); |
(...skipping 27 matching lines...) Expand all Loading... |
616 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 602 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
617 idle_workers_stack_.Remove(worker); | 603 idle_workers_stack_.Remove(worker); |
618 } | 604 } |
619 | 605 |
620 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 606 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
621 return !worker_detachment_disallowed_.IsSet(); | 607 return !worker_detachment_disallowed_.IsSet(); |
622 } | 608 } |
623 | 609 |
624 } // namespace internal | 610 } // namespace internal |
625 } // namespace base | 611 } // namespace base |
OLD | NEW |