| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/compiler_specific.h" | 10 #include "base/compiler_specific.h" |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 92 { | 92 { |
| 93 base::AutoLock lock(lock_); | 93 base::AutoLock lock(lock_); |
| 94 started_events_++; | 94 started_events_++; |
| 95 } | 95 } |
| 96 cond_var_.Signal(); | 96 cond_var_.Signal(); |
| 97 | 97 |
| 98 blocker->Block(); | 98 blocker->Block(); |
| 99 SignalWorkerDone(id); | 99 SignalWorkerDone(id); |
| 100 } | 100 } |
| 101 | 101 |
| 102 void PostAdditionalTasks(int id, SequencedWorkerPool* pool) { | 102 void PostAdditionalTasks( |
| 103 int id, SequencedWorkerPool* pool, |
| 104 bool expected_return_value) { |
| 103 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); | 105 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); |
| 104 EXPECT_FALSE( | 106 EXPECT_EQ(expected_return_value, |
| 105 pool->PostWorkerTaskWithShutdownBehavior( | 107 pool->PostWorkerTaskWithShutdownBehavior( |
| 106 FROM_HERE, fast_task, | 108 FROM_HERE, fast_task, |
| 107 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); | 109 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| 108 EXPECT_FALSE( | 110 EXPECT_EQ(expected_return_value, |
| 109 pool->PostWorkerTaskWithShutdownBehavior( | 111 pool->PostWorkerTaskWithShutdownBehavior( |
| 110 FROM_HERE, fast_task, | 112 FROM_HERE, fast_task, |
| 111 SequencedWorkerPool::SKIP_ON_SHUTDOWN)); | 113 SequencedWorkerPool::SKIP_ON_SHUTDOWN)); |
| 112 pool->PostWorkerTaskWithShutdownBehavior( | 114 pool->PostWorkerTaskWithShutdownBehavior( |
| 113 FROM_HERE, fast_task, | 115 FROM_HERE, fast_task, |
| 114 SequencedWorkerPool::BLOCK_SHUTDOWN); | 116 SequencedWorkerPool::BLOCK_SHUTDOWN); |
| 115 SignalWorkerDone(id); | 117 SignalWorkerDone(id); |
| 116 } | 118 } |
| 117 | 119 |
| 118 // Waits until the given number of tasks have started executing. | 120 // Waits until the given number of tasks have started executing. |
| 119 void WaitUntilTasksBlocked(size_t count) { | 121 void WaitUntilTasksBlocked(size_t count) { |
| 120 { | 122 { |
| 121 base::AutoLock lock(lock_); | 123 base::AutoLock lock(lock_); |
| (...skipping 10 matching lines...) Expand all Loading... |
| 132 { | 134 { |
| 133 base::AutoLock lock(lock_); | 135 base::AutoLock lock(lock_); |
| 134 while (complete_sequence_.size() < num_tasks) | 136 while (complete_sequence_.size() < num_tasks) |
| 135 cond_var_.Wait(); | 137 cond_var_.Wait(); |
| 136 ret = complete_sequence_; | 138 ret = complete_sequence_; |
| 137 } | 139 } |
| 138 cond_var_.Signal(); | 140 cond_var_.Signal(); |
| 139 return ret; | 141 return ret; |
| 140 } | 142 } |
| 141 | 143 |
| 144 size_t GetTasksCompletedCount() { |
| 145 base::AutoLock lock(lock_); |
| 146 return complete_sequence_.size(); |
| 147 } |
| 148 |
| 142 void ClearCompleteSequence() { | 149 void ClearCompleteSequence() { |
| 143 base::AutoLock lock(lock_); | 150 base::AutoLock lock(lock_); |
| 144 complete_sequence_.clear(); | 151 complete_sequence_.clear(); |
| 145 started_events_ = 0; | 152 started_events_ = 0; |
| 146 } | 153 } |
| 147 | 154 |
| 148 private: | 155 private: |
| 149 friend class base::RefCountedThreadSafe<TestTracker>; | 156 friend class base::RefCountedThreadSafe<TestTracker>; |
| 150 ~TestTracker() {} | 157 ~TestTracker() {} |
| 151 | 158 |
| (...skipping 358 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 510 } | 517 } |
| 511 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); | 518 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| 512 | 519 |
| 513 // Queue up shutdown blocking tasks behind those which will attempt to post | 520 // Queue up shutdown blocking tasks behind those which will attempt to post |
| 514 // additional tasks when run, PostAdditionalTasks attemtps to post 3 | 521 // additional tasks when run, PostAdditionalTasks attemtps to post 3 |
| 515 // new FastTasks, one for each shutdown_behavior. | 522 // new FastTasks, one for each shutdown_behavior. |
| 516 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); | 523 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); |
| 517 for (int i = 0; i < kNumQueuedTasks; ++i) { | 524 for (int i = 0; i < kNumQueuedTasks; ++i) { |
| 518 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( | 525 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( |
| 519 FROM_HERE, | 526 FROM_HERE, |
| 520 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool()), | 527 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(), |
| 528 false), |
| 521 SequencedWorkerPool::BLOCK_SHUTDOWN)); | 529 SequencedWorkerPool::BLOCK_SHUTDOWN)); |
| 522 } | 530 } |
| 523 | 531 |
| 524 // Setup to open the floodgates from within Shutdown(). | 532 // Setup to open the floodgates from within Shutdown(). |
| 525 SetWillWaitForShutdownCallback( | 533 SetWillWaitForShutdownCallback( |
| 526 base::Bind(&EnsureTasksToCompleteCountAndUnblock, | 534 base::Bind(&EnsureTasksToCompleteCountAndUnblock, |
| 527 scoped_refptr<TestTracker>(tracker()), | 535 scoped_refptr<TestTracker>(tracker()), |
| 528 0, &blocker, kNumBlockTasks)); | 536 0, &blocker, kNumBlockTasks)); |
| 529 | 537 |
| 530 // Allow half of the additional blocking tasks thru. | 538 // Allow half of the additional blocking tasks thru. |
| (...skipping 211 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 742 base::Bind(&IsRunningOnCurrentThreadTask, | 750 base::Bind(&IsRunningOnCurrentThreadTask, |
| 743 token2, unsequenced_token, pool(), unused_pool)); | 751 token2, unsequenced_token, pool(), unused_pool)); |
| 744 pool()->PostWorkerTask( | 752 pool()->PostWorkerTask( |
| 745 FROM_HERE, | 753 FROM_HERE, |
| 746 base::Bind(&IsRunningOnCurrentThreadTask, | 754 base::Bind(&IsRunningOnCurrentThreadTask, |
| 747 unsequenced_token, token1, pool(), unused_pool)); | 755 unsequenced_token, token1, pool(), unused_pool)); |
| 748 pool()->Shutdown(); | 756 pool()->Shutdown(); |
| 749 unused_pool->Shutdown(); | 757 unused_pool->Shutdown(); |
| 750 } | 758 } |
| 751 | 759 |
| 760 // Verify that FlushForTesting works as intended. |
| 761 TEST_F(SequencedWorkerPoolTest, FlushForTesting) { |
| 762 // Should be fine to call on a new instance. |
| 763 pool()->FlushForTesting(); |
| 764 |
| 765 // Queue up a bunch of work, including a long delayed task and |
| 766 // a task that produces additional tasks as an artifact. |
| 767 pool()->PostDelayedWorkerTask( |
| 768 FROM_HERE, |
| 769 base::Bind(&TestTracker::FastTask, tracker(), 0), |
| 770 TimeDelta::FromMinutes(5)); |
| 771 pool()->PostWorkerTask(FROM_HERE, |
| 772 base::Bind(&TestTracker::SlowTask, tracker(), 0)); |
| 773 const size_t kNumFastTasks = 20; |
| 774 for (size_t i = 0; i < kNumFastTasks; i++) { |
| 775 pool()->PostWorkerTask(FROM_HERE, |
| 776 base::Bind(&TestTracker::FastTask, tracker(), 0)); |
| 777 } |
| 778 pool()->PostWorkerTask( |
| 779 FROM_HERE, |
| 780 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(), |
| 781 true)); |
| 782 |
| 783 // We expect all except the delayed task to have been run. We verify all |
| 784 // closures have been deleted deleted by looking at the refcount of the |
| 785 // tracker. |
| 786 EXPECT_FALSE(tracker()->HasOneRef()); |
| 787 pool()->FlushForTesting(); |
| 788 EXPECT_TRUE(tracker()->HasOneRef()); |
| 789 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount()); |
| 790 |
| 791 // Should be fine to call on an idle instance with all threads created, and |
| 792 // spamming the method shouldn't deadlock or confuse the class. |
| 793 pool()->FlushForTesting(); |
| 794 pool()->FlushForTesting(); |
| 795 |
| 796 // Should be fine to call after shutdown too. |
| 797 pool()->Shutdown(); |
| 798 pool()->FlushForTesting(); |
| 799 } |
| 800 |
| 752 class SequencedWorkerPoolTaskRunnerTestDelegate { | 801 class SequencedWorkerPoolTaskRunnerTestDelegate { |
| 753 public: | 802 public: |
| 754 SequencedWorkerPoolTaskRunnerTestDelegate() {} | 803 SequencedWorkerPoolTaskRunnerTestDelegate() {} |
| 755 | 804 |
| 756 ~SequencedWorkerPoolTaskRunnerTestDelegate() {} | 805 ~SequencedWorkerPoolTaskRunnerTestDelegate() {} |
| 757 | 806 |
| 758 void StartTaskRunner() { | 807 void StartTaskRunner() { |
| 759 pool_owner_.reset( | 808 pool_owner_.reset( |
| 760 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); | 809 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); |
| 761 } | 810 } |
| 762 | 811 |
| 763 scoped_refptr<SequencedWorkerPool> GetTaskRunner() { | 812 scoped_refptr<SequencedWorkerPool> GetTaskRunner() { |
| 764 return pool_owner_->pool(); | 813 return pool_owner_->pool(); |
| 765 } | 814 } |
| 766 | 815 |
| 767 void StopTaskRunner() { | 816 void StopTaskRunner() { |
| 768 // Make sure all tasks (including delayed ones) are run before shutting | 817 // Make sure all tasks are run before shutting down. Delayed tasks are |
| 769 // down. | 818 // not run, they're simply deleted. |
| 770 pool_owner_->pool()->FlushForTesting(); | 819 pool_owner_->pool()->FlushForTesting(); |
| 771 pool_owner_->pool()->Shutdown(); | 820 pool_owner_->pool()->Shutdown(); |
| 772 // Don't reset |pool_owner_| here, as the test may still hold a | 821 // Don't reset |pool_owner_| here, as the test may still hold a |
| 773 // reference to the pool. | 822 // reference to the pool. |
| 774 } | 823 } |
| 775 | 824 |
| 776 bool TaskRunnerHandlesNonZeroDelays() const { | 825 bool TaskRunnerHandlesNonZeroDelays() const { |
| 777 return true; | 826 return true; |
| 778 } | 827 } |
| 779 | 828 |
| (...skipping 18 matching lines...) Expand all Loading... |
| 798 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); | 847 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); |
| 799 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( | 848 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( |
| 800 SequencedWorkerPool::BLOCK_SHUTDOWN); | 849 SequencedWorkerPool::BLOCK_SHUTDOWN); |
| 801 } | 850 } |
| 802 | 851 |
| 803 scoped_refptr<TaskRunner> GetTaskRunner() { | 852 scoped_refptr<TaskRunner> GetTaskRunner() { |
| 804 return task_runner_; | 853 return task_runner_; |
| 805 } | 854 } |
| 806 | 855 |
| 807 void StopTaskRunner() { | 856 void StopTaskRunner() { |
| 808 // Make sure all tasks (including delayed ones) are run before shutting | 857 // Make sure all tasks are run before shutting down. Delayed tasks are |
| 809 // down. | 858 // not run, they're simply deleted. |
| 810 pool_owner_->pool()->FlushForTesting(); | 859 pool_owner_->pool()->FlushForTesting(); |
| 811 pool_owner_->pool()->Shutdown(); | 860 pool_owner_->pool()->Shutdown(); |
| 812 // Don't reset |pool_owner_| here, as the test may still hold a | 861 // Don't reset |pool_owner_| here, as the test may still hold a |
| 813 // reference to the pool. | 862 // reference to the pool. |
| 814 } | 863 } |
| 815 | 864 |
| 816 bool TaskRunnerHandlesNonZeroDelays() const { | 865 bool TaskRunnerHandlesNonZeroDelays() const { |
| 817 return true; | 866 return true; |
| 818 } | 867 } |
| 819 | 868 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 839 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); | 888 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); |
| 840 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( | 889 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( |
| 841 pool_owner_->pool()->GetSequenceToken()); | 890 pool_owner_->pool()->GetSequenceToken()); |
| 842 } | 891 } |
| 843 | 892 |
| 844 scoped_refptr<SequencedTaskRunner> GetTaskRunner() { | 893 scoped_refptr<SequencedTaskRunner> GetTaskRunner() { |
| 845 return task_runner_; | 894 return task_runner_; |
| 846 } | 895 } |
| 847 | 896 |
| 848 void StopTaskRunner() { | 897 void StopTaskRunner() { |
| 849 // Make sure all tasks (including delayed ones) are run before shutting | 898 // Make sure all tasks are run before shutting down. Delayed tasks are |
| 850 // down. | 899 // not run, they're simply deleted. |
| 851 pool_owner_->pool()->FlushForTesting(); | 900 pool_owner_->pool()->FlushForTesting(); |
| 852 pool_owner_->pool()->Shutdown(); | 901 pool_owner_->pool()->Shutdown(); |
| 853 // Don't reset |pool_owner_| here, as the test may still hold a | 902 // Don't reset |pool_owner_| here, as the test may still hold a |
| 854 // reference to the pool. | 903 // reference to the pool. |
| 855 } | 904 } |
| 856 | 905 |
| 857 bool TaskRunnerHandlesNonZeroDelays() const { | 906 bool TaskRunnerHandlesNonZeroDelays() const { |
| 858 return true; | 907 return true; |
| 859 } | 908 } |
| 860 | 909 |
| 861 private: | 910 private: |
| 862 MessageLoop message_loop_; | 911 MessageLoop message_loop_; |
| 863 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; | 912 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; |
| 864 scoped_refptr<SequencedTaskRunner> task_runner_; | 913 scoped_refptr<SequencedTaskRunner> task_runner_; |
| 865 }; | 914 }; |
| 866 | 915 |
| 867 INSTANTIATE_TYPED_TEST_CASE_P( | 916 INSTANTIATE_TYPED_TEST_CASE_P( |
| 868 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, | 917 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, |
| 869 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); | 918 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
| 870 | 919 |
| 871 INSTANTIATE_TYPED_TEST_CASE_P( | 920 INSTANTIATE_TYPED_TEST_CASE_P( |
| 872 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, | 921 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, |
| 873 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); | 922 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
| 874 | 923 |
| 875 } // namespace | 924 } // namespace |
| 876 | 925 |
| 877 } // namespace base | 926 } // namespace base |
| OLD | NEW |