Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool_impl.cc

Issue 1951453002: Name TaskScheduler's worker threads (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b7_fdoray_fixtracing
Patch Set: update comment Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <utility> 8 #include <utility>
fdoray 2016/05/03 19:01:50 #include <stddef.h> for size_t
gab 2016/05/04 18:11:37 Done.
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/bind_helpers.h" 11 #include "base/bind_helpers.h"
12 #include "base/lazy_instance.h" 12 #include "base/lazy_instance.h"
13 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
14 #include "base/sequenced_task_runner.h" 14 #include "base/sequenced_task_runner.h"
15 #include "base/single_thread_task_runner.h" 15 #include "base/single_thread_task_runner.h"
16 #include "base/strings/stringprintf.h"
16 #include "base/task_scheduler/delayed_task_manager.h" 17 #include "base/task_scheduler/delayed_task_manager.h"
17 #include "base/task_scheduler/task_tracker.h" 18 #include "base/task_scheduler/task_tracker.h"
19 #include "base/threading/platform_thread.h"
18 #include "base/threading/thread_local.h" 20 #include "base/threading/thread_local.h"
19 #include "base/threading/thread_restrictions.h" 21 #include "base/threading/thread_restrictions.h"
20 22
21 namespace base { 23 namespace base {
22 namespace internal { 24 namespace internal {
23 25
24 namespace { 26 namespace {
25 27
26 // SchedulerThreadPool that owns the current thread, if any. 28 // SchedulerThreadPool that owns the current thread, if any.
27 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky 29 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
173 175
174 } // namespace 176 } // namespace
175 177
176 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl 178 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
177 : public SchedulerWorkerThread::Delegate { 179 : public SchedulerWorkerThread::Delegate {
178 public: 180 public:
179 // |outer| owns the worker thread for which this delegate is constructed. 181 // |outer| owns the worker thread for which this delegate is constructed.
180 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is 182 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
181 // called with a non-single-threaded Sequence. |shared_priority_queue| is a 183 // called with a non-single-threaded Sequence. |shared_priority_queue| is a
182 // PriorityQueue whose transactions may overlap with the worker thread's 184 // PriorityQueue whose transactions may overlap with the worker thread's
183 // single-threaded PriorityQueue's transactions. 185 // single-threaded PriorityQueue's transactions. |index| will be appended to
186 // this thread's name to uniquely identify it.
184 SchedulerWorkerThreadDelegateImpl( 187 SchedulerWorkerThreadDelegateImpl(
185 SchedulerThreadPoolImpl* outer, 188 SchedulerThreadPoolImpl* outer,
186 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 189 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
187 const PriorityQueue* shared_priority_queue); 190 const PriorityQueue* shared_priority_queue,
191 size_t index);
188 ~SchedulerWorkerThreadDelegateImpl() override; 192 ~SchedulerWorkerThreadDelegateImpl() override;
189 193
190 PriorityQueue* single_threaded_priority_queue() { 194 PriorityQueue* single_threaded_priority_queue() {
191 return &single_threaded_priority_queue_; 195 return &single_threaded_priority_queue_;
192 } 196 }
193 197
194 // SchedulerWorkerThread::Delegate: 198 // SchedulerWorkerThread::Delegate:
195 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; 199 void OnMainEntry(SchedulerWorkerThread* worker_thread) override;
196 scoped_refptr<Sequence> GetWork( 200 scoped_refptr<Sequence> GetWork(
197 SchedulerWorkerThread* worker_thread) override; 201 SchedulerWorkerThread* worker_thread) override;
198 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; 202 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
199 203
200 private: 204 private:
201 SchedulerThreadPoolImpl* outer_; 205 SchedulerThreadPoolImpl* outer_;
202 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; 206 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
203 207
204 // Single-threaded PriorityQueue for the worker thread. 208 // Single-threaded PriorityQueue for the worker thread.
205 PriorityQueue single_threaded_priority_queue_; 209 PriorityQueue single_threaded_priority_queue_;
206 210
207 // True if the last Sequence returned by GetWork() was extracted from 211 // True if the last Sequence returned by GetWork() was extracted from
208 // |single_threaded_priority_queue_|. 212 // |single_threaded_priority_queue_|.
209 bool last_sequence_is_single_threaded_ = false; 213 bool last_sequence_is_single_threaded_ = false;
210 214
215 const size_t index_;
216
211 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); 217 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
212 }; 218 };
213 219
214 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { 220 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() {
215 // SchedulerThreadPool should never be deleted in production unless its 221 // SchedulerThreadPool should never be deleted in production unless its
216 // initialization failed. 222 // initialization failed.
217 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); 223 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
218 } 224 }
219 225
220 // static 226 // static
221 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( 227 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create(
228 StringPiece name,
222 ThreadPriority thread_priority, 229 ThreadPriority thread_priority,
223 size_t max_threads, 230 size_t max_threads,
224 IORestriction io_restriction, 231 IORestriction io_restriction,
225 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 232 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
226 TaskTracker* task_tracker, 233 TaskTracker* task_tracker,
227 DelayedTaskManager* delayed_task_manager) { 234 DelayedTaskManager* delayed_task_manager) {
228 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( 235 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool(
229 new SchedulerThreadPoolImpl(io_restriction, task_tracker, 236 new SchedulerThreadPoolImpl(name, io_restriction, task_tracker,
230 delayed_task_manager)); 237 delayed_task_manager));
231 if (thread_pool->Initialize(thread_priority, max_threads, 238 if (thread_pool->Initialize(thread_priority, max_threads,
232 re_enqueue_sequence_callback)) { 239 re_enqueue_sequence_callback)) {
233 return thread_pool; 240 return thread_pool;
234 } 241 }
235 return nullptr; 242 return nullptr;
236 } 243 }
237 244
238 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { 245 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() {
239 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 246 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
361 worker_thread->WakeUp(); 368 worker_thread->WakeUp();
362 else 369 else
363 WakeUpOneThread(); 370 WakeUpOneThread();
364 } 371 }
365 } 372 }
366 373
367 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: 374 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
368 SchedulerWorkerThreadDelegateImpl( 375 SchedulerWorkerThreadDelegateImpl(
369 SchedulerThreadPoolImpl* outer, 376 SchedulerThreadPoolImpl* outer,
370 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 377 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
371 const PriorityQueue* shared_priority_queue) 378 const PriorityQueue* shared_priority_queue,
379 size_t index)
372 : outer_(outer), 380 : outer_(outer),
373 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), 381 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
374 single_threaded_priority_queue_(shared_priority_queue) {} 382 single_threaded_priority_queue_(shared_priority_queue),
383 index_(index) {}
375 384
376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: 385 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
377 ~SchedulerWorkerThreadDelegateImpl() = default; 386 ~SchedulerWorkerThreadDelegateImpl() = default;
378 387
379 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( 388 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry(
380 SchedulerWorkerThread* worker_thread) { 389 SchedulerWorkerThread* worker_thread) {
381 #if DCHECK_IS_ON() 390 #if DCHECK_IS_ON()
382 // Wait for |outer_->threads_created_| to avoid traversing 391 // Wait for |outer_->threads_created_| to avoid traversing
383 // |outer_->worker_threads_| while it is being filled by Initialize(). 392 // |outer_->worker_threads_| while it is being filled by Initialize().
384 outer_->threads_created_.Wait(); 393 outer_->threads_created_.Wait();
385 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); 394 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread));
386 #endif 395 #endif
387 396
397 PlatformThread::SetName(StringPrintf("%sWorker%u/%d", outer_->name_.c_str(),
398 index_, PlatformThread::CurrentId()));
fdoray 2016/05/03 19:01:50 Why is it useful to have index + thread id? I thin
robliao 2016/05/03 23:35:38 Doesn't the debugger generate this for you too? 0:
gab 2016/05/04 18:11:37 Agreed, I initially added the index to have a uniq
399
388 DCHECK(!tls_current_worker_thread.Get().Get()); 400 DCHECK(!tls_current_worker_thread.Get().Get());
389 DCHECK(!tls_current_thread_pool.Get().Get()); 401 DCHECK(!tls_current_thread_pool.Get().Get());
390 tls_current_worker_thread.Get().Set(worker_thread); 402 tls_current_worker_thread.Get().Set(worker_thread);
391 tls_current_thread_pool.Get().Set(outer_); 403 tls_current_thread_pool.Get().Set(outer_);
392 404
393 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == 405 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ ==
394 IORestriction::ALLOWED); 406 IORestriction::ALLOWED);
395 } 407 }
396 408
397 scoped_refptr<Sequence> 409 scoped_refptr<Sequence>
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
459 single_threaded_priority_queue_.BeginTransaction()->Push( 471 single_threaded_priority_queue_.BeginTransaction()->Push(
460 std::move(sequence), sequence_sort_key); 472 std::move(sequence), sequence_sort_key);
461 } else { 473 } else {
462 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue 474 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
463 // |sequence| must be enqueued. 475 // |sequence| must be enqueued.
464 re_enqueue_sequence_callback_.Run(std::move(sequence)); 476 re_enqueue_sequence_callback_.Run(std::move(sequence));
465 } 477 }
466 } 478 }
467 479
468 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( 480 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
481 StringPiece name,
469 IORestriction io_restriction, 482 IORestriction io_restriction,
470 TaskTracker* task_tracker, 483 TaskTracker* task_tracker,
471 DelayedTaskManager* delayed_task_manager) 484 DelayedTaskManager* delayed_task_manager)
472 : io_restriction_(io_restriction), 485 : name_(name.as_string()),
486 io_restriction_(io_restriction),
473 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), 487 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
474 idle_worker_threads_stack_cv_for_testing_( 488 idle_worker_threads_stack_cv_for_testing_(
475 idle_worker_threads_stack_lock_.CreateConditionVariable()), 489 idle_worker_threads_stack_lock_.CreateConditionVariable()),
476 join_for_testing_returned_(true, false), 490 join_for_testing_returned_(true, false),
477 #if DCHECK_IS_ON() 491 #if DCHECK_IS_ON()
478 threads_created_(true, false), 492 threads_created_(true, false),
479 #endif 493 #endif
480 task_tracker_(task_tracker), 494 task_tracker_(task_tracker),
481 delayed_task_manager_(delayed_task_manager) { 495 delayed_task_manager_(delayed_task_manager) {
482 DCHECK(task_tracker_); 496 DCHECK(task_tracker_);
483 DCHECK(delayed_task_manager_); 497 DCHECK(delayed_task_manager_);
484 } 498 }
485 499
486 bool SchedulerThreadPoolImpl::Initialize( 500 bool SchedulerThreadPoolImpl::Initialize(
487 ThreadPriority thread_priority, 501 ThreadPriority thread_priority,
488 size_t max_threads, 502 size_t max_threads,
489 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { 503 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
490 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 504 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
491 505
492 DCHECK(worker_threads_.empty()); 506 DCHECK(worker_threads_.empty());
493 507
494 for (size_t i = 0; i < max_threads; ++i) { 508 for (size_t i = 0; i < max_threads; ++i) {
495 std::unique_ptr<SchedulerWorkerThread> worker_thread = 509 std::unique_ptr<SchedulerWorkerThread> worker_thread =
496 SchedulerWorkerThread::Create( 510 SchedulerWorkerThread::Create(
497 thread_priority, 511 thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl(
498 WrapUnique(new SchedulerWorkerThreadDelegateImpl( 512 this, re_enqueue_sequence_callback,
499 this, re_enqueue_sequence_callback, &shared_priority_queue_)), 513 &shared_priority_queue_, i)),
500 task_tracker_); 514 task_tracker_);
501 if (!worker_thread) 515 if (!worker_thread)
502 break; 516 break;
503 idle_worker_threads_stack_.Push(worker_thread.get()); 517 idle_worker_threads_stack_.Push(worker_thread.get());
504 worker_threads_.push_back(std::move(worker_thread)); 518 worker_threads_.push_back(std::move(worker_thread));
505 } 519 }
506 520
507 #if DCHECK_IS_ON() 521 #if DCHECK_IS_ON()
508 threads_created_.Signal(); 522 threads_created_.Signal();
509 #endif 523 #endif
(...skipping 22 matching lines...) Expand all
532 } 546 }
533 547
534 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( 548 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack(
535 SchedulerWorkerThread* worker_thread) { 549 SchedulerWorkerThread* worker_thread) {
536 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 550 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
537 idle_worker_threads_stack_.Remove(worker_thread); 551 idle_worker_threads_stack_.Remove(worker_thread);
538 } 552 }
539 553
540 } // namespace internal 554 } // namespace internal
541 } // namespace base 555 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698