OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <utility> | 8 #include <utility> |
fdoray
2016/05/03 19:01:50
#include <stddef.h> for size_t
gab
2016/05/04 18:11:37
Done.
| |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/bind_helpers.h" | 11 #include "base/bind_helpers.h" |
12 #include "base/lazy_instance.h" | 12 #include "base/lazy_instance.h" |
13 #include "base/memory/ptr_util.h" | 13 #include "base/memory/ptr_util.h" |
14 #include "base/sequenced_task_runner.h" | 14 #include "base/sequenced_task_runner.h" |
15 #include "base/single_thread_task_runner.h" | 15 #include "base/single_thread_task_runner.h" |
16 #include "base/strings/stringprintf.h" | |
16 #include "base/task_scheduler/delayed_task_manager.h" | 17 #include "base/task_scheduler/delayed_task_manager.h" |
17 #include "base/task_scheduler/task_tracker.h" | 18 #include "base/task_scheduler/task_tracker.h" |
19 #include "base/threading/platform_thread.h" | |
18 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
19 #include "base/threading/thread_restrictions.h" | 21 #include "base/threading/thread_restrictions.h" |
20 | 22 |
21 namespace base { | 23 namespace base { |
22 namespace internal { | 24 namespace internal { |
23 | 25 |
24 namespace { | 26 namespace { |
25 | 27 |
26 // SchedulerThreadPool that owns the current thread, if any. | 28 // SchedulerThreadPool that owns the current thread, if any. |
27 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky | 29 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky |
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
173 | 175 |
174 } // namespace | 176 } // namespace |
175 | 177 |
176 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl | 178 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl |
177 : public SchedulerWorkerThread::Delegate { | 179 : public SchedulerWorkerThread::Delegate { |
178 public: | 180 public: |
179 // |outer| owns the worker thread for which this delegate is constructed. | 181 // |outer| owns the worker thread for which this delegate is constructed. |
180 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 182 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
181 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 183 // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
182 // PriorityQueue whose transactions may overlap with the worker thread's | 184 // PriorityQueue whose transactions may overlap with the worker thread's |
183 // single-threaded PriorityQueue's transactions. | 185 // single-threaded PriorityQueue's transactions. |index| will be appended to |
186 // this thread's name to uniquely identify it. | |
184 SchedulerWorkerThreadDelegateImpl( | 187 SchedulerWorkerThreadDelegateImpl( |
185 SchedulerThreadPoolImpl* outer, | 188 SchedulerThreadPoolImpl* outer, |
186 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 189 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
187 const PriorityQueue* shared_priority_queue); | 190 const PriorityQueue* shared_priority_queue, |
191 size_t index); | |
188 ~SchedulerWorkerThreadDelegateImpl() override; | 192 ~SchedulerWorkerThreadDelegateImpl() override; |
189 | 193 |
190 PriorityQueue* single_threaded_priority_queue() { | 194 PriorityQueue* single_threaded_priority_queue() { |
191 return &single_threaded_priority_queue_; | 195 return &single_threaded_priority_queue_; |
192 } | 196 } |
193 | 197 |
194 // SchedulerWorkerThread::Delegate: | 198 // SchedulerWorkerThread::Delegate: |
195 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; | 199 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; |
196 scoped_refptr<Sequence> GetWork( | 200 scoped_refptr<Sequence> GetWork( |
197 SchedulerWorkerThread* worker_thread) override; | 201 SchedulerWorkerThread* worker_thread) override; |
198 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 202 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
199 | 203 |
200 private: | 204 private: |
201 SchedulerThreadPoolImpl* outer_; | 205 SchedulerThreadPoolImpl* outer_; |
202 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 206 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
203 | 207 |
204 // Single-threaded PriorityQueue for the worker thread. | 208 // Single-threaded PriorityQueue for the worker thread. |
205 PriorityQueue single_threaded_priority_queue_; | 209 PriorityQueue single_threaded_priority_queue_; |
206 | 210 |
207 // True if the last Sequence returned by GetWork() was extracted from | 211 // True if the last Sequence returned by GetWork() was extracted from |
208 // |single_threaded_priority_queue_|. | 212 // |single_threaded_priority_queue_|. |
209 bool last_sequence_is_single_threaded_ = false; | 213 bool last_sequence_is_single_threaded_ = false; |
210 | 214 |
215 const size_t index_; | |
216 | |
211 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); | 217 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
212 }; | 218 }; |
213 | 219 |
214 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { | 220 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { |
215 // SchedulerThreadPool should never be deleted in production unless its | 221 // SchedulerThreadPool should never be deleted in production unless its |
216 // initialization failed. | 222 // initialization failed. |
217 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); | 223 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
218 } | 224 } |
219 | 225 |
220 // static | 226 // static |
221 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( | 227 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( |
228 StringPiece name, | |
222 ThreadPriority thread_priority, | 229 ThreadPriority thread_priority, |
223 size_t max_threads, | 230 size_t max_threads, |
224 IORestriction io_restriction, | 231 IORestriction io_restriction, |
225 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 232 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
226 TaskTracker* task_tracker, | 233 TaskTracker* task_tracker, |
227 DelayedTaskManager* delayed_task_manager) { | 234 DelayedTaskManager* delayed_task_manager) { |
228 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( | 235 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( |
229 new SchedulerThreadPoolImpl(io_restriction, task_tracker, | 236 new SchedulerThreadPoolImpl(name, io_restriction, task_tracker, |
230 delayed_task_manager)); | 237 delayed_task_manager)); |
231 if (thread_pool->Initialize(thread_priority, max_threads, | 238 if (thread_pool->Initialize(thread_priority, max_threads, |
232 re_enqueue_sequence_callback)) { | 239 re_enqueue_sequence_callback)) { |
233 return thread_pool; | 240 return thread_pool; |
234 } | 241 } |
235 return nullptr; | 242 return nullptr; |
236 } | 243 } |
237 | 244 |
238 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { | 245 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { |
239 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 246 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
361 worker_thread->WakeUp(); | 368 worker_thread->WakeUp(); |
362 else | 369 else |
363 WakeUpOneThread(); | 370 WakeUpOneThread(); |
364 } | 371 } |
365 } | 372 } |
366 | 373 |
367 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 374 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
368 SchedulerWorkerThreadDelegateImpl( | 375 SchedulerWorkerThreadDelegateImpl( |
369 SchedulerThreadPoolImpl* outer, | 376 SchedulerThreadPoolImpl* outer, |
370 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 377 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
371 const PriorityQueue* shared_priority_queue) | 378 const PriorityQueue* shared_priority_queue, |
379 size_t index) | |
372 : outer_(outer), | 380 : outer_(outer), |
373 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 381 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
374 single_threaded_priority_queue_(shared_priority_queue) {} | 382 single_threaded_priority_queue_(shared_priority_queue), |
383 index_(index) {} | |
375 | 384 |
376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 385 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
377 ~SchedulerWorkerThreadDelegateImpl() = default; | 386 ~SchedulerWorkerThreadDelegateImpl() = default; |
378 | 387 |
379 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( | 388 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( |
380 SchedulerWorkerThread* worker_thread) { | 389 SchedulerWorkerThread* worker_thread) { |
381 #if DCHECK_IS_ON() | 390 #if DCHECK_IS_ON() |
382 // Wait for |outer_->threads_created_| to avoid traversing | 391 // Wait for |outer_->threads_created_| to avoid traversing |
383 // |outer_->worker_threads_| while it is being filled by Initialize(). | 392 // |outer_->worker_threads_| while it is being filled by Initialize(). |
384 outer_->threads_created_.Wait(); | 393 outer_->threads_created_.Wait(); |
385 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); | 394 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
386 #endif | 395 #endif |
387 | 396 |
397 PlatformThread::SetName(StringPrintf("%sWorker%u/%d", outer_->name_.c_str(), | |
398 index_, PlatformThread::CurrentId())); | |
fdoray
2016/05/03 19:01:50
Why is it useful to have index + thread id? I thin
robliao
2016/05/03 23:35:38
Doesn't the debugger generate this for you too?
0:
gab
2016/05/04 18:11:37
Agreed, I initially added the index to have a uniq
| |
399 | |
388 DCHECK(!tls_current_worker_thread.Get().Get()); | 400 DCHECK(!tls_current_worker_thread.Get().Get()); |
389 DCHECK(!tls_current_thread_pool.Get().Get()); | 401 DCHECK(!tls_current_thread_pool.Get().Get()); |
390 tls_current_worker_thread.Get().Set(worker_thread); | 402 tls_current_worker_thread.Get().Set(worker_thread); |
391 tls_current_thread_pool.Get().Set(outer_); | 403 tls_current_thread_pool.Get().Set(outer_); |
392 | 404 |
393 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == | 405 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
394 IORestriction::ALLOWED); | 406 IORestriction::ALLOWED); |
395 } | 407 } |
396 | 408 |
397 scoped_refptr<Sequence> | 409 scoped_refptr<Sequence> |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
459 single_threaded_priority_queue_.BeginTransaction()->Push( | 471 single_threaded_priority_queue_.BeginTransaction()->Push( |
460 std::move(sequence), sequence_sort_key); | 472 std::move(sequence), sequence_sort_key); |
461 } else { | 473 } else { |
462 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 474 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
463 // |sequence| must be enqueued. | 475 // |sequence| must be enqueued. |
464 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 476 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
465 } | 477 } |
466 } | 478 } |
467 | 479 |
468 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( | 480 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
481 StringPiece name, | |
469 IORestriction io_restriction, | 482 IORestriction io_restriction, |
470 TaskTracker* task_tracker, | 483 TaskTracker* task_tracker, |
471 DelayedTaskManager* delayed_task_manager) | 484 DelayedTaskManager* delayed_task_manager) |
472 : io_restriction_(io_restriction), | 485 : name_(name.as_string()), |
486 io_restriction_(io_restriction), | |
473 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), | 487 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), |
474 idle_worker_threads_stack_cv_for_testing_( | 488 idle_worker_threads_stack_cv_for_testing_( |
475 idle_worker_threads_stack_lock_.CreateConditionVariable()), | 489 idle_worker_threads_stack_lock_.CreateConditionVariable()), |
476 join_for_testing_returned_(true, false), | 490 join_for_testing_returned_(true, false), |
477 #if DCHECK_IS_ON() | 491 #if DCHECK_IS_ON() |
478 threads_created_(true, false), | 492 threads_created_(true, false), |
479 #endif | 493 #endif |
480 task_tracker_(task_tracker), | 494 task_tracker_(task_tracker), |
481 delayed_task_manager_(delayed_task_manager) { | 495 delayed_task_manager_(delayed_task_manager) { |
482 DCHECK(task_tracker_); | 496 DCHECK(task_tracker_); |
483 DCHECK(delayed_task_manager_); | 497 DCHECK(delayed_task_manager_); |
484 } | 498 } |
485 | 499 |
486 bool SchedulerThreadPoolImpl::Initialize( | 500 bool SchedulerThreadPoolImpl::Initialize( |
487 ThreadPriority thread_priority, | 501 ThreadPriority thread_priority, |
488 size_t max_threads, | 502 size_t max_threads, |
489 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | 503 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
490 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 504 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
491 | 505 |
492 DCHECK(worker_threads_.empty()); | 506 DCHECK(worker_threads_.empty()); |
493 | 507 |
494 for (size_t i = 0; i < max_threads; ++i) { | 508 for (size_t i = 0; i < max_threads; ++i) { |
495 std::unique_ptr<SchedulerWorkerThread> worker_thread = | 509 std::unique_ptr<SchedulerWorkerThread> worker_thread = |
496 SchedulerWorkerThread::Create( | 510 SchedulerWorkerThread::Create( |
497 thread_priority, | 511 thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
498 WrapUnique(new SchedulerWorkerThreadDelegateImpl( | 512 this, re_enqueue_sequence_callback, |
499 this, re_enqueue_sequence_callback, &shared_priority_queue_)), | 513 &shared_priority_queue_, i)), |
500 task_tracker_); | 514 task_tracker_); |
501 if (!worker_thread) | 515 if (!worker_thread) |
502 break; | 516 break; |
503 idle_worker_threads_stack_.Push(worker_thread.get()); | 517 idle_worker_threads_stack_.Push(worker_thread.get()); |
504 worker_threads_.push_back(std::move(worker_thread)); | 518 worker_threads_.push_back(std::move(worker_thread)); |
505 } | 519 } |
506 | 520 |
507 #if DCHECK_IS_ON() | 521 #if DCHECK_IS_ON() |
508 threads_created_.Signal(); | 522 threads_created_.Signal(); |
509 #endif | 523 #endif |
(...skipping 22 matching lines...) Expand all Loading... | |
532 } | 546 } |
533 | 547 |
534 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( | 548 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( |
535 SchedulerWorkerThread* worker_thread) { | 549 SchedulerWorkerThread* worker_thread) { |
536 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 550 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
537 idle_worker_threads_stack_.Remove(worker_thread); | 551 idle_worker_threads_stack_.Remove(worker_thread); |
538 } | 552 } |
539 | 553 |
540 } // namespace internal | 554 } // namespace internal |
541 } // namespace base | 555 } // namespace base |
OLD | NEW |