| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> |
| 8 |
| 7 #include <algorithm> | 9 #include <algorithm> |
| 8 #include <utility> | 10 #include <utility> |
| 9 | 11 |
| 10 #include "base/bind.h" | 12 #include "base/bind.h" |
| 11 #include "base/bind_helpers.h" | 13 #include "base/bind_helpers.h" |
| 12 #include "base/lazy_instance.h" | 14 #include "base/lazy_instance.h" |
| 13 #include "base/memory/ptr_util.h" | 15 #include "base/memory/ptr_util.h" |
| 14 #include "base/sequenced_task_runner.h" | 16 #include "base/sequenced_task_runner.h" |
| 15 #include "base/single_thread_task_runner.h" | 17 #include "base/single_thread_task_runner.h" |
| 18 #include "base/strings/stringprintf.h" |
| 16 #include "base/task_scheduler/delayed_task_manager.h" | 19 #include "base/task_scheduler/delayed_task_manager.h" |
| 17 #include "base/task_scheduler/task_tracker.h" | 20 #include "base/task_scheduler/task_tracker.h" |
| 21 #include "base/threading/platform_thread.h" |
| 18 #include "base/threading/thread_local.h" | 22 #include "base/threading/thread_local.h" |
| 19 #include "base/threading/thread_restrictions.h" | 23 #include "base/threading/thread_restrictions.h" |
| 20 | 24 |
| 21 namespace base { | 25 namespace base { |
| 22 namespace internal { | 26 namespace internal { |
| 23 | 27 |
| 24 namespace { | 28 namespace { |
| 25 | 29 |
| 26 // SchedulerThreadPool that owns the current thread, if any. | 30 // SchedulerThreadPool that owns the current thread, if any. |
| 27 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky | 31 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky |
| (...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 173 | 177 |
| 174 } // namespace | 178 } // namespace |
| 175 | 179 |
| 176 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl | 180 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
| 177 : public SchedulerWorkerThread::Delegate { | 181 : public SchedulerWorkerThread::Delegate { |
| 178 public: | 182 public: |
| 179 // |outer| owns the worker thread for which this delegate is constructed. | 183 // |outer| owns the worker thread for which this delegate is constructed. |
| 180 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 184 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
| 181 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 185 // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
| 182 // PriorityQueue whose transactions may overlap with the worker thread's | 186 // PriorityQueue whose transactions may overlap with the worker thread's |
| 183 // single-threaded PriorityQueue's transactions. | 187 // single-threaded PriorityQueue's transactions. |index| will be appended to |
| 188 // this thread's name to uniquely identify it. |
| 184 SchedulerWorkerThreadDelegateImpl( | 189 SchedulerWorkerThreadDelegateImpl( |
| 185 SchedulerThreadPoolImpl* outer, | 190 SchedulerThreadPoolImpl* outer, |
| 186 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 191 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 187 const PriorityQueue* shared_priority_queue); | 192 const PriorityQueue* shared_priority_queue, |
| 193 int index); |
| 188 ~SchedulerWorkerThreadDelegateImpl() override; | 194 ~SchedulerWorkerThreadDelegateImpl() override; |
| 189 | 195 |
| 190 PriorityQueue* single_threaded_priority_queue() { | 196 PriorityQueue* single_threaded_priority_queue() { |
| 191 return &single_threaded_priority_queue_; | 197 return &single_threaded_priority_queue_; |
| 192 } | 198 } |
| 193 | 199 |
| 194 // SchedulerWorkerThread::Delegate: | 200 // SchedulerWorkerThread::Delegate: |
| 195 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; | 201 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; |
| 196 scoped_refptr<Sequence> GetWork( | 202 scoped_refptr<Sequence> GetWork( |
| 197 SchedulerWorkerThread* worker_thread) override; | 203 SchedulerWorkerThread* worker_thread) override; |
| 198 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 204 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
| 199 | 205 |
| 200 private: | 206 private: |
| 201 SchedulerThreadPoolImpl* outer_; | 207 SchedulerThreadPoolImpl* outer_; |
| 202 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| 203 | 209 |
| 204 // Single-threaded PriorityQueue for the worker thread. | 210 // Single-threaded PriorityQueue for the worker thread. |
| 205 PriorityQueue single_threaded_priority_queue_; | 211 PriorityQueue single_threaded_priority_queue_; |
| 206 | 212 |
| 207 // True if the last Sequence returned by GetWork() was extracted from | 213 // True if the last Sequence returned by GetWork() was extracted from |
| 208 // |single_threaded_priority_queue_|. | 214 // |single_threaded_priority_queue_|. |
| 209 bool last_sequence_is_single_threaded_ = false; | 215 bool last_sequence_is_single_threaded_ = false; |
| 210 | 216 |
| 217 const int index_; |
| 218 |
| 211 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); | 219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
| 212 }; | 220 }; |
| 213 | 221 |
| 214 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { | 222 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { |
| 215 // SchedulerThreadPool should never be deleted in production unless its | 223 // SchedulerThreadPool should never be deleted in production unless its |
| 216 // initialization failed. | 224 // initialization failed. |
| 217 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); | 225 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
| 218 } | 226 } |
| 219 | 227 |
| 220 // static | 228 // static |
| 221 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( | 229 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( |
| 230 StringPiece name, |
| 222 ThreadPriority thread_priority, | 231 ThreadPriority thread_priority, |
| 223 size_t max_threads, | 232 size_t max_threads, |
| 224 IORestriction io_restriction, | 233 IORestriction io_restriction, |
| 225 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 234 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 226 TaskTracker* task_tracker, | 235 TaskTracker* task_tracker, |
| 227 DelayedTaskManager* delayed_task_manager) { | 236 DelayedTaskManager* delayed_task_manager) { |
| 228 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( | 237 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( |
| 229 new SchedulerThreadPoolImpl(io_restriction, task_tracker, | 238 new SchedulerThreadPoolImpl(name, io_restriction, task_tracker, |
| 230 delayed_task_manager)); | 239 delayed_task_manager)); |
| 231 if (thread_pool->Initialize(thread_priority, max_threads, | 240 if (thread_pool->Initialize(thread_priority, max_threads, |
| 232 re_enqueue_sequence_callback)) { | 241 re_enqueue_sequence_callback)) { |
| 233 return thread_pool; | 242 return thread_pool; |
| 234 } | 243 } |
| 235 return nullptr; | 244 return nullptr; |
| 236 } | 245 } |
| 237 | 246 |
| 238 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { | 247 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { |
| 239 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 248 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 361 worker_thread->WakeUp(); | 370 worker_thread->WakeUp(); |
| 362 else | 371 else |
| 363 WakeUpOneThread(); | 372 WakeUpOneThread(); |
| 364 } | 373 } |
| 365 } | 374 } |
| 366 | 375 |
| 367 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
| 368 SchedulerWorkerThreadDelegateImpl( | 377 SchedulerWorkerThreadDelegateImpl( |
| 369 SchedulerThreadPoolImpl* outer, | 378 SchedulerThreadPoolImpl* outer, |
| 370 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 379 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 371 const PriorityQueue* shared_priority_queue) | 380 const PriorityQueue* shared_priority_queue, |
| 381 int index) |
| 372 : outer_(outer), | 382 : outer_(outer), |
| 373 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 383 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| 374 single_threaded_priority_queue_(shared_priority_queue) {} | 384 single_threaded_priority_queue_(shared_priority_queue), |
| 385 index_(index) {} |
| 375 | 386 |
| 376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 387 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
| 377 ~SchedulerWorkerThreadDelegateImpl() = default; | 388 ~SchedulerWorkerThreadDelegateImpl() = default; |
| 378 | 389 |
| 379 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( | 390 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( |
| 380 SchedulerWorkerThread* worker_thread) { | 391 SchedulerWorkerThread* worker_thread) { |
| 381 #if DCHECK_IS_ON() | 392 #if DCHECK_IS_ON() |
| 382 // Wait for |outer_->threads_created_| to avoid traversing | 393 // Wait for |outer_->threads_created_| to avoid traversing |
| 383 // |outer_->worker_threads_| while it is being filled by Initialize(). | 394 // |outer_->worker_threads_| while it is being filled by Initialize(). |
| 384 outer_->threads_created_.Wait(); | 395 outer_->threads_created_.Wait(); |
| 385 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); | 396 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
| 386 #endif | 397 #endif |
| 387 | 398 |
| 399 PlatformThread::SetName( |
| 400 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); |
| 401 |
| 388 DCHECK(!tls_current_worker_thread.Get().Get()); | 402 DCHECK(!tls_current_worker_thread.Get().Get()); |
| 389 DCHECK(!tls_current_thread_pool.Get().Get()); | 403 DCHECK(!tls_current_thread_pool.Get().Get()); |
| 390 tls_current_worker_thread.Get().Set(worker_thread); | 404 tls_current_worker_thread.Get().Set(worker_thread); |
| 391 tls_current_thread_pool.Get().Set(outer_); | 405 tls_current_thread_pool.Get().Set(outer_); |
| 392 | 406 |
| 393 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == | 407 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
| 394 IORestriction::ALLOWED); | 408 IORestriction::ALLOWED); |
| 395 } | 409 } |
| 396 | 410 |
| 397 scoped_refptr<Sequence> | 411 scoped_refptr<Sequence> |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 459 single_threaded_priority_queue_.BeginTransaction()->Push( | 473 single_threaded_priority_queue_.BeginTransaction()->Push( |
| 460 std::move(sequence), sequence_sort_key); | 474 std::move(sequence), sequence_sort_key); |
| 461 } else { | 475 } else { |
| 462 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 476 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
| 463 // |sequence| must be enqueued. | 477 // |sequence| must be enqueued. |
| 464 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 478 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| 465 } | 479 } |
| 466 } | 480 } |
| 467 | 481 |
| 468 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( | 482 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
| 483 StringPiece name, |
| 469 IORestriction io_restriction, | 484 IORestriction io_restriction, |
| 470 TaskTracker* task_tracker, | 485 TaskTracker* task_tracker, |
| 471 DelayedTaskManager* delayed_task_manager) | 486 DelayedTaskManager* delayed_task_manager) |
| 472 : io_restriction_(io_restriction), | 487 : name_(name.as_string()), |
| 488 io_restriction_(io_restriction), |
| 473 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), | 489 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), |
| 474 idle_worker_threads_stack_cv_for_testing_( | 490 idle_worker_threads_stack_cv_for_testing_( |
| 475 idle_worker_threads_stack_lock_.CreateConditionVariable()), | 491 idle_worker_threads_stack_lock_.CreateConditionVariable()), |
| 476 join_for_testing_returned_(true, false), | 492 join_for_testing_returned_(true, false), |
| 477 #if DCHECK_IS_ON() | 493 #if DCHECK_IS_ON() |
| 478 threads_created_(true, false), | 494 threads_created_(true, false), |
| 479 #endif | 495 #endif |
| 480 task_tracker_(task_tracker), | 496 task_tracker_(task_tracker), |
| 481 delayed_task_manager_(delayed_task_manager) { | 497 delayed_task_manager_(delayed_task_manager) { |
| 482 DCHECK(task_tracker_); | 498 DCHECK(task_tracker_); |
| 483 DCHECK(delayed_task_manager_); | 499 DCHECK(delayed_task_manager_); |
| 484 } | 500 } |
| 485 | 501 |
| 486 bool SchedulerThreadPoolImpl::Initialize( | 502 bool SchedulerThreadPoolImpl::Initialize( |
| 487 ThreadPriority thread_priority, | 503 ThreadPriority thread_priority, |
| 488 size_t max_threads, | 504 size_t max_threads, |
| 489 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | 505 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
| 490 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 506 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| 491 | 507 |
| 492 DCHECK(worker_threads_.empty()); | 508 DCHECK(worker_threads_.empty()); |
| 493 | 509 |
| 494 for (size_t i = 0; i < max_threads; ++i) { | 510 for (size_t i = 0; i < max_threads; ++i) { |
| 495 std::unique_ptr<SchedulerWorkerThread> worker_thread = | 511 std::unique_ptr<SchedulerWorkerThread> worker_thread = |
| 496 SchedulerWorkerThread::Create( | 512 SchedulerWorkerThread::Create( |
| 497 thread_priority, | 513 thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
| 498 WrapUnique(new SchedulerWorkerThreadDelegateImpl( | 514 this, re_enqueue_sequence_callback, |
| 499 this, re_enqueue_sequence_callback, &shared_priority_queue_)), | 515 &shared_priority_queue_, static_cast<int>(i))), |
| 500 task_tracker_); | 516 task_tracker_); |
| 501 if (!worker_thread) | 517 if (!worker_thread) |
| 502 break; | 518 break; |
| 503 idle_worker_threads_stack_.Push(worker_thread.get()); | 519 idle_worker_threads_stack_.Push(worker_thread.get()); |
| 504 worker_threads_.push_back(std::move(worker_thread)); | 520 worker_threads_.push_back(std::move(worker_thread)); |
| 505 } | 521 } |
| 506 | 522 |
| 507 #if DCHECK_IS_ON() | 523 #if DCHECK_IS_ON() |
| 508 threads_created_.Signal(); | 524 threads_created_.Signal(); |
| 509 #endif | 525 #endif |
| (...skipping 22 matching lines...) Expand all Loading... |
| 532 } | 548 } |
| 533 | 549 |
| 534 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( | 550 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( |
| 535 SchedulerWorkerThread* worker_thread) { | 551 SchedulerWorkerThread* worker_thread) { |
| 536 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 552 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
| 537 idle_worker_threads_stack_.Remove(worker_thread); | 553 idle_worker_threads_stack_.Remove(worker_thread); |
| 538 } | 554 } |
| 539 | 555 |
| 540 } // namespace internal | 556 } // namespace internal |
| 541 } // namespace base | 557 } // namespace base |
| OLD | NEW |