OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | |
8 | |
9 #include <algorithm> | 7 #include <algorithm> |
10 #include <utility> | 8 #include <utility> |
11 | 9 |
12 #include "base/bind.h" | 10 #include "base/bind.h" |
13 #include "base/bind_helpers.h" | 11 #include "base/bind_helpers.h" |
14 #include "base/lazy_instance.h" | 12 #include "base/lazy_instance.h" |
15 #include "base/memory/ptr_util.h" | 13 #include "base/memory/ptr_util.h" |
16 #include "base/sequenced_task_runner.h" | 14 #include "base/sequenced_task_runner.h" |
17 #include "base/single_thread_task_runner.h" | 15 #include "base/single_thread_task_runner.h" |
18 #include "base/strings/stringprintf.h" | |
19 #include "base/task_scheduler/delayed_task_manager.h" | 16 #include "base/task_scheduler/delayed_task_manager.h" |
20 #include "base/task_scheduler/task_tracker.h" | 17 #include "base/task_scheduler/task_tracker.h" |
21 #include "base/threading/platform_thread.h" | |
22 #include "base/threading/thread_local.h" | 18 #include "base/threading/thread_local.h" |
23 #include "base/threading/thread_restrictions.h" | 19 #include "base/threading/thread_restrictions.h" |
24 | 20 |
25 namespace base { | 21 namespace base { |
26 namespace internal { | 22 namespace internal { |
27 | 23 |
28 namespace { | 24 namespace { |
29 | 25 |
30 // SchedulerThreadPool that owns the current thread, if any. | 26 // SchedulerThreadPool that owns the current thread, if any. |
31 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky | 27 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky |
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
177 | 173 |
178 } // namespace | 174 } // namespace |
179 | 175 |
180 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl | 176 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
181 : public SchedulerWorkerThread::Delegate { | 177 : public SchedulerWorkerThread::Delegate { |
182 public: | 178 public: |
183 // |outer| owns the worker thread for which this delegate is constructed. | 179 // |outer| owns the worker thread for which this delegate is constructed. |
184 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 180 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
185 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 181 // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
186 // PriorityQueue whose transactions may overlap with the worker thread's | 182 // PriorityQueue whose transactions may overlap with the worker thread's |
187 // single-threaded PriorityQueue's transactions. |index| will be appended to | 183 // single-threaded PriorityQueue's transactions. |
188 // this thread's name to uniquely identify it. | |
189 SchedulerWorkerThreadDelegateImpl( | 184 SchedulerWorkerThreadDelegateImpl( |
190 SchedulerThreadPoolImpl* outer, | 185 SchedulerThreadPoolImpl* outer, |
191 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 186 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
192 const PriorityQueue* shared_priority_queue, | 187 const PriorityQueue* shared_priority_queue); |
193 int index); | |
194 ~SchedulerWorkerThreadDelegateImpl() override; | 188 ~SchedulerWorkerThreadDelegateImpl() override; |
195 | 189 |
196 PriorityQueue* single_threaded_priority_queue() { | 190 PriorityQueue* single_threaded_priority_queue() { |
197 return &single_threaded_priority_queue_; | 191 return &single_threaded_priority_queue_; |
198 } | 192 } |
199 | 193 |
200 // SchedulerWorkerThread::Delegate: | 194 // SchedulerWorkerThread::Delegate: |
201 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; | 195 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; |
202 scoped_refptr<Sequence> GetWork( | 196 scoped_refptr<Sequence> GetWork( |
203 SchedulerWorkerThread* worker_thread) override; | 197 SchedulerWorkerThread* worker_thread) override; |
204 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 198 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
205 | 199 |
206 private: | 200 private: |
207 SchedulerThreadPoolImpl* outer_; | 201 SchedulerThreadPoolImpl* outer_; |
208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 202 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
209 | 203 |
210 // Single-threaded PriorityQueue for the worker thread. | 204 // Single-threaded PriorityQueue for the worker thread. |
211 PriorityQueue single_threaded_priority_queue_; | 205 PriorityQueue single_threaded_priority_queue_; |
212 | 206 |
213 // True if the last Sequence returned by GetWork() was extracted from | 207 // True if the last Sequence returned by GetWork() was extracted from |
214 // |single_threaded_priority_queue_|. | 208 // |single_threaded_priority_queue_|. |
215 bool last_sequence_is_single_threaded_ = false; | 209 bool last_sequence_is_single_threaded_ = false; |
216 | 210 |
217 const int index_; | |
218 | |
219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); | 211 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
220 }; | 212 }; |
221 | 213 |
222 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { | 214 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { |
223 // SchedulerThreadPool should never be deleted in production unless its | 215 // SchedulerThreadPool should never be deleted in production unless its |
224 // initialization failed. | 216 // initialization failed. |
225 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); | 217 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
226 } | 218 } |
227 | 219 |
228 // static | 220 // static |
229 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( | 221 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( |
230 StringPiece name, | |
231 ThreadPriority thread_priority, | 222 ThreadPriority thread_priority, |
232 size_t max_threads, | 223 size_t max_threads, |
233 IORestriction io_restriction, | 224 IORestriction io_restriction, |
234 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 225 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
235 TaskTracker* task_tracker, | 226 TaskTracker* task_tracker, |
236 DelayedTaskManager* delayed_task_manager) { | 227 DelayedTaskManager* delayed_task_manager) { |
237 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( | 228 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( |
238 new SchedulerThreadPoolImpl(name, io_restriction, task_tracker, | 229 new SchedulerThreadPoolImpl(io_restriction, task_tracker, |
239 delayed_task_manager)); | 230 delayed_task_manager)); |
240 if (thread_pool->Initialize(thread_priority, max_threads, | 231 if (thread_pool->Initialize(thread_priority, max_threads, |
241 re_enqueue_sequence_callback)) { | 232 re_enqueue_sequence_callback)) { |
242 return thread_pool; | 233 return thread_pool; |
243 } | 234 } |
244 return nullptr; | 235 return nullptr; |
245 } | 236 } |
246 | 237 |
247 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { | 238 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { |
248 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 239 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
370 worker_thread->WakeUp(); | 361 worker_thread->WakeUp(); |
371 else | 362 else |
372 WakeUpOneThread(); | 363 WakeUpOneThread(); |
373 } | 364 } |
374 } | 365 } |
375 | 366 |
376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 367 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
377 SchedulerWorkerThreadDelegateImpl( | 368 SchedulerWorkerThreadDelegateImpl( |
378 SchedulerThreadPoolImpl* outer, | 369 SchedulerThreadPoolImpl* outer, |
379 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 370 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
380 const PriorityQueue* shared_priority_queue, | 371 const PriorityQueue* shared_priority_queue) |
381 int index) | |
382 : outer_(outer), | 372 : outer_(outer), |
383 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 373 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
384 single_threaded_priority_queue_(shared_priority_queue), | 374 single_threaded_priority_queue_(shared_priority_queue) {} |
385 index_(index) {} | |
386 | 375 |
387 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
388 ~SchedulerWorkerThreadDelegateImpl() = default; | 377 ~SchedulerWorkerThreadDelegateImpl() = default; |
389 | 378 |
390 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( | 379 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( |
391 SchedulerWorkerThread* worker_thread) { | 380 SchedulerWorkerThread* worker_thread) { |
392 #if DCHECK_IS_ON() | 381 #if DCHECK_IS_ON() |
393 // Wait for |outer_->threads_created_| to avoid traversing | 382 // Wait for |outer_->threads_created_| to avoid traversing |
394 // |outer_->worker_threads_| while it is being filled by Initialize(). | 383 // |outer_->worker_threads_| while it is being filled by Initialize(). |
395 outer_->threads_created_.Wait(); | 384 outer_->threads_created_.Wait(); |
396 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); | 385 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
397 #endif | 386 #endif |
398 | 387 |
399 PlatformThread::SetName( | |
400 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); | |
401 | |
402 DCHECK(!tls_current_worker_thread.Get().Get()); | 388 DCHECK(!tls_current_worker_thread.Get().Get()); |
403 DCHECK(!tls_current_thread_pool.Get().Get()); | 389 DCHECK(!tls_current_thread_pool.Get().Get()); |
404 tls_current_worker_thread.Get().Set(worker_thread); | 390 tls_current_worker_thread.Get().Set(worker_thread); |
405 tls_current_thread_pool.Get().Set(outer_); | 391 tls_current_thread_pool.Get().Set(outer_); |
406 | 392 |
407 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == | 393 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
408 IORestriction::ALLOWED); | 394 IORestriction::ALLOWED); |
409 } | 395 } |
410 | 396 |
411 scoped_refptr<Sequence> | 397 scoped_refptr<Sequence> |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
473 single_threaded_priority_queue_.BeginTransaction()->Push( | 459 single_threaded_priority_queue_.BeginTransaction()->Push( |
474 std::move(sequence), sequence_sort_key); | 460 std::move(sequence), sequence_sort_key); |
475 } else { | 461 } else { |
476 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 462 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
477 // |sequence| must be enqueued. | 463 // |sequence| must be enqueued. |
478 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 464 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
479 } | 465 } |
480 } | 466 } |
481 | 467 |
482 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( | 468 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
483 StringPiece name, | |
484 IORestriction io_restriction, | 469 IORestriction io_restriction, |
485 TaskTracker* task_tracker, | 470 TaskTracker* task_tracker, |
486 DelayedTaskManager* delayed_task_manager) | 471 DelayedTaskManager* delayed_task_manager) |
487 : name_(name.as_string()), | 472 : io_restriction_(io_restriction), |
488 io_restriction_(io_restriction), | |
489 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), | 473 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), |
490 idle_worker_threads_stack_cv_for_testing_( | 474 idle_worker_threads_stack_cv_for_testing_( |
491 idle_worker_threads_stack_lock_.CreateConditionVariable()), | 475 idle_worker_threads_stack_lock_.CreateConditionVariable()), |
492 join_for_testing_returned_(true, false), | 476 join_for_testing_returned_(true, false), |
493 #if DCHECK_IS_ON() | 477 #if DCHECK_IS_ON() |
494 threads_created_(true, false), | 478 threads_created_(true, false), |
495 #endif | 479 #endif |
496 task_tracker_(task_tracker), | 480 task_tracker_(task_tracker), |
497 delayed_task_manager_(delayed_task_manager) { | 481 delayed_task_manager_(delayed_task_manager) { |
498 DCHECK(task_tracker_); | 482 DCHECK(task_tracker_); |
499 DCHECK(delayed_task_manager_); | 483 DCHECK(delayed_task_manager_); |
500 } | 484 } |
501 | 485 |
502 bool SchedulerThreadPoolImpl::Initialize( | 486 bool SchedulerThreadPoolImpl::Initialize( |
503 ThreadPriority thread_priority, | 487 ThreadPriority thread_priority, |
504 size_t max_threads, | 488 size_t max_threads, |
505 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | 489 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
506 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 490 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
507 | 491 |
508 DCHECK(worker_threads_.empty()); | 492 DCHECK(worker_threads_.empty()); |
509 | 493 |
510 for (size_t i = 0; i < max_threads; ++i) { | 494 for (size_t i = 0; i < max_threads; ++i) { |
511 std::unique_ptr<SchedulerWorkerThread> worker_thread = | 495 std::unique_ptr<SchedulerWorkerThread> worker_thread = |
512 SchedulerWorkerThread::Create( | 496 SchedulerWorkerThread::Create( |
513 thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl( | 497 thread_priority, |
514 this, re_enqueue_sequence_callback, | 498 WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
515 &shared_priority_queue_, i)), | 499 this, re_enqueue_sequence_callback, &shared_priority_queue_)), |
516 task_tracker_); | 500 task_tracker_); |
517 if (!worker_thread) | 501 if (!worker_thread) |
518 break; | 502 break; |
519 idle_worker_threads_stack_.Push(worker_thread.get()); | 503 idle_worker_threads_stack_.Push(worker_thread.get()); |
520 worker_threads_.push_back(std::move(worker_thread)); | 504 worker_threads_.push_back(std::move(worker_thread)); |
521 } | 505 } |
522 | 506 |
523 #if DCHECK_IS_ON() | 507 #if DCHECK_IS_ON() |
524 threads_created_.Signal(); | 508 threads_created_.Signal(); |
525 #endif | 509 #endif |
(...skipping 22 matching lines...) Expand all Loading... |
548 } | 532 } |
549 | 533 |
550 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( | 534 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( |
551 SchedulerWorkerThread* worker_thread) { | 535 SchedulerWorkerThread* worker_thread) { |
552 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 536 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
553 idle_worker_threads_stack_.Remove(worker_thread); | 537 idle_worker_threads_stack_.Remove(worker_thread); |
554 } | 538 } |
555 | 539 |
556 } // namespace internal | 540 } // namespace internal |
557 } // namespace base | 541 } // namespace base |
OLD | NEW |