Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(434)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9689028: Move work signal count tracking from SequencedWorkerPool to the test file (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Sync to head Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
65 DISALLOW_COPY_AND_ASSIGN(Worker); 65 DISALLOW_COPY_AND_ASSIGN(Worker);
66 }; 66 };
67 67
68 // Inner ---------------------------------------------------------------------- 68 // Inner ----------------------------------------------------------------------
69 69
70 class SequencedWorkerPool::Inner { 70 class SequencedWorkerPool::Inner {
71 public: 71 public:
72 // Take a raw pointer to |worker| to avoid cycles (since we're owned 72 // Take a raw pointer to |worker| to avoid cycles (since we're owned
73 // by it). 73 // by it).
74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
75 const std::string& thread_name_prefix); 75 const std::string& thread_name_prefix,
76 TestingObserver* observer);
76 77
77 ~Inner(); 78 ~Inner();
78 79
79 SequenceToken GetSequenceToken(); 80 SequenceToken GetSequenceToken();
80 81
81 SequenceToken GetNamedSequenceToken(const std::string& name); 82 SequenceToken GetNamedSequenceToken(const std::string& name);
82 83
83 // This function accepts a name and an ID. If the name is null, the 84 // This function accepts a name and an ID. If the name is null, the
84 // token ID is used. This allows us to implement the optional name lookup 85 // token ID is used. This allows us to implement the optional name lookup
85 // from a single function without having to enter the lock a separate time. 86 // from a single function without having to enter the lock a separate time.
86 bool PostTask(const std::string* optional_token_name, 87 bool PostTask(const std::string* optional_token_name,
87 SequenceToken sequence_token, 88 SequenceToken sequence_token,
88 WorkerShutdown shutdown_behavior, 89 WorkerShutdown shutdown_behavior,
89 const tracked_objects::Location& from_here, 90 const tracked_objects::Location& from_here,
90 const Closure& task); 91 const Closure& task);
91 92
92 bool RunsTasksOnCurrentThread() const; 93 bool RunsTasksOnCurrentThread() const;
93 94
94 void FlushForTesting(); 95 void FlushForTesting();
95 96
96 void TriggerSpuriousWorkSignalForTesting(); 97 void SignalHasWorkForTesting();
97 98
98 int GetWorkSignalCountForTesting() const; 99 int GetWorkSignalCountForTesting() const;
99 100
100 void Shutdown(); 101 void Shutdown();
101 102
102 void SetTestingObserver(TestingObserver* observer);
103
104 // Runs the worker loop on the background thread. 103 // Runs the worker loop on the background thread.
105 void ThreadLoop(Worker* this_worker); 104 void ThreadLoop(Worker* this_worker);
106 105
107 private: 106 private:
108 // Returns whether there are no more pending tasks and all threads 107 // Returns whether there are no more pending tasks and all threads
109 // are idle. Must be called under lock. 108 // are idle. Must be called under lock.
110 bool IsIdle() const; 109 bool IsIdle() const;
111 110
112 // Called from within the lock, this converts the given token name into a 111 // Called from within the lock, this converts the given token name into a
113 // token ID, creating a new one if necessary. 112 // token ID, creating a new one if necessary.
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
166 165
167 // This lock protects |everything in this class|. Do not read or modify 166 // This lock protects |everything in this class|. Do not read or modify
168 // anything without holding this lock. Do not block while holding this 167 // anything without holding this lock. Do not block while holding this
169 // lock. 168 // lock.
170 mutable Lock lock_; 169 mutable Lock lock_;
171 170
172 // Condition variable that is waited on by worker threads until new 171 // Condition variable that is waited on by worker threads until new
173 // tasks are posted or shutdown starts. 172 // tasks are posted or shutdown starts.
174 ConditionVariable has_work_cv_; 173 ConditionVariable has_work_cv_;
175 174
176 // Number of times |has_work_| has been signalled. Used for testing.
177 int has_work_signal_count_;
178
179 // Condition variable that is waited on by non-worker threads (in 175 // Condition variable that is waited on by non-worker threads (in
180 // FlushForTesting()) until IsIdle() goes to true. 176 // FlushForTesting()) until IsIdle() goes to true.
181 ConditionVariable is_idle_cv_; 177 ConditionVariable is_idle_cv_;
182 178
183 // Condition variable that is waited on by non-worker threads (in 179 // Condition variable that is waited on by non-worker threads (in
184 // Shutdown()) until CanShutdown() goes to true. 180 // Shutdown()) until CanShutdown() goes to true.
185 ConditionVariable can_shutdown_cv_; 181 ConditionVariable can_shutdown_cv_;
186 182
187 // The maximum number of worker threads we'll create. 183 // The maximum number of worker threads we'll create.
188 const size_t max_threads_; 184 const size_t max_threads_;
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 // shutdown. 217 // shutdown.
222 size_t blocking_shutdown_pending_task_count_; 218 size_t blocking_shutdown_pending_task_count_;
223 219
224 // Lists all sequence tokens currently executing. 220 // Lists all sequence tokens currently executing.
225 std::set<int> current_sequences_; 221 std::set<int> current_sequences_;
226 222
227 // Set when Shutdown is called and no further tasks should be 223 // Set when Shutdown is called and no further tasks should be
228 // allowed, though we may still be running existing tasks. 224 // allowed, though we may still be running existing tasks.
229 bool shutdown_called_; 225 bool shutdown_called_;
230 226
231 TestingObserver* testing_observer_; 227 TestingObserver* const testing_observer_;
232 228
233 DISALLOW_COPY_AND_ASSIGN(Inner); 229 DISALLOW_COPY_AND_ASSIGN(Inner);
234 }; 230 };
235 231
236 // Worker definitions --------------------------------------------------------- 232 // Worker definitions ---------------------------------------------------------
237 233
238 SequencedWorkerPool::Worker::Worker( 234 SequencedWorkerPool::Worker::Worker(
239 const scoped_refptr<SequencedWorkerPool>& worker_pool, 235 const scoped_refptr<SequencedWorkerPool>& worker_pool,
240 int thread_number, 236 int thread_number,
241 const std::string& prefix) 237 const std::string& prefix)
(...skipping 15 matching lines...) Expand all
257 worker_pool_->inner_->ThreadLoop(this); 253 worker_pool_->inner_->ThreadLoop(this);
258 // Release our cyclic reference once we're done. 254 // Release our cyclic reference once we're done.
259 worker_pool_ = NULL; 255 worker_pool_ = NULL;
260 } 256 }
261 257
262 // Inner definitions --------------------------------------------------------- 258 // Inner definitions ---------------------------------------------------------
263 259
264 SequencedWorkerPool::Inner::Inner( 260 SequencedWorkerPool::Inner::Inner(
265 SequencedWorkerPool* worker_pool, 261 SequencedWorkerPool* worker_pool,
266 size_t max_threads, 262 size_t max_threads,
267 const std::string& thread_name_prefix) 263 const std::string& thread_name_prefix,
264 TestingObserver* observer)
268 : worker_pool_(worker_pool), 265 : worker_pool_(worker_pool),
269 last_sequence_number_(0), 266 last_sequence_number_(0),
270 lock_(), 267 lock_(),
271 has_work_cv_(&lock_), 268 has_work_cv_(&lock_),
272 has_work_signal_count_(0),
273 is_idle_cv_(&lock_), 269 is_idle_cv_(&lock_),
274 can_shutdown_cv_(&lock_), 270 can_shutdown_cv_(&lock_),
275 max_threads_(max_threads), 271 max_threads_(max_threads),
276 thread_name_prefix_(thread_name_prefix), 272 thread_name_prefix_(thread_name_prefix),
277 thread_being_created_(false), 273 thread_being_created_(false),
278 waiting_thread_count_(0), 274 waiting_thread_count_(0),
279 blocking_shutdown_thread_count_(0), 275 blocking_shutdown_thread_count_(0),
280 pending_task_count_(0), 276 pending_task_count_(0),
281 blocking_shutdown_pending_task_count_(0), 277 blocking_shutdown_pending_task_count_(0),
282 shutdown_called_(false), 278 shutdown_called_(false),
283 testing_observer_(NULL) {} 279 testing_observer_(observer) {}
284 280
285 SequencedWorkerPool::Inner::~Inner() { 281 SequencedWorkerPool::Inner::~Inner() {
286 // You must call Shutdown() before destroying the pool. 282 // You must call Shutdown() before destroying the pool.
287 DCHECK(shutdown_called_); 283 DCHECK(shutdown_called_);
288 284
289 // Need to explicitly join with the threads before they're destroyed or else 285 // Need to explicitly join with the threads before they're destroyed or else
290 // they will be running when our object is half torn down. 286 // they will be running when our object is half torn down.
291 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 287 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
292 it->second->Join(); 288 it->second->Join();
293 threads_.clear(); 289 threads_.clear();
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
353 AutoLock lock(lock_); 349 AutoLock lock(lock_);
354 return ContainsKey(threads_, PlatformThread::CurrentId()); 350 return ContainsKey(threads_, PlatformThread::CurrentId());
355 } 351 }
356 352
357 void SequencedWorkerPool::Inner::FlushForTesting() { 353 void SequencedWorkerPool::Inner::FlushForTesting() {
358 AutoLock lock(lock_); 354 AutoLock lock(lock_);
359 while (!IsIdle()) 355 while (!IsIdle())
360 is_idle_cv_.Wait(); 356 is_idle_cv_.Wait();
361 } 357 }
362 358
363 void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() { 359 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
364 SignalHasWork(); 360 SignalHasWork();
365 } 361 }
366 362
367 int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const {
368 AutoLock lock(lock_);
369 return has_work_signal_count_;
370 }
371
372 void SequencedWorkerPool::Inner::Shutdown() { 363 void SequencedWorkerPool::Inner::Shutdown() {
373 // Mark us as terminated and go through and drop all tasks that aren't 364 // Mark us as terminated and go through and drop all tasks that aren't
374 // required to run on shutdown. Since no new tasks will get posted once the 365 // required to run on shutdown. Since no new tasks will get posted once the
375 // terminated flag is set, this ensures that all remaining tasks are required 366 // terminated flag is set, this ensures that all remaining tasks are required
376 // for shutdown whenever the termianted_ flag is set. 367 // for shutdown whenever the termianted_ flag is set.
377 { 368 {
378 AutoLock lock(lock_); 369 AutoLock lock(lock_);
379 370
380 if (shutdown_called_) 371 if (shutdown_called_)
381 return; 372 return;
382 shutdown_called_ = true; 373 shutdown_called_ = true;
383 374
384 // Tickle the threads. This will wake up a waiting one so it will know that 375 // Tickle the threads. This will wake up a waiting one so it will know that
385 // it can exit, which in turn will wake up any other waiting ones. 376 // it can exit, which in turn will wake up any other waiting ones.
386 has_work_cv_.Signal(); 377 SignalHasWork();
387 378
388 // There are no pending or running tasks blocking shutdown, we're done. 379 // There are no pending or running tasks blocking shutdown, we're done.
389 if (CanShutdown()) 380 if (CanShutdown())
390 return; 381 return;
391 } 382 }
392 383
393 // If we're here, then something is blocking shutdown. So wait for 384 // If we're here, then something is blocking shutdown. So wait for
394 // CanShutdown() to go to true. 385 // CanShutdown() to go to true.
395 386
396 if (testing_observer_) 387 if (testing_observer_)
397 testing_observer_->WillWaitForShutdown(); 388 testing_observer_->WillWaitForShutdown();
398 389
399 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 390 TimeTicks shutdown_wait_begin = TimeTicks::Now();
400 391
401 { 392 {
402 AutoLock lock(lock_); 393 AutoLock lock(lock_);
403 while (!CanShutdown()) 394 while (!CanShutdown())
404 can_shutdown_cv_.Wait(); 395 can_shutdown_cv_.Wait();
405 } 396 }
406 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 397 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
407 TimeTicks::Now() - shutdown_wait_begin); 398 TimeTicks::Now() - shutdown_wait_begin);
408 } 399 }
409 400
410 void SequencedWorkerPool::Inner::SetTestingObserver(
411 TestingObserver* observer) {
412 AutoLock lock(lock_);
413 testing_observer_ = observer;
414 }
415
416 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 401 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
417 { 402 {
418 AutoLock lock(lock_); 403 AutoLock lock(lock_);
419 DCHECK(thread_being_created_); 404 DCHECK(thread_being_created_);
420 thread_being_created_ = false; 405 thread_being_created_ = false;
421 std::pair<ThreadMap::iterator, bool> result = 406 std::pair<ThreadMap::iterator, bool> result =
422 threads_.insert( 407 threads_.insert(
423 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 408 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
424 DCHECK(result.second); 409 DCHECK(result.second);
425 410
426 while (true) { 411 while (true) {
427 // See GetWork for what delete_these_outside_lock is doing. 412 // See GetWork for what delete_these_outside_lock is doing.
428 SequencedTask task; 413 SequencedTask task;
429 std::vector<Closure> delete_these_outside_lock; 414 std::vector<Closure> delete_these_outside_lock;
430 if (GetWork(&task, &delete_these_outside_lock)) { 415 if (GetWork(&task, &delete_these_outside_lock)) {
431 int new_thread_id = WillRunWorkerTask(task); 416 int new_thread_id = WillRunWorkerTask(task);
432 { 417 {
433 AutoUnlock unlock(lock_); 418 AutoUnlock unlock(lock_);
434 // There may be more work available, so wake up another 419 // There may be more work available, so wake up another
435 // worker thread. (Technically not required, since we 420 // worker thread. (Technically not required, since we
436 // already get a signal for each new task, but it doesn't 421 // already get a signal for each new task, but it doesn't
437 // hurt.) 422 // hurt.)
438 has_work_cv_.Signal(); 423 SignalHasWork();
439 delete_these_outside_lock.clear(); 424 delete_these_outside_lock.clear();
440 425
441 // Complete thread creation outside the lock if necessary. 426 // Complete thread creation outside the lock if necessary.
442 if (new_thread_id) 427 if (new_thread_id)
443 FinishStartingAdditionalThread(new_thread_id); 428 FinishStartingAdditionalThread(new_thread_id);
444 429
445 task.task.Run(); 430 task.task.Run();
446 431
447 // Make sure our task is erased outside the lock for the same reason 432 // Make sure our task is erased outside the lock for the same reason
448 // we do this with delete_these_oustide_lock. 433 // we do this with delete_these_oustide_lock.
(...skipping 13 matching lines...) Expand all
462 if (IsIdle()) 447 if (IsIdle())
463 is_idle_cv_.Signal(); 448 is_idle_cv_.Signal();
464 has_work_cv_.Wait(); 449 has_work_cv_.Wait();
465 waiting_thread_count_--; 450 waiting_thread_count_--;
466 } 451 }
467 } 452 }
468 } // Release lock_. 453 } // Release lock_.
469 454
470 // We noticed we should exit. Wake up the next worker so it knows it should 455 // We noticed we should exit. Wake up the next worker so it knows it should
471 // exit as well (because the Shutdown() code only signals once). 456 // exit as well (because the Shutdown() code only signals once).
472 has_work_cv_.Signal(); 457 SignalHasWork();
473 458
474 // Possibly unblock shutdown. 459 // Possibly unblock shutdown.
475 can_shutdown_cv_.Signal(); 460 can_shutdown_cv_.Signal();
476 } 461 }
477 462
478 bool SequencedWorkerPool::Inner::IsIdle() const { 463 bool SequencedWorkerPool::Inner::IsIdle() const {
479 lock_.AssertAcquired(); 464 lock_.AssertAcquired();
480 return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size(); 465 return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size();
481 } 466 }
482 467
(...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after
679 // Called outside of the lock. 664 // Called outside of the lock.
680 DCHECK(thread_number > 0); 665 DCHECK(thread_number > 0);
681 666
682 // The worker is assigned to the list when the thread actually starts, which 667 // The worker is assigned to the list when the thread actually starts, which
683 // will manage the memory of the pointer. 668 // will manage the memory of the pointer.
684 new Worker(worker_pool_, thread_number, thread_name_prefix_); 669 new Worker(worker_pool_, thread_number, thread_name_prefix_);
685 } 670 }
686 671
687 void SequencedWorkerPool::Inner::SignalHasWork() { 672 void SequencedWorkerPool::Inner::SignalHasWork() {
688 has_work_cv_.Signal(); 673 has_work_cv_.Signal();
689 { 674 if (testing_observer_) {
690 AutoLock lock(lock_); 675 testing_observer_->OnHasWork();
691 ++has_work_signal_count_;
692 } 676 }
693 } 677 }
694 678
695 bool SequencedWorkerPool::Inner::CanShutdown() const { 679 bool SequencedWorkerPool::Inner::CanShutdown() const {
696 lock_.AssertAcquired(); 680 lock_.AssertAcquired();
697 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 681 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
698 return !thread_being_created_ && 682 return !thread_being_created_ &&
699 blocking_shutdown_thread_count_ == 0 && 683 blocking_shutdown_thread_count_ == 0 &&
700 blocking_shutdown_pending_task_count_ == 0; 684 blocking_shutdown_pending_task_count_ == 0;
701 } 685 }
702 686
703 // SequencedWorkerPool -------------------------------------------------------- 687 // SequencedWorkerPool --------------------------------------------------------
704 688
705 SequencedWorkerPool::SequencedWorkerPool( 689 SequencedWorkerPool::SequencedWorkerPool(
706 size_t max_threads, 690 size_t max_threads,
707 const std::string& thread_name_prefix) 691 const std::string& thread_name_prefix)
708 : constructor_message_loop_(MessageLoopProxy::current()), 692 : constructor_message_loop_(MessageLoopProxy::current()),
709 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), 693 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this),
710 max_threads, thread_name_prefix)) { 694 max_threads, thread_name_prefix, NULL)) {
711 DCHECK(constructor_message_loop_.get()); 695 DCHECK(constructor_message_loop_.get());
712 } 696 }
713 697
698 SequencedWorkerPool::SequencedWorkerPool(
699 size_t max_threads,
700 const std::string& thread_name_prefix,
701 TestingObserver* observer)
702 : constructor_message_loop_(MessageLoopProxy::current()),
703 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this),
704 max_threads, thread_name_prefix, observer)) {
705 DCHECK(constructor_message_loop_.get());
706 }
707
714 SequencedWorkerPool::~SequencedWorkerPool() {} 708 SequencedWorkerPool::~SequencedWorkerPool() {}
715 709
716 void SequencedWorkerPool::OnDestruct() const { 710 void SequencedWorkerPool::OnDestruct() const {
717 // TODO(akalin): Once we can easily check if we're on a worker 711 // TODO(akalin): Once we can easily check if we're on a worker
718 // thread or not, use that instead of restricting destruction to 712 // thread or not, use that instead of restricting destruction to
719 // only the constructor message loop. 713 // only the constructor message loop.
720 if (constructor_message_loop_->BelongsToCurrentThread()) 714 if (constructor_message_loop_->BelongsToCurrentThread())
721 delete this; 715 delete this;
722 else 716 else
723 constructor_message_loop_->DeleteSoon(FROM_HERE, this); 717 constructor_message_loop_->DeleteSoon(FROM_HERE, this);
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
792 } 786 }
793 787
794 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 788 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
795 return inner_->RunsTasksOnCurrentThread(); 789 return inner_->RunsTasksOnCurrentThread();
796 } 790 }
797 791
798 void SequencedWorkerPool::FlushForTesting() { 792 void SequencedWorkerPool::FlushForTesting() {
799 inner_->FlushForTesting(); 793 inner_->FlushForTesting();
800 } 794 }
801 795
802 void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() { 796 void SequencedWorkerPool::SignalHasWorkForTesting() {
803 inner_->TriggerSpuriousWorkSignalForTesting(); 797 inner_->SignalHasWorkForTesting();
804 }
805
806 int SequencedWorkerPool::GetWorkSignalCountForTesting() const {
807 return inner_->GetWorkSignalCountForTesting();
808 } 798 }
809 799
810 void SequencedWorkerPool::Shutdown() { 800 void SequencedWorkerPool::Shutdown() {
811 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 801 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
812 inner_->Shutdown(); 802 inner_->Shutdown();
813 } 803 }
814 804
815 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
816 inner_->SetTestingObserver(observer);
817 }
818
819 } // namespace base 805 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698