Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(229)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool_impl.cc

Issue 1951453002: Name TaskScheduler's worker threads (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b7_fdoray_fixtracing
Patch Set: explicit cast to int Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h"
6 6
7 #include <stddef.h>
8
7 #include <algorithm> 9 #include <algorithm>
8 #include <utility> 10 #include <utility>
9 11
10 #include "base/bind.h" 12 #include "base/bind.h"
11 #include "base/bind_helpers.h" 13 #include "base/bind_helpers.h"
12 #include "base/lazy_instance.h" 14 #include "base/lazy_instance.h"
13 #include "base/memory/ptr_util.h" 15 #include "base/memory/ptr_util.h"
14 #include "base/sequenced_task_runner.h" 16 #include "base/sequenced_task_runner.h"
15 #include "base/single_thread_task_runner.h" 17 #include "base/single_thread_task_runner.h"
18 #include "base/strings/stringprintf.h"
16 #include "base/task_scheduler/delayed_task_manager.h" 19 #include "base/task_scheduler/delayed_task_manager.h"
17 #include "base/task_scheduler/task_tracker.h" 20 #include "base/task_scheduler/task_tracker.h"
21 #include "base/threading/platform_thread.h"
18 #include "base/threading/thread_local.h" 22 #include "base/threading/thread_local.h"
19 #include "base/threading/thread_restrictions.h" 23 #include "base/threading/thread_restrictions.h"
20 24
21 namespace base { 25 namespace base {
22 namespace internal { 26 namespace internal {
23 27
24 namespace { 28 namespace {
25 29
26 // SchedulerThreadPool that owns the current thread, if any. 30 // SchedulerThreadPool that owns the current thread, if any.
27 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky 31 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
173 177
174 } // namespace 178 } // namespace
175 179
176 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl 180 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
177 : public SchedulerWorkerThread::Delegate { 181 : public SchedulerWorkerThread::Delegate {
178 public: 182 public:
179 // |outer| owns the worker thread for which this delegate is constructed. 183 // |outer| owns the worker thread for which this delegate is constructed.
180 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is 184 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
181 // called with a non-single-threaded Sequence. |shared_priority_queue| is a 185 // called with a non-single-threaded Sequence. |shared_priority_queue| is a
182 // PriorityQueue whose transactions may overlap with the worker thread's 186 // PriorityQueue whose transactions may overlap with the worker thread's
183 // single-threaded PriorityQueue's transactions. 187 // single-threaded PriorityQueue's transactions. |index| will be appended to
188 // this thread's name to uniquely identify it.
184 SchedulerWorkerThreadDelegateImpl( 189 SchedulerWorkerThreadDelegateImpl(
185 SchedulerThreadPoolImpl* outer, 190 SchedulerThreadPoolImpl* outer,
186 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 191 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
187 const PriorityQueue* shared_priority_queue); 192 const PriorityQueue* shared_priority_queue,
193 int index);
188 ~SchedulerWorkerThreadDelegateImpl() override; 194 ~SchedulerWorkerThreadDelegateImpl() override;
189 195
190 PriorityQueue* single_threaded_priority_queue() { 196 PriorityQueue* single_threaded_priority_queue() {
191 return &single_threaded_priority_queue_; 197 return &single_threaded_priority_queue_;
192 } 198 }
193 199
194 // SchedulerWorkerThread::Delegate: 200 // SchedulerWorkerThread::Delegate:
195 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; 201 void OnMainEntry(SchedulerWorkerThread* worker_thread) override;
196 scoped_refptr<Sequence> GetWork( 202 scoped_refptr<Sequence> GetWork(
197 SchedulerWorkerThread* worker_thread) override; 203 SchedulerWorkerThread* worker_thread) override;
198 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; 204 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
199 205
200 private: 206 private:
201 SchedulerThreadPoolImpl* outer_; 207 SchedulerThreadPoolImpl* outer_;
202 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; 208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
203 209
204 // Single-threaded PriorityQueue for the worker thread. 210 // Single-threaded PriorityQueue for the worker thread.
205 PriorityQueue single_threaded_priority_queue_; 211 PriorityQueue single_threaded_priority_queue_;
206 212
207 // True if the last Sequence returned by GetWork() was extracted from 213 // True if the last Sequence returned by GetWork() was extracted from
208 // |single_threaded_priority_queue_|. 214 // |single_threaded_priority_queue_|.
209 bool last_sequence_is_single_threaded_ = false; 215 bool last_sequence_is_single_threaded_ = false;
210 216
217 const int index_;
218
211 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); 219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
212 }; 220 };
213 221
214 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { 222 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() {
215 // SchedulerThreadPool should never be deleted in production unless its 223 // SchedulerThreadPool should never be deleted in production unless its
216 // initialization failed. 224 // initialization failed.
217 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); 225 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
218 } 226 }
219 227
220 // static 228 // static
221 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( 229 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create(
230 StringPiece name,
222 ThreadPriority thread_priority, 231 ThreadPriority thread_priority,
223 size_t max_threads, 232 size_t max_threads,
224 IORestriction io_restriction, 233 IORestriction io_restriction,
225 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 234 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
226 TaskTracker* task_tracker, 235 TaskTracker* task_tracker,
227 DelayedTaskManager* delayed_task_manager) { 236 DelayedTaskManager* delayed_task_manager) {
228 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( 237 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool(
229 new SchedulerThreadPoolImpl(io_restriction, task_tracker, 238 new SchedulerThreadPoolImpl(name, io_restriction, task_tracker,
230 delayed_task_manager)); 239 delayed_task_manager));
231 if (thread_pool->Initialize(thread_priority, max_threads, 240 if (thread_pool->Initialize(thread_priority, max_threads,
232 re_enqueue_sequence_callback)) { 241 re_enqueue_sequence_callback)) {
233 return thread_pool; 242 return thread_pool;
234 } 243 }
235 return nullptr; 244 return nullptr;
236 } 245 }
237 246
238 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { 247 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() {
239 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 248 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
361 worker_thread->WakeUp(); 370 worker_thread->WakeUp();
362 else 371 else
363 WakeUpOneThread(); 372 WakeUpOneThread();
364 } 373 }
365 } 374 }
366 375
367 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: 376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
368 SchedulerWorkerThreadDelegateImpl( 377 SchedulerWorkerThreadDelegateImpl(
369 SchedulerThreadPoolImpl* outer, 378 SchedulerThreadPoolImpl* outer,
370 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 379 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
371 const PriorityQueue* shared_priority_queue) 380 const PriorityQueue* shared_priority_queue,
381 int index)
372 : outer_(outer), 382 : outer_(outer),
373 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), 383 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
374 single_threaded_priority_queue_(shared_priority_queue) {} 384 single_threaded_priority_queue_(shared_priority_queue),
385 index_(index) {}
375 386
376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: 387 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
377 ~SchedulerWorkerThreadDelegateImpl() = default; 388 ~SchedulerWorkerThreadDelegateImpl() = default;
378 389
379 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( 390 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry(
380 SchedulerWorkerThread* worker_thread) { 391 SchedulerWorkerThread* worker_thread) {
381 #if DCHECK_IS_ON() 392 #if DCHECK_IS_ON()
382 // Wait for |outer_->threads_created_| to avoid traversing 393 // Wait for |outer_->threads_created_| to avoid traversing
383 // |outer_->worker_threads_| while it is being filled by Initialize(). 394 // |outer_->worker_threads_| while it is being filled by Initialize().
384 outer_->threads_created_.Wait(); 395 outer_->threads_created_.Wait();
385 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); 396 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread));
386 #endif 397 #endif
387 398
399 PlatformThread::SetName(
400 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_));
401
388 DCHECK(!tls_current_worker_thread.Get().Get()); 402 DCHECK(!tls_current_worker_thread.Get().Get());
389 DCHECK(!tls_current_thread_pool.Get().Get()); 403 DCHECK(!tls_current_thread_pool.Get().Get());
390 tls_current_worker_thread.Get().Set(worker_thread); 404 tls_current_worker_thread.Get().Set(worker_thread);
391 tls_current_thread_pool.Get().Set(outer_); 405 tls_current_thread_pool.Get().Set(outer_);
392 406
393 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == 407 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ ==
394 IORestriction::ALLOWED); 408 IORestriction::ALLOWED);
395 } 409 }
396 410
397 scoped_refptr<Sequence> 411 scoped_refptr<Sequence>
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
459 single_threaded_priority_queue_.BeginTransaction()->Push( 473 single_threaded_priority_queue_.BeginTransaction()->Push(
460 std::move(sequence), sequence_sort_key); 474 std::move(sequence), sequence_sort_key);
461 } else { 475 } else {
462 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue 476 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
463 // |sequence| must be enqueued. 477 // |sequence| must be enqueued.
464 re_enqueue_sequence_callback_.Run(std::move(sequence)); 478 re_enqueue_sequence_callback_.Run(std::move(sequence));
465 } 479 }
466 } 480 }
467 481
468 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( 482 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
483 StringPiece name,
469 IORestriction io_restriction, 484 IORestriction io_restriction,
470 TaskTracker* task_tracker, 485 TaskTracker* task_tracker,
471 DelayedTaskManager* delayed_task_manager) 486 DelayedTaskManager* delayed_task_manager)
472 : io_restriction_(io_restriction), 487 : name_(name.as_string()),
488 io_restriction_(io_restriction),
473 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), 489 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
474 idle_worker_threads_stack_cv_for_testing_( 490 idle_worker_threads_stack_cv_for_testing_(
475 idle_worker_threads_stack_lock_.CreateConditionVariable()), 491 idle_worker_threads_stack_lock_.CreateConditionVariable()),
476 join_for_testing_returned_(true, false), 492 join_for_testing_returned_(true, false),
477 #if DCHECK_IS_ON() 493 #if DCHECK_IS_ON()
478 threads_created_(true, false), 494 threads_created_(true, false),
479 #endif 495 #endif
480 task_tracker_(task_tracker), 496 task_tracker_(task_tracker),
481 delayed_task_manager_(delayed_task_manager) { 497 delayed_task_manager_(delayed_task_manager) {
482 DCHECK(task_tracker_); 498 DCHECK(task_tracker_);
483 DCHECK(delayed_task_manager_); 499 DCHECK(delayed_task_manager_);
484 } 500 }
485 501
486 bool SchedulerThreadPoolImpl::Initialize( 502 bool SchedulerThreadPoolImpl::Initialize(
487 ThreadPriority thread_priority, 503 ThreadPriority thread_priority,
488 size_t max_threads, 504 size_t max_threads,
489 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { 505 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
490 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 506 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
491 507
492 DCHECK(worker_threads_.empty()); 508 DCHECK(worker_threads_.empty());
493 509
494 for (size_t i = 0; i < max_threads; ++i) { 510 for (size_t i = 0; i < max_threads; ++i) {
495 std::unique_ptr<SchedulerWorkerThread> worker_thread = 511 std::unique_ptr<SchedulerWorkerThread> worker_thread =
496 SchedulerWorkerThread::Create( 512 SchedulerWorkerThread::Create(
497 thread_priority, 513 thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl(
498 WrapUnique(new SchedulerWorkerThreadDelegateImpl( 514 this, re_enqueue_sequence_callback,
499 this, re_enqueue_sequence_callback, &shared_priority_queue_)), 515 &shared_priority_queue_, static_cast<int>(i))),
500 task_tracker_); 516 task_tracker_);
501 if (!worker_thread) 517 if (!worker_thread)
502 break; 518 break;
503 idle_worker_threads_stack_.Push(worker_thread.get()); 519 idle_worker_threads_stack_.Push(worker_thread.get());
504 worker_threads_.push_back(std::move(worker_thread)); 520 worker_threads_.push_back(std::move(worker_thread));
505 } 521 }
506 522
507 #if DCHECK_IS_ON() 523 #if DCHECK_IS_ON()
508 threads_created_.Signal(); 524 threads_created_.Signal();
509 #endif 525 #endif
(...skipping 22 matching lines...) Expand all
532 } 548 }
533 549
534 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( 550 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack(
535 SchedulerWorkerThread* worker_thread) { 551 SchedulerWorkerThread* worker_thread) {
536 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 552 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
537 idle_worker_threads_stack_.Remove(worker_thread); 553 idle_worker_threads_stack_.Remove(worker_thread);
538 } 554 }
539 555
540 } // namespace internal 556 } // namespace internal
541 } // namespace base 557 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_thread_pool_impl.h ('k') | base/task_scheduler/scheduler_thread_pool_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698