OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/compiler_specific.h" | 10 #include "base/compiler_specific.h" |
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 { | 90 { |
91 base::AutoLock lock(lock_); | 91 base::AutoLock lock(lock_); |
92 started_events_++; | 92 started_events_++; |
93 } | 93 } |
94 cond_var_.Signal(); | 94 cond_var_.Signal(); |
95 | 95 |
96 blocker->Block(); | 96 blocker->Block(); |
97 SignalWorkerDone(id); | 97 SignalWorkerDone(id); |
98 } | 98 } |
99 | 99 |
100 void PostAdditionalTasks(int id, SequencedWorkerPool* pool) { | 100 void PostAdditionalTasks( |
| 101 int id, SequencedWorkerPool* pool, |
| 102 bool expected_return_value) { |
101 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); | 103 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); |
102 EXPECT_FALSE( | 104 EXPECT_EQ(expected_return_value, |
103 pool->PostWorkerTaskWithShutdownBehavior( | 105 pool->PostWorkerTaskWithShutdownBehavior( |
104 FROM_HERE, fast_task, | 106 FROM_HERE, fast_task, |
105 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); | 107 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
106 EXPECT_FALSE( | 108 EXPECT_EQ(expected_return_value, |
107 pool->PostWorkerTaskWithShutdownBehavior( | 109 pool->PostWorkerTaskWithShutdownBehavior( |
108 FROM_HERE, fast_task, | 110 FROM_HERE, fast_task, |
109 SequencedWorkerPool::SKIP_ON_SHUTDOWN)); | 111 SequencedWorkerPool::SKIP_ON_SHUTDOWN)); |
110 pool->PostWorkerTaskWithShutdownBehavior( | 112 pool->PostWorkerTaskWithShutdownBehavior( |
111 FROM_HERE, fast_task, | 113 FROM_HERE, fast_task, |
112 SequencedWorkerPool::BLOCK_SHUTDOWN); | 114 SequencedWorkerPool::BLOCK_SHUTDOWN); |
113 SignalWorkerDone(id); | 115 SignalWorkerDone(id); |
114 } | 116 } |
115 | 117 |
116 // Waits until the given number of tasks have started executing. | 118 // Waits until the given number of tasks have started executing. |
117 void WaitUntilTasksBlocked(size_t count) { | 119 void WaitUntilTasksBlocked(size_t count) { |
118 { | 120 { |
119 base::AutoLock lock(lock_); | 121 base::AutoLock lock(lock_); |
(...skipping 10 matching lines...) Expand all Loading... |
130 { | 132 { |
131 base::AutoLock lock(lock_); | 133 base::AutoLock lock(lock_); |
132 while (complete_sequence_.size() < num_tasks) | 134 while (complete_sequence_.size() < num_tasks) |
133 cond_var_.Wait(); | 135 cond_var_.Wait(); |
134 ret = complete_sequence_; | 136 ret = complete_sequence_; |
135 } | 137 } |
136 cond_var_.Signal(); | 138 cond_var_.Signal(); |
137 return ret; | 139 return ret; |
138 } | 140 } |
139 | 141 |
| 142 size_t GetTasksCompletedCount() { |
| 143 base::AutoLock lock(lock_); |
| 144 return complete_sequence_.size(); |
| 145 } |
| 146 |
140 void ClearCompleteSequence() { | 147 void ClearCompleteSequence() { |
141 base::AutoLock lock(lock_); | 148 base::AutoLock lock(lock_); |
142 complete_sequence_.clear(); | 149 complete_sequence_.clear(); |
143 started_events_ = 0; | 150 started_events_ = 0; |
144 } | 151 } |
145 | 152 |
146 private: | 153 private: |
147 friend class base::RefCountedThreadSafe<TestTracker>; | 154 friend class base::RefCountedThreadSafe<TestTracker>; |
148 ~TestTracker() {} | 155 ~TestTracker() {} |
149 | 156 |
(...skipping 297 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
447 } | 454 } |
448 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); | 455 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
449 | 456 |
450 // Queue up shutdown blocking tasks behind those which will attempt to post | 457 // Queue up shutdown blocking tasks behind those which will attempt to post |
451 // additional tasks when run, PostAdditionalTasks attemtps to post 3 | 458 // additional tasks when run, PostAdditionalTasks attemtps to post 3 |
452 // new FastTasks, one for each shutdown_behavior. | 459 // new FastTasks, one for each shutdown_behavior. |
453 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); | 460 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); |
454 for (int i = 0; i < kNumQueuedTasks; ++i) { | 461 for (int i = 0; i < kNumQueuedTasks; ++i) { |
455 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( | 462 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( |
456 FROM_HERE, | 463 FROM_HERE, |
457 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool()), | 464 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(), |
| 465 false), |
458 SequencedWorkerPool::BLOCK_SHUTDOWN)); | 466 SequencedWorkerPool::BLOCK_SHUTDOWN)); |
459 } | 467 } |
460 | 468 |
461 // Setup to open the floodgates from within Shutdown(). | 469 // Setup to open the floodgates from within Shutdown(). |
462 SetWillWaitForShutdownCallback( | 470 SetWillWaitForShutdownCallback( |
463 base::Bind(&EnsureTasksToCompleteCountAndUnblock, | 471 base::Bind(&EnsureTasksToCompleteCountAndUnblock, |
464 scoped_refptr<TestTracker>(tracker()), | 472 scoped_refptr<TestTracker>(tracker()), |
465 0, &blocker, kNumBlockTasks)); | 473 0, &blocker, kNumBlockTasks)); |
466 | 474 |
467 // Allow half of the additional blocking tasks thru. | 475 // Allow half of the additional blocking tasks thru. |
(...skipping 211 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
679 base::Bind(&IsRunningOnCurrentThreadTask, | 687 base::Bind(&IsRunningOnCurrentThreadTask, |
680 token2, unsequenced_token, pool(), unused_pool)); | 688 token2, unsequenced_token, pool(), unused_pool)); |
681 pool()->PostWorkerTask( | 689 pool()->PostWorkerTask( |
682 FROM_HERE, | 690 FROM_HERE, |
683 base::Bind(&IsRunningOnCurrentThreadTask, | 691 base::Bind(&IsRunningOnCurrentThreadTask, |
684 unsequenced_token, token1, pool(), unused_pool)); | 692 unsequenced_token, token1, pool(), unused_pool)); |
685 pool()->Shutdown(); | 693 pool()->Shutdown(); |
686 unused_pool->Shutdown(); | 694 unused_pool->Shutdown(); |
687 } | 695 } |
688 | 696 |
| 697 // Verify that FlushForTesting works as intended. |
| 698 TEST_F(SequencedWorkerPoolTest, FlushForTesting) { |
| 699 // Should be fine to call on a new instance. |
| 700 pool()->FlushForTesting(); |
| 701 |
| 702 // Queue up a bunch of work, including a long delayed task and |
| 703 // a task that produces additional tasks as an artifact. |
| 704 pool()->PostDelayedWorkerTask( |
| 705 FROM_HERE, |
| 706 base::Bind(&TestTracker::FastTask, tracker(), 0), |
| 707 TimeDelta::FromMinutes(5)); |
| 708 pool()->PostWorkerTask(FROM_HERE, |
| 709 base::Bind(&TestTracker::SlowTask, tracker(), 0)); |
| 710 const size_t kNumFastTasks = 20; |
| 711 for (size_t i = 0; i < kNumFastTasks; i++) { |
| 712 pool()->PostWorkerTask(FROM_HERE, |
| 713 base::Bind(&TestTracker::FastTask, tracker(), 0)); |
| 714 } |
| 715 pool()->PostWorkerTask( |
| 716 FROM_HERE, |
| 717 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(), |
| 718 true)); |
| 719 |
| 720 // We expect all except the delayed task to have been run. We verify all |
| 721 // closures have been deleted deleted by looking at the refcount of the |
| 722 // tracker. |
| 723 EXPECT_FALSE(tracker()->HasOneRef()); |
| 724 pool()->FlushForTesting(); |
| 725 EXPECT_TRUE(tracker()->HasOneRef()); |
| 726 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount()); |
| 727 |
| 728 // Should be fine to call on an idle instance with all threads created, and |
| 729 // spamming the method shouldn't deadlock or confuse the class. |
| 730 pool()->FlushForTesting(); |
| 731 pool()->FlushForTesting(); |
| 732 |
| 733 // Should be fine to call after shutdown too. |
| 734 pool()->Shutdown(); |
| 735 pool()->FlushForTesting(); |
| 736 } |
| 737 |
689 class SequencedWorkerPoolTaskRunnerTestDelegate { | 738 class SequencedWorkerPoolTaskRunnerTestDelegate { |
690 public: | 739 public: |
691 SequencedWorkerPoolTaskRunnerTestDelegate() {} | 740 SequencedWorkerPoolTaskRunnerTestDelegate() {} |
692 | 741 |
693 ~SequencedWorkerPoolTaskRunnerTestDelegate() {} | 742 ~SequencedWorkerPoolTaskRunnerTestDelegate() {} |
694 | 743 |
695 void StartTaskRunner() { | 744 void StartTaskRunner() { |
696 pool_owner_.reset( | 745 pool_owner_.reset( |
697 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); | 746 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); |
698 } | 747 } |
699 | 748 |
700 scoped_refptr<SequencedWorkerPool> GetTaskRunner() { | 749 scoped_refptr<SequencedWorkerPool> GetTaskRunner() { |
701 return pool_owner_->pool(); | 750 return pool_owner_->pool(); |
702 } | 751 } |
703 | 752 |
704 void StopTaskRunner() { | 753 void StopTaskRunner() { |
705 // Make sure all tasks (including delayed ones) are run before shutting | 754 // Make sure all tasks are run before shutting down. Delayed tasks are |
706 // down. | 755 // not run, they're simply deleted. |
707 pool_owner_->pool()->FlushForTesting(); | 756 pool_owner_->pool()->FlushForTesting(); |
708 pool_owner_->pool()->Shutdown(); | 757 pool_owner_->pool()->Shutdown(); |
709 // Don't reset |pool_owner_| here, as the test may still hold a | 758 // Don't reset |pool_owner_| here, as the test may still hold a |
710 // reference to the pool. | 759 // reference to the pool. |
711 } | 760 } |
712 | 761 |
713 bool TaskRunnerHandlesNonZeroDelays() const { | 762 bool TaskRunnerHandlesNonZeroDelays() const { |
714 return true; | 763 return true; |
715 } | 764 } |
716 | 765 |
(...skipping 18 matching lines...) Expand all Loading... |
735 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); | 784 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); |
736 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( | 785 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( |
737 SequencedWorkerPool::BLOCK_SHUTDOWN); | 786 SequencedWorkerPool::BLOCK_SHUTDOWN); |
738 } | 787 } |
739 | 788 |
740 scoped_refptr<TaskRunner> GetTaskRunner() { | 789 scoped_refptr<TaskRunner> GetTaskRunner() { |
741 return task_runner_; | 790 return task_runner_; |
742 } | 791 } |
743 | 792 |
744 void StopTaskRunner() { | 793 void StopTaskRunner() { |
745 // Make sure all tasks (including delayed ones) are run before shutting | 794 // Make sure all tasks are run before shutting down. Delayed tasks are |
746 // down. | 795 // not run, they're simply deleted. |
747 pool_owner_->pool()->FlushForTesting(); | 796 pool_owner_->pool()->FlushForTesting(); |
748 pool_owner_->pool()->Shutdown(); | 797 pool_owner_->pool()->Shutdown(); |
749 // Don't reset |pool_owner_| here, as the test may still hold a | 798 // Don't reset |pool_owner_| here, as the test may still hold a |
750 // reference to the pool. | 799 // reference to the pool. |
751 } | 800 } |
752 | 801 |
753 bool TaskRunnerHandlesNonZeroDelays() const { | 802 bool TaskRunnerHandlesNonZeroDelays() const { |
754 return true; | 803 return true; |
755 } | 804 } |
756 | 805 |
(...skipping 19 matching lines...) Expand all Loading... |
776 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); | 825 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); |
777 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( | 826 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( |
778 pool_owner_->pool()->GetSequenceToken()); | 827 pool_owner_->pool()->GetSequenceToken()); |
779 } | 828 } |
780 | 829 |
781 scoped_refptr<SequencedTaskRunner> GetTaskRunner() { | 830 scoped_refptr<SequencedTaskRunner> GetTaskRunner() { |
782 return task_runner_; | 831 return task_runner_; |
783 } | 832 } |
784 | 833 |
785 void StopTaskRunner() { | 834 void StopTaskRunner() { |
786 // Make sure all tasks (including delayed ones) are run before shutting | 835 // Make sure all tasks are run before shutting down. Delayed tasks are |
787 // down. | 836 // not run, they're simply deleted. |
788 pool_owner_->pool()->FlushForTesting(); | 837 pool_owner_->pool()->FlushForTesting(); |
789 pool_owner_->pool()->Shutdown(); | 838 pool_owner_->pool()->Shutdown(); |
790 // Don't reset |pool_owner_| here, as the test may still hold a | 839 // Don't reset |pool_owner_| here, as the test may still hold a |
791 // reference to the pool. | 840 // reference to the pool. |
792 } | 841 } |
793 | 842 |
794 bool TaskRunnerHandlesNonZeroDelays() const { | 843 bool TaskRunnerHandlesNonZeroDelays() const { |
795 return true; | 844 return true; |
796 } | 845 } |
797 | 846 |
798 private: | 847 private: |
799 MessageLoop message_loop_; | 848 MessageLoop message_loop_; |
800 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; | 849 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; |
801 scoped_refptr<SequencedTaskRunner> task_runner_; | 850 scoped_refptr<SequencedTaskRunner> task_runner_; |
802 }; | 851 }; |
803 | 852 |
804 INSTANTIATE_TYPED_TEST_CASE_P( | 853 INSTANTIATE_TYPED_TEST_CASE_P( |
805 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, | 854 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, |
806 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); | 855 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
807 | 856 |
808 INSTANTIATE_TYPED_TEST_CASE_P( | 857 INSTANTIATE_TYPED_TEST_CASE_P( |
809 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, | 858 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, |
810 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); | 859 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
811 | 860 |
812 } // namespace | 861 } // namespace |
813 | 862 |
814 } // namespace base | 863 } // namespace base |
OLD | NEW |