Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 11649032: Flush SequenceWorkerPool tasks after each unit test. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after
274 SequenceToken sequence_token, 274 SequenceToken sequence_token,
275 WorkerShutdown shutdown_behavior, 275 WorkerShutdown shutdown_behavior,
276 const tracked_objects::Location& from_here, 276 const tracked_objects::Location& from_here,
277 const Closure& task, 277 const Closure& task,
278 TimeDelta delay); 278 TimeDelta delay);
279 279
280 bool RunsTasksOnCurrentThread() const; 280 bool RunsTasksOnCurrentThread() const;
281 281
282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
283 283
284 void FlushForTesting(); 284 void CleanupForTesting();
285 285
286 void SignalHasWorkForTesting(); 286 void SignalHasWorkForTesting();
287 287
288 int GetWorkSignalCountForTesting() const; 288 int GetWorkSignalCountForTesting() const;
289 289
290 void Shutdown(int max_blocking_tasks_after_shutdown); 290 void Shutdown(int max_blocking_tasks_after_shutdown);
291 291
292 // Runs the worker loop on the background thread. 292 // Runs the worker loop on the background thread.
293 void ThreadLoop(Worker* this_worker); 293 void ThreadLoop(Worker* this_worker);
294 294
295 private: 295 private:
296 enum GetWorkStatus { 296 enum GetWorkStatus {
297 GET_WORK_FOUND, 297 GET_WORK_FOUND,
298 GET_WORK_NOT_FOUND, 298 GET_WORK_NOT_FOUND,
299 GET_WORK_WAIT, 299 GET_WORK_WAIT,
300 }; 300 };
301 301
302 // Returns whether there are no more pending tasks and all threads 302 enum CleanupState {
303 // are idle. Must be called under lock. 303 CLEANUP_REQUESTED,
304 bool IsIdle() const; 304 CLEANUP_STARTING,
305 CLEANUP_RUNNING,
306 CLEANUP_FINISHING,
307 CLEANUP_DONE,
308 };
305 309
306 // Called from within the lock, this converts the given token name into a 310 // Called from within the lock, this converts the given token name into a
307 // token ID, creating a new one if necessary. 311 // token ID, creating a new one if necessary.
308 int LockedGetNamedTokenID(const std::string& name); 312 int LockedGetNamedTokenID(const std::string& name);
309 313
310 // Called from within the lock, this returns the next sequence task number. 314 // Called from within the lock, this returns the next sequence task number.
311 int64 LockedGetNextSequenceTaskNumber(); 315 int64 LockedGetNextSequenceTaskNumber();
312 316
313 // Called from within the lock, returns the shutdown behavior of the task 317 // Called from within the lock, returns the shutdown behavior of the task
314 // running on the currently executing worker thread. If invoked from a thread 318 // running on the currently executing worker thread. If invoked from a thread
(...skipping 12 matching lines...) Expand all
327 // filled in the time to wait until the next task to run. In this case, the 331 // filled in the time to wait until the next task to run. In this case, the
328 // caller should wait the time. 332 // caller should wait the time.
329 // 333 //
330 // In any case, the calling code should clear the given 334 // In any case, the calling code should clear the given
331 // delete_these_outside_lock vector the next time the lock is released. 335 // delete_these_outside_lock vector the next time the lock is released.
332 // See the implementation for a more detailed description. 336 // See the implementation for a more detailed description.
333 GetWorkStatus GetWork(SequencedTask* task, 337 GetWorkStatus GetWork(SequencedTask* task,
334 TimeDelta* wait_time, 338 TimeDelta* wait_time,
335 std::vector<Closure>* delete_these_outside_lock); 339 std::vector<Closure>* delete_these_outside_lock);
336 340
341 void HandleCleanup();
342
337 // Peforms init and cleanup around running the given task. WillRun... 343 // Peforms init and cleanup around running the given task. WillRun...
338 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 344 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
339 // The calling code should call FinishStartingAdditionalThread once the 345 // The calling code should call FinishStartingAdditionalThread once the
340 // lock is released if the return values is nonzero. 346 // lock is released if the return values is nonzero.
341 int WillRunWorkerTask(const SequencedTask& task); 347 int WillRunWorkerTask(const SequencedTask& task);
342 void DidRunWorkerTask(const SequencedTask& task); 348 void DidRunWorkerTask(const SequencedTask& task);
343 349
344 // Returns true if there are no threads currently running the given 350 // Returns true if there are no threads currently running the given
345 // sequence token. 351 // sequence token.
346 bool IsSequenceTokenRunnable(int sequence_token_id) const; 352 bool IsSequenceTokenRunnable(int sequence_token_id) const;
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
382 // This lock protects |everything in this class|. Do not read or modify 388 // This lock protects |everything in this class|. Do not read or modify
383 // anything without holding this lock. Do not block while holding this 389 // anything without holding this lock. Do not block while holding this
384 // lock. 390 // lock.
385 mutable Lock lock_; 391 mutable Lock lock_;
386 392
387 // Condition variable that is waited on by worker threads until new 393 // Condition variable that is waited on by worker threads until new
388 // tasks are posted or shutdown starts. 394 // tasks are posted or shutdown starts.
389 ConditionVariable has_work_cv_; 395 ConditionVariable has_work_cv_;
390 396
391 // Condition variable that is waited on by non-worker threads (in 397 // Condition variable that is waited on by non-worker threads (in
392 // FlushForTesting()) until IsIdle() goes to true.
393 ConditionVariable is_idle_cv_;
394
395 // Condition variable that is waited on by non-worker threads (in
396 // Shutdown()) until CanShutdown() goes to true. 398 // Shutdown()) until CanShutdown() goes to true.
397 ConditionVariable can_shutdown_cv_; 399 ConditionVariable can_shutdown_cv_;
398 400
399 // The maximum number of worker threads we'll create. 401 // The maximum number of worker threads we'll create.
400 const size_t max_threads_; 402 const size_t max_threads_;
401 403
402 const std::string thread_name_prefix_; 404 const std::string thread_name_prefix_;
403 405
404 // Associates all known sequence token names with their IDs. 406 // Associates all known sequence token names with their IDs.
405 std::map<std::string, int> named_sequence_tokens_; 407 std::map<std::string, int> named_sequence_tokens_;
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
443 int trace_id_; 445 int trace_id_;
444 446
445 // Set when Shutdown is called and no further tasks should be 447 // Set when Shutdown is called and no further tasks should be
446 // allowed, though we may still be running existing tasks. 448 // allowed, though we may still be running existing tasks.
447 bool shutdown_called_; 449 bool shutdown_called_;
448 450
449 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 451 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
450 // has been called. 452 // has been called.
451 int max_blocking_tasks_after_shutdown_; 453 int max_blocking_tasks_after_shutdown_;
452 454
455 // State used to cleanup for testing, all guarded by lock_.
456 CleanupState cleanup_state_;
457 size_t cleanup_idlers_;
458 ConditionVariable cleanup_cv_;
459
453 TestingObserver* const testing_observer_; 460 TestingObserver* const testing_observer_;
454 461
455 DISALLOW_COPY_AND_ASSIGN(Inner); 462 DISALLOW_COPY_AND_ASSIGN(Inner);
456 }; 463 };
457 464
458 // Worker definitions --------------------------------------------------------- 465 // Worker definitions ---------------------------------------------------------
459 466
460 SequencedWorkerPool::Worker::Worker( 467 SequencedWorkerPool::Worker::Worker(
461 const scoped_refptr<SequencedWorkerPool>& worker_pool, 468 const scoped_refptr<SequencedWorkerPool>& worker_pool,
462 int thread_number, 469 int thread_number,
(...skipping 23 matching lines...) Expand all
486 493
487 SequencedWorkerPool::Inner::Inner( 494 SequencedWorkerPool::Inner::Inner(
488 SequencedWorkerPool* worker_pool, 495 SequencedWorkerPool* worker_pool,
489 size_t max_threads, 496 size_t max_threads,
490 const std::string& thread_name_prefix, 497 const std::string& thread_name_prefix,
491 TestingObserver* observer) 498 TestingObserver* observer)
492 : worker_pool_(worker_pool), 499 : worker_pool_(worker_pool),
493 last_sequence_number_(0), 500 last_sequence_number_(0),
494 lock_(), 501 lock_(),
495 has_work_cv_(&lock_), 502 has_work_cv_(&lock_),
496 is_idle_cv_(&lock_),
497 can_shutdown_cv_(&lock_), 503 can_shutdown_cv_(&lock_),
498 max_threads_(max_threads), 504 max_threads_(max_threads),
499 thread_name_prefix_(thread_name_prefix), 505 thread_name_prefix_(thread_name_prefix),
500 thread_being_created_(false), 506 thread_being_created_(false),
501 waiting_thread_count_(0), 507 waiting_thread_count_(0),
502 blocking_shutdown_thread_count_(0), 508 blocking_shutdown_thread_count_(0),
503 next_sequence_task_number_(0), 509 next_sequence_task_number_(0),
504 blocking_shutdown_pending_task_count_(0), 510 blocking_shutdown_pending_task_count_(0),
505 trace_id_(0), 511 trace_id_(0),
506 shutdown_called_(false), 512 shutdown_called_(false),
507 max_blocking_tasks_after_shutdown_(0), 513 max_blocking_tasks_after_shutdown_(0),
514 cleanup_state_(CLEANUP_DONE),
515 cleanup_idlers_(0),
516 cleanup_cv_(&lock_),
508 testing_observer_(observer) {} 517 testing_observer_(observer) {}
509 518
510 SequencedWorkerPool::Inner::~Inner() { 519 SequencedWorkerPool::Inner::~Inner() {
511 // You must call Shutdown() before destroying the pool. 520 // You must call Shutdown() before destroying the pool.
512 DCHECK(shutdown_called_); 521 DCHECK(shutdown_called_);
513 522
514 // Need to explicitly join with the threads before they're destroyed or else 523 // Need to explicitly join with the threads before they're destroyed or else
515 // they will be running when our object is half torn down. 524 // they will be running when our object is half torn down.
516 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 525 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
517 it->second->Join(); 526 it->second->Join();
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
602 611
603 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
604 SequenceToken sequence_token) const { 613 SequenceToken sequence_token) const {
605 AutoLock lock(lock_); 614 AutoLock lock(lock_);
606 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
607 if (found == threads_.end()) 616 if (found == threads_.end())
608 return false; 617 return false;
609 return found->second->running_sequence().Equals(sequence_token); 618 return found->second->running_sequence().Equals(sequence_token);
610 } 619 }
611 620
612 void SequencedWorkerPool::Inner::FlushForTesting() { 621 // See https://code.google.com/p/chromium/issues/detail?id=168415
622 void SequencedWorkerPool::Inner::CleanupForTesting() {
623 DCHECK(!RunsTasksOnCurrentThread());
624 base::ThreadRestrictions::ScopedAllowWait allow_wait;
613 AutoLock lock(lock_); 625 AutoLock lock(lock_);
614 while (!IsIdle()) 626 if (shutdown_called_)
615 is_idle_cv_.Wait(); 627 return;
628 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
629 if (!thread_being_created_ && threads_.empty())
630 return;
631 cleanup_state_ = CLEANUP_REQUESTED;
632 cleanup_idlers_ = 0;
633 has_work_cv_.Signal();
634 while (cleanup_state_ != CLEANUP_DONE)
635 cleanup_cv_.Wait();
616 } 636 }
617 637
618 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 638 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
619 SignalHasWork(); 639 SignalHasWork();
620 } 640 }
621 641
622 void SequencedWorkerPool::Inner::Shutdown( 642 void SequencedWorkerPool::Inner::Shutdown(
623 int max_new_blocking_tasks_after_shutdown) { 643 int max_new_blocking_tasks_after_shutdown) {
624 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 644 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
625 { 645 {
626 AutoLock lock(lock_); 646 AutoLock lock(lock_);
627
628 if (shutdown_called_) 647 if (shutdown_called_)
629 return; 648 return;
649 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
630 shutdown_called_ = true; 650 shutdown_called_ = true;
631 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 651 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
632 652
633 // Tickle the threads. This will wake up a waiting one so it will know that 653 // Tickle the threads. This will wake up a waiting one so it will know that
634 // it can exit, which in turn will wake up any other waiting ones. 654 // it can exit, which in turn will wake up any other waiting ones.
635 SignalHasWork(); 655 SignalHasWork();
636 656
637 // There are no pending or running tasks blocking shutdown, we're done. 657 // There are no pending or running tasks blocking shutdown, we're done.
638 if (CanShutdown()) 658 if (CanShutdown())
639 return; 659 return;
(...skipping 25 matching lines...) Expand all
665 std::pair<ThreadMap::iterator, bool> result = 685 std::pair<ThreadMap::iterator, bool> result =
666 threads_.insert( 686 threads_.insert(
667 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 687 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
668 DCHECK(result.second); 688 DCHECK(result.second);
669 689
670 while (true) { 690 while (true) {
671 #if defined(OS_MACOSX) 691 #if defined(OS_MACOSX)
672 base::mac::ScopedNSAutoreleasePool autorelease_pool; 692 base::mac::ScopedNSAutoreleasePool autorelease_pool;
673 #endif 693 #endif
674 694
695 HandleCleanup();
696
675 // See GetWork for what delete_these_outside_lock is doing. 697 // See GetWork for what delete_these_outside_lock is doing.
676 SequencedTask task; 698 SequencedTask task;
677 TimeDelta wait_time; 699 TimeDelta wait_time;
678 std::vector<Closure> delete_these_outside_lock; 700 std::vector<Closure> delete_these_outside_lock;
679 GetWorkStatus status = 701 GetWorkStatus status =
680 GetWork(&task, &wait_time, &delete_these_outside_lock); 702 GetWork(&task, &wait_time, &delete_these_outside_lock);
681 if (status == GET_WORK_FOUND) { 703 if (status == GET_WORK_FOUND) {
682 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", 704 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask",
683 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); 705 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
684 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", 706 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop",
(...skipping 25 matching lines...) Expand all
710 start_time, tracked_objects::ThreadData::NowForEndOfRun()); 732 start_time, tracked_objects::ThreadData::NowForEndOfRun());
711 733
712 this_worker->set_running_task_info( 734 this_worker->set_running_task_info(
713 SequenceToken(), CONTINUE_ON_SHUTDOWN); 735 SequenceToken(), CONTINUE_ON_SHUTDOWN);
714 736
715 // Make sure our task is erased outside the lock for the same reason 737 // Make sure our task is erased outside the lock for the same reason
716 // we do this with delete_these_oustide_lock. 738 // we do this with delete_these_oustide_lock.
717 task.task = Closure(); 739 task.task = Closure();
718 } 740 }
719 DidRunWorkerTask(task); // Must be done inside the lock. 741 DidRunWorkerTask(task); // Must be done inside the lock.
742 } else if (cleanup_state_ == CLEANUP_RUNNING) {
743 switch (status) {
744 case GET_WORK_WAIT: {
745 AutoUnlock unlock(lock_);
746 delete_these_outside_lock.clear();
747 }
748 break;
749 case GET_WORK_NOT_FOUND:
750 CHECK(delete_these_outside_lock.empty());
751 cleanup_state_ = CLEANUP_FINISHING;
752 cleanup_cv_.Broadcast();
753 break;
754 default:
755 NOTREACHED();
756 }
720 } else { 757 } else {
721 // When we're terminating and there's no more work, we can 758 // When we're terminating and there's no more work, we can
722 // shut down, other workers can complete any pending or new tasks. 759 // shut down, other workers can complete any pending or new tasks.
723 // We can get additional tasks posted after shutdown_called_ is set 760 // We can get additional tasks posted after shutdown_called_ is set
724 // but only worker threads are allowed to post tasks at that time, and 761 // but only worker threads are allowed to post tasks at that time, and
725 // the workers responsible for posting those tasks will be available 762 // the workers responsible for posting those tasks will be available
726 // to run them. Also, there may be some tasks stuck behind running 763 // to run them. Also, there may be some tasks stuck behind running
727 // ones with the same sequence token, but additional threads won't 764 // ones with the same sequence token, but additional threads won't
728 // help this case. 765 // help this case.
729 if (shutdown_called_ && 766 if (shutdown_called_ &&
730 blocking_shutdown_pending_task_count_ == 0) 767 blocking_shutdown_pending_task_count_ == 0)
731 break; 768 break;
732 waiting_thread_count_++; 769 waiting_thread_count_++;
733 // This is the only time that IsIdle() can go to true.
734 if (IsIdle())
735 is_idle_cv_.Signal();
736 770
737 switch (status) { 771 switch (status) {
738 case GET_WORK_NOT_FOUND: 772 case GET_WORK_NOT_FOUND:
739 has_work_cv_.Wait(); 773 has_work_cv_.Wait();
740 break; 774 break;
741 case GET_WORK_WAIT: 775 case GET_WORK_WAIT:
742 has_work_cv_.TimedWait(wait_time); 776 has_work_cv_.TimedWait(wait_time);
743 break; 777 break;
744 default: 778 default:
745 NOTREACHED(); 779 NOTREACHED();
746 } 780 }
747 waiting_thread_count_--; 781 waiting_thread_count_--;
748 } 782 }
749 } 783 }
750 } // Release lock_. 784 } // Release lock_.
751 785
752 // We noticed we should exit. Wake up the next worker so it knows it should 786 // We noticed we should exit. Wake up the next worker so it knows it should
753 // exit as well (because the Shutdown() code only signals once). 787 // exit as well (because the Shutdown() code only signals once).
754 SignalHasWork(); 788 SignalHasWork();
755 789
756 // Possibly unblock shutdown. 790 // Possibly unblock shutdown.
757 can_shutdown_cv_.Signal(); 791 can_shutdown_cv_.Signal();
758 } 792 }
759 793
760 bool SequencedWorkerPool::Inner::IsIdle() const { 794 void SequencedWorkerPool::Inner::HandleCleanup() {
michaeln 2013/01/23 00:16:08 The synchronization logic is mostly in here. This
761 lock_.AssertAcquired(); 795 lock_.AssertAcquired();
762 return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); 796 if (cleanup_state_ == CLEANUP_DONE)
797 return;
798 if (cleanup_state_ == CLEANUP_REQUESTED) {
799 // We win, we get to do the cleanup as soon as the others wise up and idle.
800 cleanup_state_ = CLEANUP_STARTING;
801 while (thread_being_created_ ||
802 cleanup_idlers_ != threads_.size() - 1) {
803 has_work_cv_.Signal();
804 cleanup_cv_.Wait();
805 }
806 cleanup_state_ = CLEANUP_RUNNING;
807 return;
808 }
809 if (cleanup_state_ == CLEANUP_STARTING) {
810 // Another worker thread is cleaning up, we idle here until thats done.
811 ++cleanup_idlers_;
812 cleanup_cv_.Broadcast();
813 while (cleanup_state_ != CLEANUP_FINISHING) {
814 cleanup_cv_.Wait();
815 }
816 --cleanup_idlers_;
817 cleanup_cv_.Broadcast();
818 return;
819 }
820 if (cleanup_state_ == CLEANUP_FINISHING) {
821 // We wait for all idlers to wake up prior to being DONE.
822 while (cleanup_idlers_ != 0) {
823 cleanup_cv_.Broadcast();
824 cleanup_cv_.Wait();
825 }
826 if (cleanup_state_ == CLEANUP_FINISHING) {
827 cleanup_state_ = CLEANUP_DONE;
828 cleanup_cv_.Signal();
829 }
830 return;
831 }
763 } 832 }
764 833
765 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 834 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
766 const std::string& name) { 835 const std::string& name) {
767 lock_.AssertAcquired(); 836 lock_.AssertAcquired();
768 DCHECK(!name.empty()); 837 DCHECK(!name.empty());
769 838
770 std::map<std::string, int>::const_iterator found = 839 std::map<std::string, int>::const_iterator found =
771 named_sequence_tokens_.find(name); 840 named_sequence_tokens_.find(name);
772 if (found != named_sequence_tokens_.end()) 841 if (found != named_sequence_tokens_.end())
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
835 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 904 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
836 unrunnable_tasks++; 905 unrunnable_tasks++;
837 ++i; 906 ++i;
838 continue; 907 continue;
839 } 908 }
840 909
841 if (i->time_to_run > current_time) { 910 if (i->time_to_run > current_time) {
842 // The time to run has not come yet. 911 // The time to run has not come yet.
843 *wait_time = i->time_to_run - current_time; 912 *wait_time = i->time_to_run - current_time;
844 status = GET_WORK_WAIT; 913 status = GET_WORK_WAIT;
914 if (cleanup_state_ == CLEANUP_RUNNING) {
915 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
916 delete_these_outside_lock->push_back(i->task);
917 pending_tasks_.erase(i);
918 }
845 break; 919 break;
846 } 920 }
847 921
848 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 922 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
849 // We're shutting down and the task we just found isn't blocking 923 // We're shutting down and the task we just found isn't blocking
850 // shutdown. Delete it and get more work. 924 // shutdown. Delete it and get more work.
851 // 925 //
852 // Note that we do not want to delete unrunnable tasks. Deleting a task 926 // Note that we do not want to delete unrunnable tasks. Deleting a task
853 // can have side effects (like freeing some objects) and deleting a 927 // can have side effects (like freeing some objects) and deleting a
854 // task that's supposed to run after one that's currently running could 928 // task that's supposed to run after one that's currently running could
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
961 // cause a worker to start since one is pending. 1035 // cause a worker to start since one is pending.
962 // 3. Main thread initiates shutdown. 1036 // 3. Main thread initiates shutdown.
963 // 4. No more threads are created since the shutdown_called_ flag is set. 1037 // 4. No more threads are created since the shutdown_called_ flag is set.
964 // 1038 //
965 // The result is that one may expect that max_threads_ workers to be created 1039 // The result is that one may expect that max_threads_ workers to be created
966 // given the workload, but in reality fewer may be created because the 1040 // given the workload, but in reality fewer may be created because the
967 // sequence of thread creation on the background threads is racing with the 1041 // sequence of thread creation on the background threads is racing with the
968 // shutdown call. 1042 // shutdown call.
969 if (!shutdown_called_ && 1043 if (!shutdown_called_ &&
970 !thread_being_created_ && 1044 !thread_being_created_ &&
1045 cleanup_state_ == CLEANUP_DONE &&
971 threads_.size() < max_threads_ && 1046 threads_.size() < max_threads_ &&
972 waiting_thread_count_ == 0) { 1047 waiting_thread_count_ == 0) {
973 // We could use an additional thread if there's work to be done. 1048 // We could use an additional thread if there's work to be done.
974 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1049 for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
975 i != pending_tasks_.end(); ++i) { 1050 i != pending_tasks_.end(); ++i) {
976 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1051 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
977 // Found a runnable task, mark the thread as being started. 1052 // Found a runnable task, mark the thread as being started.
978 thread_being_created_ = true; 1053 thread_being_created_ = true;
979 return static_cast<int>(threads_.size() + 1); 1054 return static_cast<int>(threads_.size() + 1);
980 } 1055 }
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after
1139 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1214 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1140 return inner_->RunsTasksOnCurrentThread(); 1215 return inner_->RunsTasksOnCurrentThread();
1141 } 1216 }
1142 1217
1143 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1218 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1144 SequenceToken sequence_token) const { 1219 SequenceToken sequence_token) const {
1145 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1220 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1146 } 1221 }
1147 1222
1148 void SequencedWorkerPool::FlushForTesting() { 1223 void SequencedWorkerPool::FlushForTesting() {
1149 inner_->FlushForTesting(); 1224 inner_->CleanupForTesting();
1150 } 1225 }
1151 1226
1152 void SequencedWorkerPool::SignalHasWorkForTesting() { 1227 void SequencedWorkerPool::SignalHasWorkForTesting() {
1153 inner_->SignalHasWorkForTesting(); 1228 inner_->SignalHasWorkForTesting();
1154 } 1229 }
1155 1230
1156 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1231 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1157 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1232 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1158 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1233 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1159 } 1234 }
1160 1235
1161 } // namespace base 1236 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698