OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <utility> | 10 #include <utility> |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
136 return i.get() == worker; | 136 return i.get() == worker; |
137 }); | 137 }); |
138 return it != workers.end(); | 138 return it != workers.end(); |
139 } | 139 } |
140 | 140 |
141 } // namespace | 141 } // namespace |
142 | 142 |
143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
144 : public SchedulerWorker::Delegate { | 144 : public SchedulerWorker::Delegate { |
145 public: | 145 public: |
146 // |outer| owns the worker for which this delegate is constructed. | 146 // |outer| owns the worker for which this delegate is constructed. |index| |
147 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 147 // will be appended to the pool name to label the underlying worker threads. |
148 // called. |index| will be appended to the pool name to label the underlying | |
149 // worker threads. | |
150 SchedulerWorkerDelegateImpl( | 148 SchedulerWorkerDelegateImpl( |
151 SchedulerWorkerPoolImpl* outer, | 149 SchedulerWorkerPoolImpl* outer, |
152 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
153 int index); | 150 int index); |
154 ~SchedulerWorkerDelegateImpl() override; | 151 ~SchedulerWorkerDelegateImpl() override; |
155 | 152 |
156 // SchedulerWorker::Delegate: | 153 // SchedulerWorker::Delegate: |
157 void OnMainEntry(SchedulerWorker* worker) override; | 154 void OnMainEntry(SchedulerWorker* worker) override; |
158 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 155 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
159 void DidRunTask() override; | 156 void DidRunTask() override; |
160 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 157 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
161 TimeDelta GetSleepTimeout() override; | 158 TimeDelta GetSleepTimeout() override; |
162 bool CanDetach(SchedulerWorker* worker) override; | 159 bool CanDetach(SchedulerWorker* worker) override; |
163 void OnDetach() override; | 160 void OnDetach() override; |
164 | 161 |
165 private: | 162 private: |
166 SchedulerWorkerPoolImpl* outer_; | 163 SchedulerWorkerPoolImpl* outer_; |
167 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | |
168 | 164 |
169 // Time of the last detach. | 165 // Time of the last detach. |
170 TimeTicks last_detach_time_; | 166 TimeTicks last_detach_time_; |
171 | 167 |
172 // Time when GetWork() first returned nullptr. | 168 // Time when GetWork() first returned nullptr. |
173 TimeTicks idle_start_time_; | 169 TimeTicks idle_start_time_; |
174 | 170 |
175 // Indicates whether the last call to GetWork() returned nullptr. | 171 // Indicates whether the last call to GetWork() returned nullptr. |
176 bool last_get_work_returned_nullptr_ = false; | 172 bool last_get_work_returned_nullptr_ = false; |
177 | 173 |
(...skipping 10 matching lines...) Expand all Loading... |
188 size_t num_tasks_since_last_detach_ = 0; | 184 size_t num_tasks_since_last_detach_ = 0; |
189 | 185 |
190 const int index_; | 186 const int index_; |
191 | 187 |
192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 188 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
193 }; | 189 }; |
194 | 190 |
195 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 191 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
196 const std::string& name, | 192 const std::string& name, |
197 ThreadPriority priority_hint, | 193 ThreadPriority priority_hint, |
198 ReEnqueueSequenceCallback re_enqueue_sequence_callback, | |
199 TaskTracker* task_tracker, | 194 TaskTracker* task_tracker, |
200 DelayedTaskManager* delayed_task_manager) | 195 DelayedTaskManager* delayed_task_manager) |
201 : name_(name), | 196 : name_(name), |
202 priority_hint_(priority_hint), | 197 priority_hint_(priority_hint), |
203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), | |
204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 198 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
205 idle_workers_stack_cv_for_testing_( | 199 idle_workers_stack_cv_for_testing_( |
206 idle_workers_stack_lock_.CreateConditionVariable()), | 200 idle_workers_stack_lock_.CreateConditionVariable()), |
207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 201 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
208 WaitableEvent::InitialState::NOT_SIGNALED), | 202 WaitableEvent::InitialState::NOT_SIGNALED), |
209 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | 203 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. |
210 detach_duration_histogram_(Histogram::FactoryTimeGet( | 204 detach_duration_histogram_(Histogram::FactoryTimeGet( |
211 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | 205 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, |
212 TimeDelta::FromMilliseconds(1), | 206 TimeDelta::FromMilliseconds(1), |
213 TimeDelta::FromHours(1), | 207 TimeDelta::FromHours(1), |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
258 num_wake_ups_before_start_ + | 252 num_wake_ups_before_start_ + |
259 (params.standby_thread_policy() == | 253 (params.standby_thread_policy() == |
260 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE | 254 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE |
261 ? 1 | 255 ? 1 |
262 : 0); | 256 : 0); |
263 | 257 |
264 // Create workers in reverse order of index so that the worker with the | 258 // Create workers in reverse order of index so that the worker with the |
265 // highest index is at the bottom of the idle stack. | 259 // highest index is at the bottom of the idle stack. |
266 for (int index = params.max_threads() - 1; index >= 0; --index) { | 260 for (int index = params.max_threads() - 1; index >= 0; --index) { |
267 workers_[index] = make_scoped_refptr(new SchedulerWorker( | 261 workers_[index] = make_scoped_refptr(new SchedulerWorker( |
268 priority_hint_, | 262 priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index), |
269 MakeUnique<SchedulerWorkerDelegateImpl>( | |
270 this, re_enqueue_sequence_callback_, index), | |
271 task_tracker_, params.backward_compatibility(), | 263 task_tracker_, params.backward_compatibility(), |
272 index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE | 264 index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE |
273 : SchedulerWorker::InitialState::DETACHED)); | 265 : SchedulerWorker::InitialState::DETACHED)); |
274 | 266 |
275 // Put workers that won't be woken up at the end of this method on the | 267 // Put workers that won't be woken up at the end of this method on the |
276 // idle stack. | 268 // idle stack. |
277 if (index >= num_wake_ups_before_start_) | 269 if (index >= num_wake_ups_before_start_) |
278 idle_workers_stack_.Push(workers_[index].get()); | 270 idle_workers_stack_.Push(workers_[index].get()); |
279 } | 271 } |
280 | 272 |
(...skipping 23 matching lines...) Expand all Loading... |
304 const TaskTraits& traits) { | 296 const TaskTraits& traits) { |
305 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 297 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
306 } | 298 } |
307 | 299 |
308 scoped_refptr<SequencedTaskRunner> | 300 scoped_refptr<SequencedTaskRunner> |
309 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 301 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
310 const TaskTraits& traits) { | 302 const TaskTraits& traits) { |
311 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 303 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
312 } | 304 } |
313 | 305 |
314 void SchedulerWorkerPoolImpl::ReEnqueueSequence( | |
315 scoped_refptr<Sequence> sequence, | |
316 const SequenceSortKey& sequence_sort_key) { | |
317 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), | |
318 sequence_sort_key); | |
319 | |
320 // The thread calling this method just ran a Task from |sequence| and will | |
321 // soon try to get another Sequence from which to run a Task. If the thread | |
322 // belongs to this pool, it will get that Sequence from | |
323 // |shared_priority_queue_|. When that's the case, there is no need to wake up | |
324 // another worker after |sequence| is inserted in |shared_priority_queue_|. If | |
325 // we did wake up another worker, we would waste resources by having more | |
326 // workers trying to get a Sequence from |shared_priority_queue_| than the | |
327 // number of Sequences in it. | |
328 if (tls_current_worker_pool.Get().Get() != this) | |
329 WakeUpOneWorker(); | |
330 } | |
331 | |
332 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( | 306 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
333 std::unique_ptr<Task> task, | 307 std::unique_ptr<Task> task, |
334 scoped_refptr<Sequence> sequence) { | 308 scoped_refptr<Sequence> sequence) { |
335 DCHECK(task); | 309 DCHECK(task); |
336 DCHECK(sequence); | 310 DCHECK(sequence); |
337 | 311 |
338 if (!task_tracker_->WillPostTask(task.get())) | 312 if (!task_tracker_->WillPostTask(task.get())) |
339 return false; | 313 return false; |
340 | 314 |
341 if (task->delayed_run_time.is_null()) { | 315 if (task->delayed_run_time.is_null()) { |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
428 for (const auto& worker : workers_) { | 402 for (const auto& worker : workers_) { |
429 if (worker->ThreadAliveForTesting()) | 403 if (worker->ThreadAliveForTesting()) |
430 ++num_alive_workers; | 404 ++num_alive_workers; |
431 } | 405 } |
432 return num_alive_workers; | 406 return num_alive_workers; |
433 } | 407 } |
434 | 408 |
435 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 409 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
436 SchedulerWorkerDelegateImpl( | 410 SchedulerWorkerDelegateImpl( |
437 SchedulerWorkerPoolImpl* outer, | 411 SchedulerWorkerPoolImpl* outer, |
438 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
439 int index) | 412 int index) |
440 : outer_(outer), | 413 : outer_(outer), |
441 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | |
442 index_(index) {} | 414 index_(index) {} |
443 | 415 |
444 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 416 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
445 ~SchedulerWorkerDelegateImpl() = default; | 417 ~SchedulerWorkerDelegateImpl() = default; |
446 | 418 |
447 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 419 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
448 SchedulerWorker* worker) { | 420 SchedulerWorker* worker) { |
449 #if DCHECK_IS_ON() | 421 #if DCHECK_IS_ON() |
450 DCHECK(outer_->workers_created_.IsSet()); | 422 DCHECK(outer_->workers_created_.IsSet()); |
451 DCHECK(ContainsWorker(outer_->workers_, worker)); | 423 DCHECK(ContainsWorker(outer_->workers_, worker)); |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
527 return sequence; | 499 return sequence; |
528 } | 500 } |
529 | 501 |
530 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { | 502 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { |
531 ++num_tasks_since_last_wait_; | 503 ++num_tasks_since_last_wait_; |
532 ++num_tasks_since_last_detach_; | 504 ++num_tasks_since_last_detach_; |
533 } | 505 } |
534 | 506 |
535 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 507 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
536 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 508 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
537 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 509 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
538 // |sequence| must be enqueued. | 510 outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
539 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 511 sequence_sort_key); |
| 512 // The thread calling this method will soon call GetWork(). Therefore, there |
| 513 // is no need to wake up a worker to run the sequence that was just inserted |
| 514 // into |outer_->shared_priority_queue_|. |
540 } | 515 } |
541 | 516 |
542 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 517 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
543 GetSleepTimeout() { | 518 GetSleepTimeout() { |
544 return outer_->suggested_reclaim_time_; | 519 return outer_->suggested_reclaim_time_; |
545 } | 520 } |
546 | 521 |
547 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 522 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
548 SchedulerWorker* worker) { | 523 SchedulerWorker* worker) { |
549 const bool can_detach = | 524 const bool can_detach = |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
609 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 584 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
610 idle_workers_stack_.Remove(worker); | 585 idle_workers_stack_.Remove(worker); |
611 } | 586 } |
612 | 587 |
613 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 588 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
614 return !worker_detachment_disallowed_.IsSet(); | 589 return !worker_detachment_disallowed_.IsSet(); |
615 } | 590 } |
616 | 591 |
617 } // namespace internal | 592 } // namespace internal |
618 } // namespace base | 593 } // namespace base |
OLD | NEW |