Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(357)

Side by Side Diff: base/worker_pool_posix_unittest.cc

Issue 6079009: Move some misc thread-related stuff from base to base/thread and into the bas... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « base/worker_pool_posix.cc ('k') | base/worker_pool_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/worker_pool_posix.h"
6
7 #include <set>
8
9 #include "base/condition_variable.h"
10 #include "base/lock.h"
11 #include "base/platform_thread.h"
12 #include "base/task.h"
13 #include "base/waitable_event.h"
14 #include "testing/gtest/include/gtest/gtest.h"
15
16 namespace base {
17
18 // Peer class to provide passthrough access to PosixDynamicThreadPool internals.
19 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
20 public:
21 explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
22 : pool_(pool) {}
23
24 Lock* lock() { return &pool_->lock_; }
25 ConditionVariable* tasks_available_cv() {
26 return &pool_->tasks_available_cv_;
27 }
28 const std::queue<Task*>& tasks() const { return pool_->tasks_; }
29 int num_idle_threads() const { return pool_->num_idle_threads_; }
30 ConditionVariable* num_idle_threads_cv() {
31 return pool_->num_idle_threads_cv_.get();
32 }
33 void set_num_idle_threads_cv(ConditionVariable* cv) {
34 pool_->num_idle_threads_cv_.reset(cv);
35 }
36
37 private:
38 PosixDynamicThreadPool* pool_;
39
40 DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
41 };
42
43 } // namespace base
44
45 namespace {
46
47 // IncrementingTask's main purpose is to increment a counter. It also updates a
48 // set of unique thread ids, and signals a ConditionVariable on completion.
49 // Note that since it does not block, there is no way to control the number of
50 // threads used if more than one IncrementingTask is consecutively posted to the
51 // thread pool, since the first one might finish executing before the subsequent
52 // PostTask() calls get invoked.
53 class IncrementingTask : public Task {
54 public:
55 IncrementingTask(Lock* counter_lock,
56 int* counter,
57 Lock* unique_threads_lock,
58 std::set<PlatformThreadId>* unique_threads)
59 : counter_lock_(counter_lock),
60 unique_threads_lock_(unique_threads_lock),
61 unique_threads_(unique_threads),
62 counter_(counter) {}
63
64 virtual void Run() {
65 AddSelfToUniqueThreadSet();
66 AutoLock locked(*counter_lock_);
67 (*counter_)++;
68 }
69
70 void AddSelfToUniqueThreadSet() {
71 AutoLock locked(*unique_threads_lock_);
72 unique_threads_->insert(PlatformThread::CurrentId());
73 }
74
75 private:
76 Lock* counter_lock_;
77 Lock* unique_threads_lock_;
78 std::set<PlatformThreadId>* unique_threads_;
79 int* counter_;
80
81 DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
82 };
83
84 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
85 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
86 class BlockingIncrementingTask : public Task {
87 public:
88 BlockingIncrementingTask(Lock* counter_lock,
89 int* counter,
90 Lock* unique_threads_lock,
91 std::set<PlatformThreadId>* unique_threads,
92 Lock* num_waiting_to_start_lock,
93 int* num_waiting_to_start,
94 ConditionVariable* num_waiting_to_start_cv,
95 base::WaitableEvent* start)
96 : incrementer_(
97 counter_lock, counter, unique_threads_lock, unique_threads),
98 num_waiting_to_start_lock_(num_waiting_to_start_lock),
99 num_waiting_to_start_(num_waiting_to_start),
100 num_waiting_to_start_cv_(num_waiting_to_start_cv),
101 start_(start) {}
102
103 virtual void Run() {
104 {
105 AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
106 (*num_waiting_to_start_)++;
107 }
108 num_waiting_to_start_cv_->Signal();
109 CHECK(start_->Wait());
110 incrementer_.Run();
111 }
112
113 private:
114 IncrementingTask incrementer_;
115 Lock* num_waiting_to_start_lock_;
116 int* num_waiting_to_start_;
117 ConditionVariable* num_waiting_to_start_cv_;
118 base::WaitableEvent* start_;
119
120 DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
121 };
122
123 class PosixDynamicThreadPoolTest : public testing::Test {
124 protected:
125 PosixDynamicThreadPoolTest()
126 : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)),
127 peer_(pool_.get()),
128 counter_(0),
129 num_waiting_to_start_(0),
130 num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
131 start_(true, false) {}
132
133 virtual void SetUp() {
134 peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
135 }
136
137 virtual void TearDown() {
138 // Wake up the idle threads so they can terminate.
139 if (pool_.get()) pool_->Terminate();
140 }
141
142 void WaitForTasksToStart(int num_tasks) {
143 AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
144 while (num_waiting_to_start_ < num_tasks) {
145 num_waiting_to_start_cv_.Wait();
146 }
147 }
148
149 void WaitForIdleThreads(int num_idle_threads) {
150 AutoLock pool_locked(*peer_.lock());
151 while (peer_.num_idle_threads() < num_idle_threads) {
152 peer_.num_idle_threads_cv()->Wait();
153 }
154 }
155
156 Task* CreateNewIncrementingTask() {
157 return new IncrementingTask(&counter_lock_, &counter_,
158 &unique_threads_lock_, &unique_threads_);
159 }
160
161 Task* CreateNewBlockingIncrementingTask() {
162 return new BlockingIncrementingTask(
163 &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
164 &num_waiting_to_start_lock_, &num_waiting_to_start_,
165 &num_waiting_to_start_cv_, &start_);
166 }
167
168 scoped_refptr<base::PosixDynamicThreadPool> pool_;
169 base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
170 Lock counter_lock_;
171 int counter_;
172 Lock unique_threads_lock_;
173 std::set<PlatformThreadId> unique_threads_;
174 Lock num_waiting_to_start_lock_;
175 int num_waiting_to_start_;
176 ConditionVariable num_waiting_to_start_cv_;
177 base::WaitableEvent start_;
178 };
179
180 TEST_F(PosixDynamicThreadPoolTest, Basic) {
181 EXPECT_EQ(0, peer_.num_idle_threads());
182 EXPECT_EQ(0U, unique_threads_.size());
183 EXPECT_EQ(0U, peer_.tasks().size());
184
185 // Add one task and wait for it to be completed.
186 pool_->PostTask(CreateNewIncrementingTask());
187
188 WaitForIdleThreads(1);
189
190 EXPECT_EQ(1U, unique_threads_.size()) <<
191 "There should be only one thread allocated for one task.";
192 EXPECT_EQ(1, peer_.num_idle_threads());
193 EXPECT_EQ(1, counter_);
194 }
195
196 TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
197 // Add one task and wait for it to be completed.
198 pool_->PostTask(CreateNewIncrementingTask());
199
200 WaitForIdleThreads(1);
201
202 // Add another 2 tasks. One should reuse the existing worker thread.
203 pool_->PostTask(CreateNewBlockingIncrementingTask());
204 pool_->PostTask(CreateNewBlockingIncrementingTask());
205
206 WaitForTasksToStart(2);
207 start_.Signal();
208 WaitForIdleThreads(2);
209
210 EXPECT_EQ(2U, unique_threads_.size());
211 EXPECT_EQ(2, peer_.num_idle_threads());
212 EXPECT_EQ(3, counter_);
213 }
214
215 TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
216 // Add two blocking tasks.
217 pool_->PostTask(CreateNewBlockingIncrementingTask());
218 pool_->PostTask(CreateNewBlockingIncrementingTask());
219
220 EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
221
222 WaitForTasksToStart(2);
223 start_.Signal();
224 WaitForIdleThreads(2);
225
226 EXPECT_EQ(2U, unique_threads_.size());
227 EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
228 EXPECT_EQ(2, counter_);
229 }
230
231 TEST_F(PosixDynamicThreadPoolTest, Complex) {
232 // Add two non blocking tasks and wait for them to finish.
233 pool_->PostTask(CreateNewIncrementingTask());
234
235 WaitForIdleThreads(1);
236
237 // Add two blocking tasks, start them simultaneously, and wait for them to
238 // finish.
239 pool_->PostTask(CreateNewBlockingIncrementingTask());
240 pool_->PostTask(CreateNewBlockingIncrementingTask());
241
242 WaitForTasksToStart(2);
243 start_.Signal();
244 WaitForIdleThreads(2);
245
246 EXPECT_EQ(3, counter_);
247 EXPECT_EQ(2, peer_.num_idle_threads());
248 EXPECT_EQ(2U, unique_threads_.size());
249
250 // Wake up all idle threads so they can exit.
251 {
252 AutoLock locked(*peer_.lock());
253 while (peer_.num_idle_threads() > 0) {
254 peer_.tasks_available_cv()->Signal();
255 peer_.num_idle_threads_cv()->Wait();
256 }
257 }
258
259 // Add another non blocking task. There are no threads to reuse.
260 pool_->PostTask(CreateNewIncrementingTask());
261 WaitForIdleThreads(1);
262
263 EXPECT_EQ(3U, unique_threads_.size());
264 EXPECT_EQ(1, peer_.num_idle_threads());
265 EXPECT_EQ(4, counter_);
266 }
267
268 } // namespace
OLDNEW
« no previous file with comments | « base/worker_pool_posix.cc ('k') | base/worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698