Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(154)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool_impl.cc

Issue 1958973003: Revert of Name TaskScheduler's worker threads (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b7_fdoray_fixtracing
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h"
6 6
7 #include <stddef.h>
8
9 #include <algorithm> 7 #include <algorithm>
10 #include <utility> 8 #include <utility>
11 9
12 #include "base/bind.h" 10 #include "base/bind.h"
13 #include "base/bind_helpers.h" 11 #include "base/bind_helpers.h"
14 #include "base/lazy_instance.h" 12 #include "base/lazy_instance.h"
15 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
16 #include "base/sequenced_task_runner.h" 14 #include "base/sequenced_task_runner.h"
17 #include "base/single_thread_task_runner.h" 15 #include "base/single_thread_task_runner.h"
18 #include "base/strings/stringprintf.h"
19 #include "base/task_scheduler/delayed_task_manager.h" 16 #include "base/task_scheduler/delayed_task_manager.h"
20 #include "base/task_scheduler/task_tracker.h" 17 #include "base/task_scheduler/task_tracker.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/threading/thread_local.h" 18 #include "base/threading/thread_local.h"
23 #include "base/threading/thread_restrictions.h" 19 #include "base/threading/thread_restrictions.h"
24 20
25 namespace base { 21 namespace base {
26 namespace internal { 22 namespace internal {
27 23
28 namespace { 24 namespace {
29 25
30 // SchedulerThreadPool that owns the current thread, if any. 26 // SchedulerThreadPool that owns the current thread, if any.
31 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky 27 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
177 173
178 } // namespace 174 } // namespace
179 175
180 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl 176 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
181 : public SchedulerWorkerThread::Delegate { 177 : public SchedulerWorkerThread::Delegate {
182 public: 178 public:
183 // |outer| owns the worker thread for which this delegate is constructed. 179 // |outer| owns the worker thread for which this delegate is constructed.
184 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is 180 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
185 // called with a non-single-threaded Sequence. |shared_priority_queue| is a 181 // called with a non-single-threaded Sequence. |shared_priority_queue| is a
186 // PriorityQueue whose transactions may overlap with the worker thread's 182 // PriorityQueue whose transactions may overlap with the worker thread's
187 // single-threaded PriorityQueue's transactions. |index| will be appended to 183 // single-threaded PriorityQueue's transactions.
188 // this thread's name to uniquely identify it.
189 SchedulerWorkerThreadDelegateImpl( 184 SchedulerWorkerThreadDelegateImpl(
190 SchedulerThreadPoolImpl* outer, 185 SchedulerThreadPoolImpl* outer,
191 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 186 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
192 const PriorityQueue* shared_priority_queue, 187 const PriorityQueue* shared_priority_queue);
193 int index);
194 ~SchedulerWorkerThreadDelegateImpl() override; 188 ~SchedulerWorkerThreadDelegateImpl() override;
195 189
196 PriorityQueue* single_threaded_priority_queue() { 190 PriorityQueue* single_threaded_priority_queue() {
197 return &single_threaded_priority_queue_; 191 return &single_threaded_priority_queue_;
198 } 192 }
199 193
200 // SchedulerWorkerThread::Delegate: 194 // SchedulerWorkerThread::Delegate:
201 void OnMainEntry(SchedulerWorkerThread* worker_thread) override; 195 void OnMainEntry(SchedulerWorkerThread* worker_thread) override;
202 scoped_refptr<Sequence> GetWork( 196 scoped_refptr<Sequence> GetWork(
203 SchedulerWorkerThread* worker_thread) override; 197 SchedulerWorkerThread* worker_thread) override;
204 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; 198 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
205 199
206 private: 200 private:
207 SchedulerThreadPoolImpl* outer_; 201 SchedulerThreadPoolImpl* outer_;
208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; 202 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
209 203
210 // Single-threaded PriorityQueue for the worker thread. 204 // Single-threaded PriorityQueue for the worker thread.
211 PriorityQueue single_threaded_priority_queue_; 205 PriorityQueue single_threaded_priority_queue_;
212 206
213 // True if the last Sequence returned by GetWork() was extracted from 207 // True if the last Sequence returned by GetWork() was extracted from
214 // |single_threaded_priority_queue_|. 208 // |single_threaded_priority_queue_|.
215 bool last_sequence_is_single_threaded_ = false; 209 bool last_sequence_is_single_threaded_ = false;
216 210
217 const int index_;
218
219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); 211 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
220 }; 212 };
221 213
222 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() { 214 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() {
223 // SchedulerThreadPool should never be deleted in production unless its 215 // SchedulerThreadPool should never be deleted in production unless its
224 // initialization failed. 216 // initialization failed.
225 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); 217 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
226 } 218 }
227 219
228 // static 220 // static
229 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create( 221 std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create(
230 StringPiece name,
231 ThreadPriority thread_priority, 222 ThreadPriority thread_priority,
232 size_t max_threads, 223 size_t max_threads,
233 IORestriction io_restriction, 224 IORestriction io_restriction,
234 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 225 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
235 TaskTracker* task_tracker, 226 TaskTracker* task_tracker,
236 DelayedTaskManager* delayed_task_manager) { 227 DelayedTaskManager* delayed_task_manager) {
237 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool( 228 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool(
238 new SchedulerThreadPoolImpl(name, io_restriction, task_tracker, 229 new SchedulerThreadPoolImpl(io_restriction, task_tracker,
239 delayed_task_manager)); 230 delayed_task_manager));
240 if (thread_pool->Initialize(thread_priority, max_threads, 231 if (thread_pool->Initialize(thread_priority, max_threads,
241 re_enqueue_sequence_callback)) { 232 re_enqueue_sequence_callback)) {
242 return thread_pool; 233 return thread_pool;
243 } 234 }
244 return nullptr; 235 return nullptr;
245 } 236 }
246 237
247 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() { 238 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() {
248 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 239 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
370 worker_thread->WakeUp(); 361 worker_thread->WakeUp();
371 else 362 else
372 WakeUpOneThread(); 363 WakeUpOneThread();
373 } 364 }
374 } 365 }
375 366
376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: 367 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
377 SchedulerWorkerThreadDelegateImpl( 368 SchedulerWorkerThreadDelegateImpl(
378 SchedulerThreadPoolImpl* outer, 369 SchedulerThreadPoolImpl* outer,
379 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 370 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
380 const PriorityQueue* shared_priority_queue, 371 const PriorityQueue* shared_priority_queue)
381 int index)
382 : outer_(outer), 372 : outer_(outer),
383 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), 373 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
384 single_threaded_priority_queue_(shared_priority_queue), 374 single_threaded_priority_queue_(shared_priority_queue) {}
385 index_(index) {}
386 375
387 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: 376 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
388 ~SchedulerWorkerThreadDelegateImpl() = default; 377 ~SchedulerWorkerThreadDelegateImpl() = default;
389 378
390 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( 379 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry(
391 SchedulerWorkerThread* worker_thread) { 380 SchedulerWorkerThread* worker_thread) {
392 #if DCHECK_IS_ON() 381 #if DCHECK_IS_ON()
393 // Wait for |outer_->threads_created_| to avoid traversing 382 // Wait for |outer_->threads_created_| to avoid traversing
394 // |outer_->worker_threads_| while it is being filled by Initialize(). 383 // |outer_->worker_threads_| while it is being filled by Initialize().
395 outer_->threads_created_.Wait(); 384 outer_->threads_created_.Wait();
396 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); 385 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread));
397 #endif 386 #endif
398 387
399 PlatformThread::SetName(
400 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_));
401
402 DCHECK(!tls_current_worker_thread.Get().Get()); 388 DCHECK(!tls_current_worker_thread.Get().Get());
403 DCHECK(!tls_current_thread_pool.Get().Get()); 389 DCHECK(!tls_current_thread_pool.Get().Get());
404 tls_current_worker_thread.Get().Set(worker_thread); 390 tls_current_worker_thread.Get().Set(worker_thread);
405 tls_current_thread_pool.Get().Set(outer_); 391 tls_current_thread_pool.Get().Set(outer_);
406 392
407 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == 393 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ ==
408 IORestriction::ALLOWED); 394 IORestriction::ALLOWED);
409 } 395 }
410 396
411 scoped_refptr<Sequence> 397 scoped_refptr<Sequence>
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
473 single_threaded_priority_queue_.BeginTransaction()->Push( 459 single_threaded_priority_queue_.BeginTransaction()->Push(
474 std::move(sequence), sequence_sort_key); 460 std::move(sequence), sequence_sort_key);
475 } else { 461 } else {
476 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue 462 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
477 // |sequence| must be enqueued. 463 // |sequence| must be enqueued.
478 re_enqueue_sequence_callback_.Run(std::move(sequence)); 464 re_enqueue_sequence_callback_.Run(std::move(sequence));
479 } 465 }
480 } 466 }
481 467
482 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( 468 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
483 StringPiece name,
484 IORestriction io_restriction, 469 IORestriction io_restriction,
485 TaskTracker* task_tracker, 470 TaskTracker* task_tracker,
486 DelayedTaskManager* delayed_task_manager) 471 DelayedTaskManager* delayed_task_manager)
487 : name_(name.as_string()), 472 : io_restriction_(io_restriction),
488 io_restriction_(io_restriction),
489 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), 473 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
490 idle_worker_threads_stack_cv_for_testing_( 474 idle_worker_threads_stack_cv_for_testing_(
491 idle_worker_threads_stack_lock_.CreateConditionVariable()), 475 idle_worker_threads_stack_lock_.CreateConditionVariable()),
492 join_for_testing_returned_(true, false), 476 join_for_testing_returned_(true, false),
493 #if DCHECK_IS_ON() 477 #if DCHECK_IS_ON()
494 threads_created_(true, false), 478 threads_created_(true, false),
495 #endif 479 #endif
496 task_tracker_(task_tracker), 480 task_tracker_(task_tracker),
497 delayed_task_manager_(delayed_task_manager) { 481 delayed_task_manager_(delayed_task_manager) {
498 DCHECK(task_tracker_); 482 DCHECK(task_tracker_);
499 DCHECK(delayed_task_manager_); 483 DCHECK(delayed_task_manager_);
500 } 484 }
501 485
502 bool SchedulerThreadPoolImpl::Initialize( 486 bool SchedulerThreadPoolImpl::Initialize(
503 ThreadPriority thread_priority, 487 ThreadPriority thread_priority,
504 size_t max_threads, 488 size_t max_threads,
505 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { 489 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
506 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 490 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
507 491
508 DCHECK(worker_threads_.empty()); 492 DCHECK(worker_threads_.empty());
509 493
510 for (size_t i = 0; i < max_threads; ++i) { 494 for (size_t i = 0; i < max_threads; ++i) {
511 std::unique_ptr<SchedulerWorkerThread> worker_thread = 495 std::unique_ptr<SchedulerWorkerThread> worker_thread =
512 SchedulerWorkerThread::Create( 496 SchedulerWorkerThread::Create(
513 thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl( 497 thread_priority,
514 this, re_enqueue_sequence_callback, 498 WrapUnique(new SchedulerWorkerThreadDelegateImpl(
515 &shared_priority_queue_, i)), 499 this, re_enqueue_sequence_callback, &shared_priority_queue_)),
516 task_tracker_); 500 task_tracker_);
517 if (!worker_thread) 501 if (!worker_thread)
518 break; 502 break;
519 idle_worker_threads_stack_.Push(worker_thread.get()); 503 idle_worker_threads_stack_.Push(worker_thread.get());
520 worker_threads_.push_back(std::move(worker_thread)); 504 worker_threads_.push_back(std::move(worker_thread));
521 } 505 }
522 506
523 #if DCHECK_IS_ON() 507 #if DCHECK_IS_ON()
524 threads_created_.Signal(); 508 threads_created_.Signal();
525 #endif 509 #endif
(...skipping 22 matching lines...) Expand all
548 } 532 }
549 533
550 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( 534 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack(
551 SchedulerWorkerThread* worker_thread) { 535 SchedulerWorkerThread* worker_thread) {
552 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 536 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
553 idle_worker_threads_stack_.Remove(worker_thread); 537 idle_worker_threads_stack_.Remove(worker_thread);
554 } 538 }
555 539
556 } // namespace internal 540 } // namespace internal
557 } // namespace base 541 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_thread_pool_impl.h ('k') | base/task_scheduler/scheduler_thread_pool_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698