OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <utility> | 10 #include <utility> |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
136 return i.get() == worker; | 136 return i.get() == worker; |
137 }); | 137 }); |
138 return it != workers.end(); | 138 return it != workers.end(); |
139 } | 139 } |
140 | 140 |
141 } // namespace | 141 } // namespace |
142 | 142 |
143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
144 : public SchedulerWorker::Delegate { | 144 : public SchedulerWorker::Delegate { |
145 public: | 145 public: |
146 // |outer| owns the worker for which this delegate is constructed. | 146 // |outer| owns the worker for which this delegate is constructed. |index| |
147 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 147 // will be appended to the pool name to label the underlying worker threads. |
148 // called. |index| will be appended to the pool name to label the underlying | |
149 // worker threads. | |
150 SchedulerWorkerDelegateImpl( | 148 SchedulerWorkerDelegateImpl( |
151 SchedulerWorkerPoolImpl* outer, | 149 SchedulerWorkerPoolImpl* outer, |
152 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
153 int index); | 150 int index); |
154 ~SchedulerWorkerDelegateImpl() override; | 151 ~SchedulerWorkerDelegateImpl() override; |
155 | 152 |
156 // SchedulerWorker::Delegate: | 153 // SchedulerWorker::Delegate: |
157 void OnMainEntry(SchedulerWorker* worker) override; | 154 void OnMainEntry(SchedulerWorker* worker) override; |
158 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 155 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
159 void DidRunTask() override; | 156 void DidRunTask() override; |
160 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 157 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
161 TimeDelta GetSleepTimeout() override; | 158 TimeDelta GetSleepTimeout() override; |
162 bool CanDetach(SchedulerWorker* worker) override; | 159 bool CanDetach(SchedulerWorker* worker) override; |
163 void OnDetach() override; | 160 void OnDetach() override; |
164 | 161 |
165 private: | 162 private: |
166 SchedulerWorkerPoolImpl* outer_; | 163 SchedulerWorkerPoolImpl* outer_; |
167 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | |
168 | 164 |
169 // Time of the last detach. | 165 // Time of the last detach. |
170 TimeTicks last_detach_time_; | 166 TimeTicks last_detach_time_; |
171 | 167 |
172 // Time when GetWork() first returned nullptr. | 168 // Time when GetWork() first returned nullptr. |
173 TimeTicks idle_start_time_; | 169 TimeTicks idle_start_time_; |
174 | 170 |
175 // Indicates whether the last call to GetWork() returned nullptr. | 171 // Indicates whether the last call to GetWork() returned nullptr. |
176 bool last_get_work_returned_nullptr_ = false; | 172 bool last_get_work_returned_nullptr_ = false; |
177 | 173 |
(...skipping 10 matching lines...) Expand all Loading... |
188 size_t num_tasks_since_last_detach_ = 0; | 184 size_t num_tasks_since_last_detach_ = 0; |
189 | 185 |
190 const int index_; | 186 const int index_; |
191 | 187 |
192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 188 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
193 }; | 189 }; |
194 | 190 |
195 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 191 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
196 const std::string& name, | 192 const std::string& name, |
197 ThreadPriority priority_hint, | 193 ThreadPriority priority_hint, |
198 ReEnqueueSequenceCallback re_enqueue_sequence_callback, | |
199 TaskTracker* task_tracker, | 194 TaskTracker* task_tracker, |
200 DelayedTaskManager* delayed_task_manager) | 195 DelayedTaskManager* delayed_task_manager) |
201 : name_(name), | 196 : name_(name), |
202 priority_hint_(priority_hint), | 197 priority_hint_(priority_hint), |
203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), | |
204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 198 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
205 idle_workers_stack_cv_for_testing_( | 199 idle_workers_stack_cv_for_testing_( |
206 idle_workers_stack_lock_.CreateConditionVariable()), | 200 idle_workers_stack_lock_.CreateConditionVariable()), |
207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 201 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
208 WaitableEvent::InitialState::NOT_SIGNALED), | 202 WaitableEvent::InitialState::NOT_SIGNALED), |
209 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | 203 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. |
210 detach_duration_histogram_(Histogram::FactoryTimeGet( | 204 detach_duration_histogram_(Histogram::FactoryTimeGet( |
211 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | 205 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, |
212 TimeDelta::FromMilliseconds(1), | 206 TimeDelta::FromMilliseconds(1), |
213 TimeDelta::FromHours(1), | 207 TimeDelta::FromHours(1), |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
247 #if DCHECK_IS_ON() | 241 #if DCHECK_IS_ON() |
248 DCHECK(!workers_created_.IsSet()); | 242 DCHECK(!workers_created_.IsSet()); |
249 #endif | 243 #endif |
250 | 244 |
251 DCHECK(workers_.empty()); | 245 DCHECK(workers_.empty()); |
252 workers_.resize(params.max_threads()); | 246 workers_.resize(params.max_threads()); |
253 | 247 |
254 // Create workers in reverse order of index so that the worker with the | 248 // Create workers in reverse order of index so that the worker with the |
255 // highest index is at the bottom of the idle stack. | 249 // highest index is at the bottom of the idle stack. |
256 for (int index = params.max_threads() - 1; index >= 0; --index) { | 250 for (int index = params.max_threads() - 1; index >= 0; --index) { |
257 workers_[index] = make_scoped_refptr( | 251 workers_[index] = make_scoped_refptr(new SchedulerWorker( |
258 new SchedulerWorker(priority_hint_, | 252 priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index), |
259 MakeUnique<SchedulerWorkerDelegateImpl>( | 253 task_tracker_, params.backward_compatibility())); |
260 this, re_enqueue_sequence_callback_, index), | |
261 task_tracker_, params.backward_compatibility())); | |
262 | |
263 if (index >= num_wake_ups_before_start_) | 254 if (index >= num_wake_ups_before_start_) |
264 idle_workers_stack_.Push(workers_[index].get()); | 255 idle_workers_stack_.Push(workers_[index].get()); |
265 } | 256 } |
266 | 257 |
267 #if DCHECK_IS_ON() | 258 #if DCHECK_IS_ON() |
268 workers_created_.Set(); | 259 workers_created_.Set(); |
269 #endif | 260 #endif |
270 } | 261 } |
271 | 262 |
272 // The number of workers created alive is |num_wake_ups_before_start_|, plus | 263 // The number of workers created alive is |num_wake_ups_before_start_|, plus |
(...skipping 26 matching lines...) Expand all Loading... |
299 const TaskTraits& traits) { | 290 const TaskTraits& traits) { |
300 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 291 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
301 } | 292 } |
302 | 293 |
303 scoped_refptr<SequencedTaskRunner> | 294 scoped_refptr<SequencedTaskRunner> |
304 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 295 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
305 const TaskTraits& traits) { | 296 const TaskTraits& traits) { |
306 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 297 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
307 } | 298 } |
308 | 299 |
309 void SchedulerWorkerPoolImpl::ReEnqueueSequence( | |
310 scoped_refptr<Sequence> sequence, | |
311 const SequenceSortKey& sequence_sort_key) { | |
312 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), | |
313 sequence_sort_key); | |
314 | |
315 // The thread calling this method just ran a Task from |sequence| and will | |
316 // soon try to get another Sequence from which to run a Task. If the thread | |
317 // belongs to this pool, it will get that Sequence from | |
318 // |shared_priority_queue_|. When that's the case, there is no need to wake up | |
319 // another worker after |sequence| is inserted in |shared_priority_queue_|. If | |
320 // we did wake up another worker, we would waste resources by having more | |
321 // workers trying to get a Sequence from |shared_priority_queue_| than the | |
322 // number of Sequences in it. | |
323 if (tls_current_worker_pool.Get().Get() != this) | |
324 WakeUpOneWorker(); | |
325 } | |
326 | |
327 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( | 300 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
328 std::unique_ptr<Task> task, | 301 std::unique_ptr<Task> task, |
329 scoped_refptr<Sequence> sequence) { | 302 scoped_refptr<Sequence> sequence) { |
330 DCHECK(task); | 303 DCHECK(task); |
331 DCHECK(sequence); | 304 DCHECK(sequence); |
332 | 305 |
333 if (!task_tracker_->WillPostTask(task.get())) | 306 if (!task_tracker_->WillPostTask(task.get())) |
334 return false; | 307 return false; |
335 | 308 |
336 if (task->delayed_run_time.is_null()) { | 309 if (task->delayed_run_time.is_null()) { |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
421 for (const auto& worker : workers_) { | 394 for (const auto& worker : workers_) { |
422 if (worker->ThreadAliveForTesting()) | 395 if (worker->ThreadAliveForTesting()) |
423 ++num_alive_workers; | 396 ++num_alive_workers; |
424 } | 397 } |
425 return num_alive_workers; | 398 return num_alive_workers; |
426 } | 399 } |
427 | 400 |
428 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 401 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
429 SchedulerWorkerDelegateImpl( | 402 SchedulerWorkerDelegateImpl( |
430 SchedulerWorkerPoolImpl* outer, | 403 SchedulerWorkerPoolImpl* outer, |
431 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
432 int index) | 404 int index) |
433 : outer_(outer), | 405 : outer_(outer), |
434 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | |
435 index_(index) {} | 406 index_(index) {} |
436 | 407 |
437 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 408 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
438 ~SchedulerWorkerDelegateImpl() = default; | 409 ~SchedulerWorkerDelegateImpl() = default; |
439 | 410 |
440 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 411 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
441 SchedulerWorker* worker) { | 412 SchedulerWorker* worker) { |
442 #if DCHECK_IS_ON() | 413 #if DCHECK_IS_ON() |
443 DCHECK(outer_->workers_created_.IsSet()); | 414 DCHECK(outer_->workers_created_.IsSet()); |
444 DCHECK(ContainsWorker(outer_->workers_, worker)); | 415 DCHECK(ContainsWorker(outer_->workers_, worker)); |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
520 return sequence; | 491 return sequence; |
521 } | 492 } |
522 | 493 |
523 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { | 494 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { |
524 ++num_tasks_since_last_wait_; | 495 ++num_tasks_since_last_wait_; |
525 ++num_tasks_since_last_detach_; | 496 ++num_tasks_since_last_detach_; |
526 } | 497 } |
527 | 498 |
528 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 499 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
529 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 500 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
530 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 501 const SequenceSortKey sort_key = sequence->GetSortKey(); |
531 // |sequence| must be enqueued. | 502 outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
532 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 503 sort_key); |
| 504 // No need to wake up a worker. The current worker will soon call GetWork(). |
533 } | 505 } |
534 | 506 |
535 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 507 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
536 GetSleepTimeout() { | 508 GetSleepTimeout() { |
537 return outer_->suggested_reclaim_time_; | 509 return outer_->suggested_reclaim_time_; |
538 } | 510 } |
539 | 511 |
540 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 512 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
541 SchedulerWorker* worker) { | 513 SchedulerWorker* worker) { |
542 const bool can_detach = | 514 const bool can_detach = |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
602 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 574 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
603 idle_workers_stack_.Remove(worker); | 575 idle_workers_stack_.Remove(worker); |
604 } | 576 } |
605 | 577 |
606 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 578 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
607 return !worker_detachment_disallowed_.IsSet(); | 579 return !worker_detachment_disallowed_.IsSet(); |
608 } | 580 } |
609 | 581 |
610 } // namespace internal | 582 } // namespace internal |
611 } // namespace base | 583 } // namespace base |
OLD | NEW |