Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(286)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 11649032: Flush SequenceWorkerPool tasks after each unit test. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after
274 SequenceToken sequence_token, 274 SequenceToken sequence_token,
275 WorkerShutdown shutdown_behavior, 275 WorkerShutdown shutdown_behavior,
276 const tracked_objects::Location& from_here, 276 const tracked_objects::Location& from_here,
277 const Closure& task, 277 const Closure& task,
278 TimeDelta delay); 278 TimeDelta delay);
279 279
280 bool RunsTasksOnCurrentThread() const; 280 bool RunsTasksOnCurrentThread() const;
281 281
282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
283 283
284 void FlushForTesting(); 284 void CleanupForTesting();
285 285
286 void SignalHasWorkForTesting(); 286 void SignalHasWorkForTesting();
287 287
288 int GetWorkSignalCountForTesting() const; 288 int GetWorkSignalCountForTesting() const;
289 289
290 void Shutdown(int max_blocking_tasks_after_shutdown); 290 void Shutdown(int max_blocking_tasks_after_shutdown);
291 291
292 // Runs the worker loop on the background thread. 292 // Runs the worker loop on the background thread.
293 void ThreadLoop(Worker* this_worker); 293 void ThreadLoop(Worker* this_worker);
294 294
295 private: 295 private:
296 enum GetWorkStatus { 296 enum GetWorkStatus {
297 GET_WORK_FOUND, 297 GET_WORK_FOUND,
298 GET_WORK_NOT_FOUND, 298 GET_WORK_NOT_FOUND,
299 GET_WORK_WAIT, 299 GET_WORK_WAIT,
300 }; 300 };
301 301
302 // Returns whether there are no more pending tasks and all threads 302 enum CleanupState {
303 // are idle. Must be called under lock. 303 CLEANUP_REQUESTED,
304 bool IsIdle() const; 304 CLEANUP_STARTING,
305 CLEANUP_RUNNING,
306 CLEANUP_FINISHING,
307 CLEANUP_DONE,
308 };
305 309
306 // Called from within the lock, this converts the given token name into a 310 // Called from within the lock, this converts the given token name into a
307 // token ID, creating a new one if necessary. 311 // token ID, creating a new one if necessary.
308 int LockedGetNamedTokenID(const std::string& name); 312 int LockedGetNamedTokenID(const std::string& name);
309 313
310 // Called from within the lock, this returns the next sequence task number. 314 // Called from within the lock, this returns the next sequence task number.
311 int64 LockedGetNextSequenceTaskNumber(); 315 int64 LockedGetNextSequenceTaskNumber();
312 316
313 // Called from within the lock, returns the shutdown behavior of the task 317 // Called from within the lock, returns the shutdown behavior of the task
314 // running on the currently executing worker thread. If invoked from a thread 318 // running on the currently executing worker thread. If invoked from a thread
(...skipping 12 matching lines...) Expand all
327 // filled in the time to wait until the next task to run. In this case, the 331 // filled in the time to wait until the next task to run. In this case, the
328 // caller should wait the time. 332 // caller should wait the time.
329 // 333 //
330 // In any case, the calling code should clear the given 334 // In any case, the calling code should clear the given
331 // delete_these_outside_lock vector the next time the lock is released. 335 // delete_these_outside_lock vector the next time the lock is released.
332 // See the implementation for a more detailed description. 336 // See the implementation for a more detailed description.
333 GetWorkStatus GetWork(SequencedTask* task, 337 GetWorkStatus GetWork(SequencedTask* task,
334 TimeDelta* wait_time, 338 TimeDelta* wait_time,
335 std::vector<Closure>* delete_these_outside_lock); 339 std::vector<Closure>* delete_these_outside_lock);
336 340
341 void HandleCleanup();
342
337 // Peforms init and cleanup around running the given task. WillRun... 343 // Peforms init and cleanup around running the given task. WillRun...
338 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 344 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
339 // The calling code should call FinishStartingAdditionalThread once the 345 // The calling code should call FinishStartingAdditionalThread once the
340 // lock is released if the return values is nonzero. 346 // lock is released if the return values is nonzero.
341 int WillRunWorkerTask(const SequencedTask& task); 347 int WillRunWorkerTask(const SequencedTask& task);
342 void DidRunWorkerTask(const SequencedTask& task); 348 void DidRunWorkerTask(const SequencedTask& task);
343 349
344 // Returns true if there are no threads currently running the given 350 // Returns true if there are no threads currently running the given
345 // sequence token. 351 // sequence token.
346 bool IsSequenceTokenRunnable(int sequence_token_id) const; 352 bool IsSequenceTokenRunnable(int sequence_token_id) const;
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
382 // This lock protects |everything in this class|. Do not read or modify 388 // This lock protects |everything in this class|. Do not read or modify
383 // anything without holding this lock. Do not block while holding this 389 // anything without holding this lock. Do not block while holding this
384 // lock. 390 // lock.
385 mutable Lock lock_; 391 mutable Lock lock_;
386 392
387 // Condition variable that is waited on by worker threads until new 393 // Condition variable that is waited on by worker threads until new
388 // tasks are posted or shutdown starts. 394 // tasks are posted or shutdown starts.
389 ConditionVariable has_work_cv_; 395 ConditionVariable has_work_cv_;
390 396
391 // Condition variable that is waited on by non-worker threads (in 397 // Condition variable that is waited on by non-worker threads (in
392 // FlushForTesting()) until IsIdle() goes to true.
393 ConditionVariable is_idle_cv_;
394
395 // Condition variable that is waited on by non-worker threads (in
396 // Shutdown()) until CanShutdown() goes to true. 398 // Shutdown()) until CanShutdown() goes to true.
397 ConditionVariable can_shutdown_cv_; 399 ConditionVariable can_shutdown_cv_;
398 400
399 // The maximum number of worker threads we'll create. 401 // The maximum number of worker threads we'll create.
400 const size_t max_threads_; 402 const size_t max_threads_;
401 403
402 const std::string thread_name_prefix_; 404 const std::string thread_name_prefix_;
403 405
404 // Associates all known sequence token names with their IDs. 406 // Associates all known sequence token names with their IDs.
405 std::map<std::string, int> named_sequence_tokens_; 407 std::map<std::string, int> named_sequence_tokens_;
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
443 int trace_id_; 445 int trace_id_;
444 446
445 // Set when Shutdown is called and no further tasks should be 447 // Set when Shutdown is called and no further tasks should be
446 // allowed, though we may still be running existing tasks. 448 // allowed, though we may still be running existing tasks.
447 bool shutdown_called_; 449 bool shutdown_called_;
448 450
449 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 451 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
450 // has been called. 452 // has been called.
451 int max_blocking_tasks_after_shutdown_; 453 int max_blocking_tasks_after_shutdown_;
452 454
455 // State used to cleanup for testing, all guarded by lock_.
456 CleanupState cleanup_state_;
457 size_t cleanup_idlers_;
458 ConditionVariable cleanup_cv_;
459
453 TestingObserver* const testing_observer_; 460 TestingObserver* const testing_observer_;
454 461
455 DISALLOW_COPY_AND_ASSIGN(Inner); 462 DISALLOW_COPY_AND_ASSIGN(Inner);
456 }; 463 };
457 464
458 // Worker definitions --------------------------------------------------------- 465 // Worker definitions ---------------------------------------------------------
459 466
460 SequencedWorkerPool::Worker::Worker( 467 SequencedWorkerPool::Worker::Worker(
461 const scoped_refptr<SequencedWorkerPool>& worker_pool, 468 const scoped_refptr<SequencedWorkerPool>& worker_pool,
462 int thread_number, 469 int thread_number,
(...skipping 23 matching lines...) Expand all
486 493
487 SequencedWorkerPool::Inner::Inner( 494 SequencedWorkerPool::Inner::Inner(
488 SequencedWorkerPool* worker_pool, 495 SequencedWorkerPool* worker_pool,
489 size_t max_threads, 496 size_t max_threads,
490 const std::string& thread_name_prefix, 497 const std::string& thread_name_prefix,
491 TestingObserver* observer) 498 TestingObserver* observer)
492 : worker_pool_(worker_pool), 499 : worker_pool_(worker_pool),
493 last_sequence_number_(0), 500 last_sequence_number_(0),
494 lock_(), 501 lock_(),
495 has_work_cv_(&lock_), 502 has_work_cv_(&lock_),
496 is_idle_cv_(&lock_),
497 can_shutdown_cv_(&lock_), 503 can_shutdown_cv_(&lock_),
498 max_threads_(max_threads), 504 max_threads_(max_threads),
499 thread_name_prefix_(thread_name_prefix), 505 thread_name_prefix_(thread_name_prefix),
500 thread_being_created_(false), 506 thread_being_created_(false),
501 waiting_thread_count_(0), 507 waiting_thread_count_(0),
502 blocking_shutdown_thread_count_(0), 508 blocking_shutdown_thread_count_(0),
503 next_sequence_task_number_(0), 509 next_sequence_task_number_(0),
504 blocking_shutdown_pending_task_count_(0), 510 blocking_shutdown_pending_task_count_(0),
505 trace_id_(0), 511 trace_id_(0),
506 shutdown_called_(false), 512 shutdown_called_(false),
507 max_blocking_tasks_after_shutdown_(0), 513 max_blocking_tasks_after_shutdown_(0),
514 cleanup_state_(CLEANUP_DONE),
515 cleanup_idlers_(0),
516 cleanup_cv_(&lock_),
508 testing_observer_(observer) {} 517 testing_observer_(observer) {}
509 518
510 SequencedWorkerPool::Inner::~Inner() { 519 SequencedWorkerPool::Inner::~Inner() {
511 // You must call Shutdown() before destroying the pool. 520 // You must call Shutdown() before destroying the pool.
512 DCHECK(shutdown_called_); 521 DCHECK(shutdown_called_);
513 522
514 // Need to explicitly join with the threads before they're destroyed or else 523 // Need to explicitly join with the threads before they're destroyed or else
515 // they will be running when our object is half torn down. 524 // they will be running when our object is half torn down.
516 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 525 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
517 it->second->Join(); 526 it->second->Join();
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
602 611
603 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
604 SequenceToken sequence_token) const { 613 SequenceToken sequence_token) const {
605 AutoLock lock(lock_); 614 AutoLock lock(lock_);
606 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
607 if (found == threads_.end()) 616 if (found == threads_.end())
608 return false; 617 return false;
609 return found->second->running_sequence().Equals(sequence_token); 618 return found->second->running_sequence().Equals(sequence_token);
610 } 619 }
611 620
612 void SequencedWorkerPool::Inner::FlushForTesting() { 621 // See https://code.google.com/p/chromium/issues/detail?id=168415
622 void SequencedWorkerPool::Inner::CleanupForTesting() {
623 DCHECK(!RunsTasksOnCurrentThread());
624 base::ThreadRestrictions::ScopedAllowWait allow_wait;
613 AutoLock lock(lock_); 625 AutoLock lock(lock_);
614 while (!IsIdle()) 626 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
615 is_idle_cv_.Wait(); 627 if (shutdown_called_)
628 return;
629 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
630 return;
631 cleanup_state_ = CLEANUP_REQUESTED;
632 cleanup_idlers_ = 0;
633 has_work_cv_.Signal();
634 while (cleanup_state_ != CLEANUP_DONE)
635 cleanup_cv_.Wait();
616 } 636 }
617 637
618 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 638 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
619 SignalHasWork(); 639 SignalHasWork();
620 } 640 }
621 641
622 void SequencedWorkerPool::Inner::Shutdown( 642 void SequencedWorkerPool::Inner::Shutdown(
623 int max_new_blocking_tasks_after_shutdown) { 643 int max_new_blocking_tasks_after_shutdown) {
624 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 644 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
625 { 645 {
626 AutoLock lock(lock_); 646 AutoLock lock(lock_);
627 647 // Cleanup and Shutdown should not be called concurrently.
648 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
628 if (shutdown_called_) 649 if (shutdown_called_)
629 return; 650 return;
630 shutdown_called_ = true; 651 shutdown_called_ = true;
631 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 652 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
632 653
633 // Tickle the threads. This will wake up a waiting one so it will know that 654 // Tickle the threads. This will wake up a waiting one so it will know that
634 // it can exit, which in turn will wake up any other waiting ones. 655 // it can exit, which in turn will wake up any other waiting ones.
635 SignalHasWork(); 656 SignalHasWork();
636 657
637 // There are no pending or running tasks blocking shutdown, we're done. 658 // There are no pending or running tasks blocking shutdown, we're done.
(...skipping 27 matching lines...) Expand all
665 std::pair<ThreadMap::iterator, bool> result = 686 std::pair<ThreadMap::iterator, bool> result =
666 threads_.insert( 687 threads_.insert(
667 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 688 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
668 DCHECK(result.second); 689 DCHECK(result.second);
669 690
670 while (true) { 691 while (true) {
671 #if defined(OS_MACOSX) 692 #if defined(OS_MACOSX)
672 base::mac::ScopedNSAutoreleasePool autorelease_pool; 693 base::mac::ScopedNSAutoreleasePool autorelease_pool;
673 #endif 694 #endif
674 695
696 HandleCleanup();
697
675 // See GetWork for what delete_these_outside_lock is doing. 698 // See GetWork for what delete_these_outside_lock is doing.
676 SequencedTask task; 699 SequencedTask task;
677 TimeDelta wait_time; 700 TimeDelta wait_time;
678 std::vector<Closure> delete_these_outside_lock; 701 std::vector<Closure> delete_these_outside_lock;
679 GetWorkStatus status = 702 GetWorkStatus status =
680 GetWork(&task, &wait_time, &delete_these_outside_lock); 703 GetWork(&task, &wait_time, &delete_these_outside_lock);
681 if (status == GET_WORK_FOUND) { 704 if (status == GET_WORK_FOUND) {
682 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", 705 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask",
683 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); 706 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
684 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", 707 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop",
(...skipping 28 matching lines...) Expand all
713 // same reason we do this with delete_these_oustide_lock. 736 // same reason we do this with delete_these_oustide_lock.
714 // Also, do it before calling set_running_task_info() so 737 // Also, do it before calling set_running_task_info() so
715 // that sequence-checking from within the task's destructor 738 // that sequence-checking from within the task's destructor
716 // still works. 739 // still works.
717 task.task = Closure(); 740 task.task = Closure();
718 741
719 this_worker->set_running_task_info( 742 this_worker->set_running_task_info(
720 SequenceToken(), CONTINUE_ON_SHUTDOWN); 743 SequenceToken(), CONTINUE_ON_SHUTDOWN);
721 } 744 }
722 DidRunWorkerTask(task); // Must be done inside the lock. 745 DidRunWorkerTask(task); // Must be done inside the lock.
746 } else if (cleanup_state_ == CLEANUP_RUNNING) {
747 switch (status) {
748 case GET_WORK_WAIT: {
749 AutoUnlock unlock(lock_);
750 delete_these_outside_lock.clear();
751 }
752 break;
753 case GET_WORK_NOT_FOUND:
754 CHECK(delete_these_outside_lock.empty());
755 cleanup_state_ = CLEANUP_FINISHING;
756 cleanup_cv_.Broadcast();
757 break;
758 default:
759 NOTREACHED();
760 }
723 } else { 761 } else {
724 // When we're terminating and there's no more work, we can 762 // When we're terminating and there's no more work, we can
725 // shut down, other workers can complete any pending or new tasks. 763 // shut down, other workers can complete any pending or new tasks.
726 // We can get additional tasks posted after shutdown_called_ is set 764 // We can get additional tasks posted after shutdown_called_ is set
727 // but only worker threads are allowed to post tasks at that time, and 765 // but only worker threads are allowed to post tasks at that time, and
728 // the workers responsible for posting those tasks will be available 766 // the workers responsible for posting those tasks will be available
729 // to run them. Also, there may be some tasks stuck behind running 767 // to run them. Also, there may be some tasks stuck behind running
730 // ones with the same sequence token, but additional threads won't 768 // ones with the same sequence token, but additional threads won't
731 // help this case. 769 // help this case.
732 if (shutdown_called_ && 770 if (shutdown_called_ &&
733 blocking_shutdown_pending_task_count_ == 0) 771 blocking_shutdown_pending_task_count_ == 0)
734 break; 772 break;
735 waiting_thread_count_++; 773 waiting_thread_count_++;
736 // This is the only time that IsIdle() can go to true.
737 if (IsIdle())
738 is_idle_cv_.Signal();
739 774
740 switch (status) { 775 switch (status) {
741 case GET_WORK_NOT_FOUND: 776 case GET_WORK_NOT_FOUND:
742 has_work_cv_.Wait(); 777 has_work_cv_.Wait();
743 break; 778 break;
744 case GET_WORK_WAIT: 779 case GET_WORK_WAIT:
745 has_work_cv_.TimedWait(wait_time); 780 has_work_cv_.TimedWait(wait_time);
746 break; 781 break;
747 default: 782 default:
748 NOTREACHED(); 783 NOTREACHED();
749 } 784 }
750 waiting_thread_count_--; 785 waiting_thread_count_--;
751 } 786 }
752 } 787 }
753 } // Release lock_. 788 } // Release lock_.
754 789
755 // We noticed we should exit. Wake up the next worker so it knows it should 790 // We noticed we should exit. Wake up the next worker so it knows it should
756 // exit as well (because the Shutdown() code only signals once). 791 // exit as well (because the Shutdown() code only signals once).
757 SignalHasWork(); 792 SignalHasWork();
758 793
759 // Possibly unblock shutdown. 794 // Possibly unblock shutdown.
760 can_shutdown_cv_.Signal(); 795 can_shutdown_cv_.Signal();
761 } 796 }
762 797
763 bool SequencedWorkerPool::Inner::IsIdle() const { 798 void SequencedWorkerPool::Inner::HandleCleanup() {
764 lock_.AssertAcquired(); 799 lock_.AssertAcquired();
765 return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); 800 if (cleanup_state_ == CLEANUP_DONE)
801 return;
802 if (cleanup_state_ == CLEANUP_REQUESTED) {
803 // We win, we get to do the cleanup as soon as the others wise up and idle.
804 cleanup_state_ = CLEANUP_STARTING;
805 while (thread_being_created_ ||
806 cleanup_idlers_ != threads_.size() - 1) {
807 has_work_cv_.Signal();
808 cleanup_cv_.Wait();
809 }
810 cleanup_state_ = CLEANUP_RUNNING;
811 return;
812 }
813 if (cleanup_state_ == CLEANUP_STARTING) {
814 // Another worker thread is cleaning up, we idle here until thats done.
815 ++cleanup_idlers_;
816 cleanup_cv_.Broadcast();
817 while (cleanup_state_ != CLEANUP_FINISHING) {
818 cleanup_cv_.Wait();
819 }
820 --cleanup_idlers_;
821 cleanup_cv_.Broadcast();
822 return;
823 }
824 if (cleanup_state_ == CLEANUP_FINISHING) {
825 // We wait for all idlers to wake up prior to being DONE.
826 while (cleanup_idlers_ != 0) {
827 cleanup_cv_.Broadcast();
828 cleanup_cv_.Wait();
829 }
830 if (cleanup_state_ == CLEANUP_FINISHING) {
831 cleanup_state_ = CLEANUP_DONE;
832 cleanup_cv_.Signal();
833 }
834 return;
835 }
766 } 836 }
767 837
768 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 838 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
769 const std::string& name) { 839 const std::string& name) {
770 lock_.AssertAcquired(); 840 lock_.AssertAcquired();
771 DCHECK(!name.empty()); 841 DCHECK(!name.empty());
772 842
773 std::map<std::string, int>::const_iterator found = 843 std::map<std::string, int>::const_iterator found =
774 named_sequence_tokens_.find(name); 844 named_sequence_tokens_.find(name);
775 if (found != named_sequence_tokens_.end()) 845 if (found != named_sequence_tokens_.end())
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
859 // happen. 929 // happen.
860 delete_these_outside_lock->push_back(i->task); 930 delete_these_outside_lock->push_back(i->task);
861 pending_tasks_.erase(i++); 931 pending_tasks_.erase(i++);
862 continue; 932 continue;
863 } 933 }
864 934
865 if (i->time_to_run > current_time) { 935 if (i->time_to_run > current_time) {
866 // The time to run has not come yet. 936 // The time to run has not come yet.
867 *wait_time = i->time_to_run - current_time; 937 *wait_time = i->time_to_run - current_time;
868 status = GET_WORK_WAIT; 938 status = GET_WORK_WAIT;
939 if (cleanup_state_ == CLEANUP_RUNNING) {
940 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
941 delete_these_outside_lock->push_back(i->task);
942 pending_tasks_.erase(i);
943 }
869 break; 944 break;
870 } 945 }
871 946
872 // Found a runnable task. 947 // Found a runnable task.
873 *task = *i; 948 *task = *i;
874 pending_tasks_.erase(i); 949 pending_tasks_.erase(i);
875 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 950 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
876 blocking_shutdown_pending_task_count_--; 951 blocking_shutdown_pending_task_count_--;
877 } 952 }
878 953
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
965 // cause a worker to start since one is pending. 1040 // cause a worker to start since one is pending.
966 // 3. Main thread initiates shutdown. 1041 // 3. Main thread initiates shutdown.
967 // 4. No more threads are created since the shutdown_called_ flag is set. 1042 // 4. No more threads are created since the shutdown_called_ flag is set.
968 // 1043 //
969 // The result is that one may expect that max_threads_ workers to be created 1044 // The result is that one may expect that max_threads_ workers to be created
970 // given the workload, but in reality fewer may be created because the 1045 // given the workload, but in reality fewer may be created because the
971 // sequence of thread creation on the background threads is racing with the 1046 // sequence of thread creation on the background threads is racing with the
972 // shutdown call. 1047 // shutdown call.
973 if (!shutdown_called_ && 1048 if (!shutdown_called_ &&
974 !thread_being_created_ && 1049 !thread_being_created_ &&
1050 cleanup_state_ == CLEANUP_DONE &&
975 threads_.size() < max_threads_ && 1051 threads_.size() < max_threads_ &&
976 waiting_thread_count_ == 0) { 1052 waiting_thread_count_ == 0) {
977 // We could use an additional thread if there's work to be done. 1053 // We could use an additional thread if there's work to be done.
978 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1054 for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
979 i != pending_tasks_.end(); ++i) { 1055 i != pending_tasks_.end(); ++i) {
980 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1056 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
981 // Found a runnable task, mark the thread as being started. 1057 // Found a runnable task, mark the thread as being started.
982 thread_being_created_ = true; 1058 thread_being_created_ = true;
983 return static_cast<int>(threads_.size() + 1); 1059 return static_cast<int>(threads_.size() + 1);
984 } 1060 }
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after
1143 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1219 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1144 return inner_->RunsTasksOnCurrentThread(); 1220 return inner_->RunsTasksOnCurrentThread();
1145 } 1221 }
1146 1222
1147 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1223 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1148 SequenceToken sequence_token) const { 1224 SequenceToken sequence_token) const {
1149 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1225 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1150 } 1226 }
1151 1227
1152 void SequencedWorkerPool::FlushForTesting() { 1228 void SequencedWorkerPool::FlushForTesting() {
1153 inner_->FlushForTesting(); 1229 inner_->CleanupForTesting();
1154 } 1230 }
1155 1231
1156 void SequencedWorkerPool::SignalHasWorkForTesting() { 1232 void SequencedWorkerPool::SignalHasWorkForTesting() {
1157 inner_->SignalHasWorkForTesting(); 1233 inner_->SignalHasWorkForTesting();
1158 } 1234 }
1159 1235
1160 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1236 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1161 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1237 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1162 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1238 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1163 } 1239 }
1164 1240
1165 } // namespace base 1241 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698