Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(115)

Side by Side Diff: base/threading/sequenced_worker_pool_unittest.cc

Issue 8416019: Add a sequenced worker pool (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: '' Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <algorithm>
6
7 #include "base/bind.h"
8 #include "base/memory/ref_counted.h"
9 #include "base/synchronization/condition_variable.h"
10 #include "base/synchronization/lock.h"
11 #include "base/threading/platform_thread.h"
12 #include "base/threading/sequenced_worker_pool.h"
13 #include "testing/gtest/include/gtest/gtest.h"
14
15 namespace base {
16
17 // IMPORTANT NOTE:
18 //
19 // Many of these tests have failure modes where they'll hang forever. These
20 // tests should not be flaky, and hangling indicates a type of failure. Do not
21 // mark as flaky if they're hanging, it's likely an actual bug.
22
23 namespace {
24
25 const size_t kNumWorkerThreads = 3;
26
27 // Allows a number of threads to all be blocked on the same event, and
28 // provides a way to unblock a certain number of them.
29 class ThreadBlocker {
30 public:
31 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {
32 }
33
34 void Block() {
35 {
36 base::AutoLock lock(lock_);
37 while (unblock_counter_ == 0)
38 cond_var_.Wait();
39 unblock_counter_--;
40 }
41 cond_var_.Signal();
42 }
43
44 void Unblock(size_t count) {
45 {
46 base::AutoLock lock(lock_);
47 DCHECK(unblock_counter_ == 0);
48 unblock_counter_ = count;
49 }
50 cond_var_.Signal();
51 }
52
53 private:
54 base::Lock lock_;
55 base::ConditionVariable cond_var_;
56
57 size_t unblock_counter_;
58 };
59
60 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
61 public:
62 TestTracker()
63 : lock_(),
64 cond_var_(&lock_),
65 started_events_(0) {
66 }
67
68 // Each of these tasks appends the argument to the complete sequence vector
69 // so calling code can see what order they finished in.
70 void FastTask(int id) {
71 SignalWorkerDone(id);
72 }
73 void SlowTask(int id) {
74 base::PlatformThread::Sleep(1000);
75 SignalWorkerDone(id);
76 }
77
78 void BlockTask(int id, ThreadBlocker* blocker) {
79 // Note that this task has started and signal anybody waiting for that
80 // to happen.
81 {
82 base::AutoLock lock(lock_);
83 started_events_++;
84 }
85 cond_var_.Signal();
86
87 blocker->Block();
88 SignalWorkerDone(id);
89 }
90
91 // Waits until the given number of tasks have started executing.
92 void WaitUntilTasksBlocked(size_t count) {
93 {
94 base::AutoLock lock(lock_);
95 while (started_events_ < count)
96 cond_var_.Wait();
97 }
98 cond_var_.Signal();
99 }
100
101 // Blocks the current thread until at least the given number of tasks are in
102 // the completed vector, and then returns a copy.
103 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
104 std::vector<int> ret;
105 {
106 base::AutoLock lock(lock_);
107 while (complete_sequence_.size() < num_tasks)
108 cond_var_.Wait();
109 ret = complete_sequence_;
110 }
111 cond_var_.Signal();
112 return ret;
113 }
114
115 void ClearCompleteSequence() {
116 base::AutoLock lock(lock_);
117 complete_sequence_.clear();
118 started_events_ = 0;
119 }
120
121 private:
122 void SignalWorkerDone(int id) {
123 {
124 base::AutoLock lock(lock_);
125 complete_sequence_.push_back(id);
126 }
127 cond_var_.Signal();
128 }
129
130 // Protects the complete_sequence.
131 base::Lock lock_;
132
133 base::ConditionVariable cond_var_;
134
135 // Protected by lock_.
136 std::vector<int> complete_sequence_;
137
138 // Counter of the number of "block" workers that have started.
139 size_t started_events_;
140 };
141
142 class SequencedWorkerPoolTest : public testing::Test,
143 public SequencedWorkerPool::TestingObserver {
144 public:
145 SequencedWorkerPoolTest()
146 : pool_(kNumWorkerThreads, "test"),
147 tracker_(new TestTracker) {
148 pool_.SetTestingObserver(this);
149 }
150 ~SequencedWorkerPoolTest() {
151 }
152
153 virtual void SetUp() {
154 }
155 virtual void TearDown() {
156 pool_.Shutdown();
157 }
158
159 SequencedWorkerPool& pool() { return pool_; }
160 TestTracker* tracker() { return tracker_.get(); }
161
162 // Ensures that the given number of worker threads is created by adding
163 // tasks and waiting until they complete. Worker thread creation is
164 // serialized, can happen on background threads asynchronously, and doesn't
165 // happen any more at shutdown. This means that if a test posts a bunch of
166 // tasks and calls shutdown, fewer workers will be created than the test may
167 // expect.
168 //
169 // This function ensures that this condition can't happen so tests can make
170 // assumptions about the number of workers active. See the comment in
171 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
172 // details.
173 //
174 // It will post tasks to the queue with id -1. It also assumes this is the
175 // first thing called in a test since it will clear the complete_sequence_.
176 void EnsureAllWorkersCreated() {
177 // Create a bunch of threads, all waiting. This will cause that may
178 // workers to be created.
179 ThreadBlocker blocker;
180 for (size_t i = 0; i < kNumWorkerThreads; i++) {
181 pool().PostWorkerTask(FROM_HERE,
182 base::Bind(&TestTracker::BlockTask,
183 tracker(), -1, &blocker));
184 }
185 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
186
187 // Now wake them up and wait until they're done.
188 blocker.Unblock(kNumWorkerThreads);
189 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
190
191 // Clean up the task IDs we added.
192 tracker()->ClearCompleteSequence();
193 }
194
195 protected:
196 // This closure will be executed right before the pool blocks on shutdown.
197 base::Closure before_wait_for_shutdown_;
198
199 private:
200 // SequencedWorkerPool::TestingObserver implementation.
201 virtual void WillWaitForShutdown() {
202 if (!before_wait_for_shutdown_.is_null())
203 before_wait_for_shutdown_.Run();
204 }
205
206 SequencedWorkerPool pool_;
207 scoped_refptr<TestTracker> tracker_;
208 };
209
210 // Checks that the given number of entries are in the tasks to complete of
211 // the given tracker, and then signals the given event the given number of
212 // times. This is used to wakt up blocked background threads before blocking
213 // on shutdown.
214 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
215 size_t expected_tasks_to_complete,
216 ThreadBlocker* blocker,
217 size_t threads_to_awake) {
218 EXPECT_EQ(
219 expected_tasks_to_complete,
220 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
221
222 blocker->Unblock(threads_to_awake);
223 }
224
225 } // namespace
226
227 // Tests that same-named tokens have the same ID.
228 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
229 const std::string name1("hello");
230 SequencedWorkerPool::SequenceToken token1 =
231 pool().GetNamedSequenceToken(name1);
232
233 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
234
235 const std::string name3("goodbye");
236 SequencedWorkerPool::SequenceToken token3 =
237 pool().GetNamedSequenceToken(name3);
238
239 // All 3 tokens should be different.
240 EXPECT_FALSE(token1.Equals(token2));
241 EXPECT_FALSE(token1.Equals(token3));
242 EXPECT_FALSE(token2.Equals(token3));
243
244 // Requesting the same name again should give the same value.
245 SequencedWorkerPool::SequenceToken token1again =
246 pool().GetNamedSequenceToken(name1);
247 EXPECT_TRUE(token1.Equals(token1again));
248
249 SequencedWorkerPool::SequenceToken token3again =
250 pool().GetNamedSequenceToken(name3);
251 EXPECT_TRUE(token3.Equals(token3again));
252 }
253
254 // Tests that posting a bunch of tasks (many more than the number of worker
255 // threads) runs them all.
256 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
257 pool().PostWorkerTask(FROM_HERE,
258 base::Bind(&TestTracker::SlowTask, tracker(), 0));
259
260 const size_t kNumTasks = 20;
261 for (size_t i = 1; i < kNumTasks; i++) {
262 pool().PostWorkerTask(FROM_HERE,
263 base::Bind(&TestTracker::FastTask, tracker(), i));
264 }
265
266 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
267 EXPECT_EQ(kNumTasks, result.size());
268 }
269
270 // Test that tasks with the same sequence token are executed in order but don't
271 // affect other tasks.
272 TEST_F(SequencedWorkerPoolTest, Sequence) {
273 // Fill all the worker threads except one.
274 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
275 ThreadBlocker background_blocker;
276 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
277 pool().PostWorkerTask(FROM_HERE,
278 base::Bind(&TestTracker::BlockTask,
279 tracker(), i, &background_blocker));
280 }
281 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
282
283 // Create two tasks with the same sequence token, one that will block on the
284 // event, and one which will just complete quickly when it's run. Since there
285 // is one worker thread free, the first task will start and then block, and
286 // the second task should be waiting.
287 ThreadBlocker blocker;
288 SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken();
289 pool().PostSequencedWorkerTask(
290 token1, FROM_HERE,
291 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
292 pool().PostSequencedWorkerTask(
293 token1, FROM_HERE,
294 base::Bind(&TestTracker::FastTask, tracker(), 101));
295 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
296
297 // Create another two tasks as above with a different token. These will be
298 // blocked since there are no slots to run.
299 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
300 pool().PostSequencedWorkerTask(
301 token2, FROM_HERE,
302 base::Bind(&TestTracker::FastTask, tracker(), 200));
303 pool().PostSequencedWorkerTask(
304 token2, FROM_HERE,
305 base::Bind(&TestTracker::FastTask, tracker(), 201));
306 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
307
308 // Let one background task complete. This should then let both tasks of
309 // token2 run to completion in order. The second task of token1 should still
310 // be blocked.
311 background_blocker.Unblock(1);
312 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
313 ASSERT_EQ(3u, result.size());
314 EXPECT_EQ(200, result[1]);
315 EXPECT_EQ(201, result[2]);
316
317 // Finish the rest of the background tasks. This should leave some workers
318 // free with the second token1 task still blocked on the first.
319 background_blocker.Unblock(kNumBackgroundTasks - 1);
320 EXPECT_EQ(kNumBackgroundTasks + 2,
321 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
322
323 // Allow the first task of token1 to complete. This should run the second.
324 blocker.Unblock(1);
325 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
326 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
327 EXPECT_EQ(100, result[result.size() - 2]);
328 EXPECT_EQ(101, result[result.size() - 1]);
329 }
330
331 // Tests that unrun tasks are discarded properly according to their shutdown
332 // mode.
333 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
334 // Start tasks to take all the threads and block them.
335 EnsureAllWorkersCreated();
336 ThreadBlocker blocker;
337 for (size_t i = 0; i < kNumWorkerThreads; i++) {
338 pool().PostWorkerTask(FROM_HERE,
339 base::Bind(&TestTracker::BlockTask,
340 tracker(), i, &blocker));
341 }
342 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
343
344 // Create some tasks with different shutdown modes.
345 pool().PostWorkerTaskWithShutdownBehavior(
346 FROM_HERE,
347 base::Bind(&TestTracker::FastTask, tracker(), 100),
348 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
349 pool().PostWorkerTaskWithShutdownBehavior(
350 FROM_HERE,
351 base::Bind(&TestTracker::FastTask, tracker(), 101),
352 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
353 pool().PostWorkerTaskWithShutdownBehavior(
354 FROM_HERE,
355 base::Bind(&TestTracker::FastTask, tracker(), 102),
356 SequencedWorkerPool::BLOCK_SHUTDOWN);
357
358 // Shutdown the worker pool. This should discard all non-blocking tasks.
359 before_wait_for_shutdown_ =
360 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
361 scoped_refptr<TestTracker>(tracker()), 0,
362 &blocker, kNumWorkerThreads);
363 pool().Shutdown();
364
365 std::vector<int> result = tracker()->WaitUntilTasksComplete(4);
366
367 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
368 // one, in no particular order.
369 ASSERT_EQ(4u, result.size());
370 for (size_t i = 0; i < kNumWorkerThreads; i++) {
371 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
372 result.end());
373 }
374 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
375 }
376
377 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
378 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
379 EnsureAllWorkersCreated();
380 ThreadBlocker blocker;
381 pool().PostWorkerTaskWithShutdownBehavior(
382 FROM_HERE,
383 base::Bind(&TestTracker::BlockTask,
384 tracker(), 0, &blocker),
385 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
386 tracker()->WaitUntilTasksBlocked(1);
387
388 // This should not block. If this test hangs, it means it failed.
389 pool().Shutdown();
390
391 // The task should not have completed yet.
392 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
393
394 // Posting more tasks should fail.
395 EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior(
396 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
397 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
398
399 // Continue the background thread and make sure the task can complete.
400 blocker.Unblock(1);
401 std::vector<int> result = tracker()->WaitUntilTasksComplete(1);
402 EXPECT_EQ(1u, result.size());
403 }
404
405 } // namespace base
OLDNEW
« base/threading/sequenced_worker_pool.cc ('K') | « base/threading/sequenced_worker_pool.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698