| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 136 return i.get() == worker; | 136 return i.get() == worker; |
| 137 }); | 137 }); |
| 138 return it != workers.end(); | 138 return it != workers.end(); |
| 139 } | 139 } |
| 140 | 140 |
| 141 } // namespace | 141 } // namespace |
| 142 | 142 |
| 143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| 144 : public SchedulerWorker::Delegate { | 144 : public SchedulerWorker::Delegate { |
| 145 public: | 145 public: |
| 146 // |outer| owns the worker for which this delegate is constructed. | 146 // |outer| owns the worker for which this delegate is constructed. |index| |
| 147 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 147 // will be appended to the pool name to label the underlying worker threads. |
| 148 // called. |index| will be appended to the pool name to label the underlying | |
| 149 // worker threads. | |
| 150 SchedulerWorkerDelegateImpl( | 148 SchedulerWorkerDelegateImpl( |
| 151 SchedulerWorkerPoolImpl* outer, | 149 SchedulerWorkerPoolImpl* outer, |
| 152 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
| 153 int index); | 150 int index); |
| 154 ~SchedulerWorkerDelegateImpl() override; | 151 ~SchedulerWorkerDelegateImpl() override; |
| 155 | 152 |
| 156 // SchedulerWorker::Delegate: | 153 // SchedulerWorker::Delegate: |
| 157 void OnMainEntry(SchedulerWorker* worker) override; | 154 void OnMainEntry(SchedulerWorker* worker) override; |
| 158 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 155 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
| 159 void DidRunTask() override; | 156 void DidRunTask() override; |
| 160 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 157 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
| 161 TimeDelta GetSleepTimeout() override; | 158 TimeDelta GetSleepTimeout() override; |
| 162 bool CanDetach(SchedulerWorker* worker) override; | 159 bool CanDetach(SchedulerWorker* worker) override; |
| 163 void OnDetach() override; | 160 void OnDetach() override; |
| 164 | 161 |
| 165 private: | 162 private: |
| 166 SchedulerWorkerPoolImpl* outer_; | 163 SchedulerWorkerPoolImpl* outer_; |
| 167 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | |
| 168 | 164 |
| 169 // Time of the last detach. | 165 // Time of the last detach. |
| 170 TimeTicks last_detach_time_; | 166 TimeTicks last_detach_time_; |
| 171 | 167 |
| 172 // Time when GetWork() first returned nullptr. | 168 // Time when GetWork() first returned nullptr. |
| 173 TimeTicks idle_start_time_; | 169 TimeTicks idle_start_time_; |
| 174 | 170 |
| 175 // Indicates whether the last call to GetWork() returned nullptr. | 171 // Indicates whether the last call to GetWork() returned nullptr. |
| 176 bool last_get_work_returned_nullptr_ = false; | 172 bool last_get_work_returned_nullptr_ = false; |
| 177 | 173 |
| (...skipping 10 matching lines...) Expand all Loading... |
| 188 size_t num_tasks_since_last_detach_ = 0; | 184 size_t num_tasks_since_last_detach_ = 0; |
| 189 | 185 |
| 190 const int index_; | 186 const int index_; |
| 191 | 187 |
| 192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 188 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| 193 }; | 189 }; |
| 194 | 190 |
| 195 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 191 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| 196 const std::string& name, | 192 const std::string& name, |
| 197 ThreadPriority priority_hint, | 193 ThreadPriority priority_hint, |
| 198 ReEnqueueSequenceCallback re_enqueue_sequence_callback, | |
| 199 TaskTracker* task_tracker, | 194 TaskTracker* task_tracker, |
| 200 DelayedTaskManager* delayed_task_manager) | 195 DelayedTaskManager* delayed_task_manager) |
| 201 : name_(name), | 196 : name_(name), |
| 202 priority_hint_(priority_hint), | 197 priority_hint_(priority_hint), |
| 203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), | |
| 204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 198 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| 205 idle_workers_stack_cv_for_testing_( | 199 idle_workers_stack_cv_for_testing_( |
| 206 idle_workers_stack_lock_.CreateConditionVariable()), | 200 idle_workers_stack_lock_.CreateConditionVariable()), |
| 207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 201 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| 208 WaitableEvent::InitialState::NOT_SIGNALED), | 202 WaitableEvent::InitialState::NOT_SIGNALED), |
| 209 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | 203 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. |
| 210 detach_duration_histogram_(Histogram::FactoryTimeGet( | 204 detach_duration_histogram_(Histogram::FactoryTimeGet( |
| 211 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | 205 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, |
| 212 TimeDelta::FromMilliseconds(1), | 206 TimeDelta::FromMilliseconds(1), |
| 213 TimeDelta::FromHours(1), | 207 TimeDelta::FromHours(1), |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 247 #if DCHECK_IS_ON() | 241 #if DCHECK_IS_ON() |
| 248 DCHECK(!workers_created_.IsSet()); | 242 DCHECK(!workers_created_.IsSet()); |
| 249 #endif | 243 #endif |
| 250 | 244 |
| 251 DCHECK(workers_.empty()); | 245 DCHECK(workers_.empty()); |
| 252 workers_.resize(params.max_threads()); | 246 workers_.resize(params.max_threads()); |
| 253 | 247 |
| 254 // Create workers in reverse order of index so that the worker with the | 248 // Create workers in reverse order of index so that the worker with the |
| 255 // highest index is at the bottom of the idle stack. | 249 // highest index is at the bottom of the idle stack. |
| 256 for (int index = params.max_threads() - 1; index >= 0; --index) { | 250 for (int index = params.max_threads() - 1; index >= 0; --index) { |
| 257 workers_[index] = make_scoped_refptr( | 251 workers_[index] = make_scoped_refptr(new SchedulerWorker( |
| 258 new SchedulerWorker(priority_hint_, | 252 priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index), |
| 259 MakeUnique<SchedulerWorkerDelegateImpl>( | 253 task_tracker_, params.backward_compatibility())); |
| 260 this, re_enqueue_sequence_callback_, index), | |
| 261 task_tracker_, params.backward_compatibility())); | |
| 262 | |
| 263 if (index >= num_wake_ups_before_start_) | 254 if (index >= num_wake_ups_before_start_) |
| 264 idle_workers_stack_.Push(workers_[index].get()); | 255 idle_workers_stack_.Push(workers_[index].get()); |
| 265 } | 256 } |
| 266 | 257 |
| 267 #if DCHECK_IS_ON() | 258 #if DCHECK_IS_ON() |
| 268 workers_created_.Set(); | 259 workers_created_.Set(); |
| 269 #endif | 260 #endif |
| 270 } | 261 } |
| 271 | 262 |
| 272 // The number of workers created alive is |num_wake_ups_before_start_|, plus | 263 // The number of workers created alive is |num_wake_ups_before_start_|, plus |
| (...skipping 26 matching lines...) Expand all Loading... |
| 299 const TaskTraits& traits) { | 290 const TaskTraits& traits) { |
| 300 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 291 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
| 301 } | 292 } |
| 302 | 293 |
| 303 scoped_refptr<SequencedTaskRunner> | 294 scoped_refptr<SequencedTaskRunner> |
| 304 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 295 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
| 305 const TaskTraits& traits) { | 296 const TaskTraits& traits) { |
| 306 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 297 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| 307 } | 298 } |
| 308 | 299 |
| 309 void SchedulerWorkerPoolImpl::ReEnqueueSequence( | |
| 310 scoped_refptr<Sequence> sequence, | |
| 311 const SequenceSortKey& sequence_sort_key) { | |
| 312 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), | |
| 313 sequence_sort_key); | |
| 314 | |
| 315 // The thread calling this method just ran a Task from |sequence| and will | |
| 316 // soon try to get another Sequence from which to run a Task. If the thread | |
| 317 // belongs to this pool, it will get that Sequence from | |
| 318 // |shared_priority_queue_|. When that's the case, there is no need to wake up | |
| 319 // another worker after |sequence| is inserted in |shared_priority_queue_|. If | |
| 320 // we did wake up another worker, we would waste resources by having more | |
| 321 // workers trying to get a Sequence from |shared_priority_queue_| than the | |
| 322 // number of Sequences in it. | |
| 323 if (tls_current_worker_pool.Get().Get() != this) | |
| 324 WakeUpOneWorker(); | |
| 325 } | |
| 326 | |
| 327 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( | 300 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
| 328 std::unique_ptr<Task> task, | 301 std::unique_ptr<Task> task, |
| 329 scoped_refptr<Sequence> sequence) { | 302 scoped_refptr<Sequence> sequence) { |
| 330 DCHECK(task); | 303 DCHECK(task); |
| 331 DCHECK(sequence); | 304 DCHECK(sequence); |
| 332 | 305 |
| 333 if (!task_tracker_->WillPostTask(task.get())) | 306 if (!task_tracker_->WillPostTask(task.get())) |
| 334 return false; | 307 return false; |
| 335 | 308 |
| 336 if (task->delayed_run_time.is_null()) { | 309 if (task->delayed_run_time.is_null()) { |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 421 for (const auto& worker : workers_) { | 394 for (const auto& worker : workers_) { |
| 422 if (worker->ThreadAliveForTesting()) | 395 if (worker->ThreadAliveForTesting()) |
| 423 ++num_alive_workers; | 396 ++num_alive_workers; |
| 424 } | 397 } |
| 425 return num_alive_workers; | 398 return num_alive_workers; |
| 426 } | 399 } |
| 427 | 400 |
| 428 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 401 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 429 SchedulerWorkerDelegateImpl( | 402 SchedulerWorkerDelegateImpl( |
| 430 SchedulerWorkerPoolImpl* outer, | 403 SchedulerWorkerPoolImpl* outer, |
| 431 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
| 432 int index) | 404 int index) |
| 433 : outer_(outer), | 405 : outer_(outer), |
| 434 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | |
| 435 index_(index) {} | 406 index_(index) {} |
| 436 | 407 |
| 437 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 408 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 438 ~SchedulerWorkerDelegateImpl() = default; | 409 ~SchedulerWorkerDelegateImpl() = default; |
| 439 | 410 |
| 440 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 411 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| 441 SchedulerWorker* worker) { | 412 SchedulerWorker* worker) { |
| 442 #if DCHECK_IS_ON() | 413 #if DCHECK_IS_ON() |
| 443 DCHECK(outer_->workers_created_.IsSet()); | 414 DCHECK(outer_->workers_created_.IsSet()); |
| 444 DCHECK(ContainsWorker(outer_->workers_, worker)); | 415 DCHECK(ContainsWorker(outer_->workers_, worker)); |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 520 return sequence; | 491 return sequence; |
| 521 } | 492 } |
| 522 | 493 |
| 523 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { | 494 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { |
| 524 ++num_tasks_since_last_wait_; | 495 ++num_tasks_since_last_wait_; |
| 525 ++num_tasks_since_last_detach_; | 496 ++num_tasks_since_last_detach_; |
| 526 } | 497 } |
| 527 | 498 |
| 528 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 499 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 529 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 500 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
| 530 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 501 const SequenceSortKey sort_key = sequence->GetSortKey(); |
| 531 // |sequence| must be enqueued. | 502 outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
| 532 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 503 sort_key); |
| 504 // No need to wake up a worker. The current worker will soon call GetWork(). |
| 533 } | 505 } |
| 534 | 506 |
| 535 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 507 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 536 GetSleepTimeout() { | 508 GetSleepTimeout() { |
| 537 return outer_->suggested_reclaim_time_; | 509 return outer_->suggested_reclaim_time_; |
| 538 } | 510 } |
| 539 | 511 |
| 540 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 512 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| 541 SchedulerWorker* worker) { | 513 SchedulerWorker* worker) { |
| 542 const bool can_detach = | 514 const bool can_detach = |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 602 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 574 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 603 idle_workers_stack_.Remove(worker); | 575 idle_workers_stack_.Remove(worker); |
| 604 } | 576 } |
| 605 | 577 |
| 606 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 578 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| 607 return !worker_detachment_disallowed_.IsSet(); | 579 return !worker_detachment_disallowed_.IsSet(); |
| 608 } | 580 } |
| 609 | 581 |
| 610 } // namespace internal | 582 } // namespace internal |
| 611 } // namespace base | 583 } // namespace base |
| OLD | NEW |