Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(415)

Side by Side Diff: base/threading/worker_pool_posix_unittest.cc

Issue 6079009: Move some misc thread-related stuff from base to base/thread and into the bas... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « base/threading/worker_pool_posix.cc ('k') | base/threading/worker_pool_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/worker_pool_posix.h"
6
7 #include <set>
8
9 #include "base/condition_variable.h"
10 #include "base/lock.h"
11 #include "base/platform_thread.h"
12 #include "base/task.h"
13 #include "base/waitable_event.h"
14 #include "testing/gtest/include/gtest/gtest.h"
15
16 namespace base {
17
18 // Peer class to provide passthrough access to PosixDynamicThreadPool internals.
19 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
20 public:
21 explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
22 : pool_(pool) {}
23
24 Lock* lock() { return &pool_->lock_; }
25 ConditionVariable* tasks_available_cv() {
26 return &pool_->tasks_available_cv_;
27 }
28 const std::queue<Task*>& tasks() const { return pool_->tasks_; }
29 int num_idle_threads() const { return pool_->num_idle_threads_; }
30 ConditionVariable* num_idle_threads_cv() {
31 return pool_->num_idle_threads_cv_.get();
32 }
33 void set_num_idle_threads_cv(ConditionVariable* cv) {
34 pool_->num_idle_threads_cv_.reset(cv);
35 }
36
37 private:
38 PosixDynamicThreadPool* pool_;
39
40 DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
41 };
42
43 namespace {
44
45 // IncrementingTask's main purpose is to increment a counter. It also updates a
46 // set of unique thread ids, and signals a ConditionVariable on completion.
47 // Note that since it does not block, there is no way to control the number of
48 // threads used if more than one IncrementingTask is consecutively posted to the
49 // thread pool, since the first one might finish executing before the subsequent
50 // PostTask() calls get invoked.
51 class IncrementingTask : public Task {
52 public:
53 IncrementingTask(Lock* counter_lock,
54 int* counter,
55 Lock* unique_threads_lock,
56 std::set<PlatformThreadId>* unique_threads)
57 : counter_lock_(counter_lock),
58 unique_threads_lock_(unique_threads_lock),
59 unique_threads_(unique_threads),
60 counter_(counter) {}
61
62 virtual void Run() {
63 AddSelfToUniqueThreadSet();
64 AutoLock locked(*counter_lock_);
65 (*counter_)++;
66 }
67
68 void AddSelfToUniqueThreadSet() {
69 AutoLock locked(*unique_threads_lock_);
70 unique_threads_->insert(PlatformThread::CurrentId());
71 }
72
73 private:
74 Lock* counter_lock_;
75 Lock* unique_threads_lock_;
76 std::set<PlatformThreadId>* unique_threads_;
77 int* counter_;
78
79 DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
80 };
81
82 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
83 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
84 class BlockingIncrementingTask : public Task {
85 public:
86 BlockingIncrementingTask(Lock* counter_lock,
87 int* counter,
88 Lock* unique_threads_lock,
89 std::set<PlatformThreadId>* unique_threads,
90 Lock* num_waiting_to_start_lock,
91 int* num_waiting_to_start,
92 ConditionVariable* num_waiting_to_start_cv,
93 base::WaitableEvent* start)
94 : incrementer_(
95 counter_lock, counter, unique_threads_lock, unique_threads),
96 num_waiting_to_start_lock_(num_waiting_to_start_lock),
97 num_waiting_to_start_(num_waiting_to_start),
98 num_waiting_to_start_cv_(num_waiting_to_start_cv),
99 start_(start) {}
100
101 virtual void Run() {
102 {
103 AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
104 (*num_waiting_to_start_)++;
105 }
106 num_waiting_to_start_cv_->Signal();
107 CHECK(start_->Wait());
108 incrementer_.Run();
109 }
110
111 private:
112 IncrementingTask incrementer_;
113 Lock* num_waiting_to_start_lock_;
114 int* num_waiting_to_start_;
115 ConditionVariable* num_waiting_to_start_cv_;
116 base::WaitableEvent* start_;
117
118 DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
119 };
120
121 class PosixDynamicThreadPoolTest : public testing::Test {
122 protected:
123 PosixDynamicThreadPoolTest()
124 : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)),
125 peer_(pool_.get()),
126 counter_(0),
127 num_waiting_to_start_(0),
128 num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
129 start_(true, false) {}
130
131 virtual void SetUp() {
132 peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
133 }
134
135 virtual void TearDown() {
136 // Wake up the idle threads so they can terminate.
137 if (pool_.get()) pool_->Terminate();
138 }
139
140 void WaitForTasksToStart(int num_tasks) {
141 AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
142 while (num_waiting_to_start_ < num_tasks) {
143 num_waiting_to_start_cv_.Wait();
144 }
145 }
146
147 void WaitForIdleThreads(int num_idle_threads) {
148 AutoLock pool_locked(*peer_.lock());
149 while (peer_.num_idle_threads() < num_idle_threads) {
150 peer_.num_idle_threads_cv()->Wait();
151 }
152 }
153
154 Task* CreateNewIncrementingTask() {
155 return new IncrementingTask(&counter_lock_, &counter_,
156 &unique_threads_lock_, &unique_threads_);
157 }
158
159 Task* CreateNewBlockingIncrementingTask() {
160 return new BlockingIncrementingTask(
161 &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
162 &num_waiting_to_start_lock_, &num_waiting_to_start_,
163 &num_waiting_to_start_cv_, &start_);
164 }
165
166 scoped_refptr<base::PosixDynamicThreadPool> pool_;
167 base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
168 Lock counter_lock_;
169 int counter_;
170 Lock unique_threads_lock_;
171 std::set<PlatformThreadId> unique_threads_;
172 Lock num_waiting_to_start_lock_;
173 int num_waiting_to_start_;
174 ConditionVariable num_waiting_to_start_cv_;
175 base::WaitableEvent start_;
176 };
177
178 } // namespace
179
180 TEST_F(PosixDynamicThreadPoolTest, Basic) {
181 EXPECT_EQ(0, peer_.num_idle_threads());
182 EXPECT_EQ(0U, unique_threads_.size());
183 EXPECT_EQ(0U, peer_.tasks().size());
184
185 // Add one task and wait for it to be completed.
186 pool_->PostTask(CreateNewIncrementingTask());
187
188 WaitForIdleThreads(1);
189
190 EXPECT_EQ(1U, unique_threads_.size()) <<
191 "There should be only one thread allocated for one task.";
192 EXPECT_EQ(1, peer_.num_idle_threads());
193 EXPECT_EQ(1, counter_);
194 }
195
196 TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
197 // Add one task and wait for it to be completed.
198 pool_->PostTask(CreateNewIncrementingTask());
199
200 WaitForIdleThreads(1);
201
202 // Add another 2 tasks. One should reuse the existing worker thread.
203 pool_->PostTask(CreateNewBlockingIncrementingTask());
204 pool_->PostTask(CreateNewBlockingIncrementingTask());
205
206 WaitForTasksToStart(2);
207 start_.Signal();
208 WaitForIdleThreads(2);
209
210 EXPECT_EQ(2U, unique_threads_.size());
211 EXPECT_EQ(2, peer_.num_idle_threads());
212 EXPECT_EQ(3, counter_);
213 }
214
215 TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
216 // Add two blocking tasks.
217 pool_->PostTask(CreateNewBlockingIncrementingTask());
218 pool_->PostTask(CreateNewBlockingIncrementingTask());
219
220 EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
221
222 WaitForTasksToStart(2);
223 start_.Signal();
224 WaitForIdleThreads(2);
225
226 EXPECT_EQ(2U, unique_threads_.size());
227 EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
228 EXPECT_EQ(2, counter_);
229 }
230
231 TEST_F(PosixDynamicThreadPoolTest, Complex) {
232 // Add two non blocking tasks and wait for them to finish.
233 pool_->PostTask(CreateNewIncrementingTask());
234
235 WaitForIdleThreads(1);
236
237 // Add two blocking tasks, start them simultaneously, and wait for them to
238 // finish.
239 pool_->PostTask(CreateNewBlockingIncrementingTask());
240 pool_->PostTask(CreateNewBlockingIncrementingTask());
241
242 WaitForTasksToStart(2);
243 start_.Signal();
244 WaitForIdleThreads(2);
245
246 EXPECT_EQ(3, counter_);
247 EXPECT_EQ(2, peer_.num_idle_threads());
248 EXPECT_EQ(2U, unique_threads_.size());
249
250 // Wake up all idle threads so they can exit.
251 {
252 AutoLock locked(*peer_.lock());
253 while (peer_.num_idle_threads() > 0) {
254 peer_.tasks_available_cv()->Signal();
255 peer_.num_idle_threads_cv()->Wait();
256 }
257 }
258
259 // Add another non blocking task. There are no threads to reuse.
260 pool_->PostTask(CreateNewIncrementingTask());
261 WaitForIdleThreads(1);
262
263 EXPECT_EQ(3U, unique_threads_.size());
264 EXPECT_EQ(1, peer_.num_idle_threads());
265 EXPECT_EQ(4, counter_);
266 }
267
268 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/worker_pool_posix.cc ('k') | base/threading/worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698