Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(98)

Side by Side Diff: base/threading/sequenced_worker_pool_unittest.cc

Issue 8416019: Add a sequenced worker pool (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: '' Created 8 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « base/threading/sequenced_worker_pool.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <algorithm>
6
7 #include "base/bind.h"
8 #include "base/memory/ref_counted.h"
9 #include "base/synchronization/condition_variable.h"
10 #include "base/synchronization/lock.h"
11 #include "base/synchronization/waitable_event.h"
12 #include "base/threading/platform_thread.h"
13 #include "base/threading/sequenced_worker_pool.h"
14 #include "testing/gtest/include/gtest/gtest.h"
15
16 namespace base {
17
18 // IMPORTANT NOTE:
19 //
20 // Many of these tests have failure modes where they'll hang forever. These
21 // tests should not be flaky, and hangling indicates a type of failure. Do not
22 // mark as flaky if they're hanging, it's likely an actual bug.
23
24 namespace {
25
26 const size_t kNumWorkerThreads = 3;
27
28 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
29 public:
30 TestTracker()
31 : lock_(),
32 cond_var_(&lock_),
33 completed_event_(false, false),
34 started_events_(0) {
35 }
36
37 // Each of these tasks appends the argument to the complete sequence vector
38 // so calling code can see what order they finished in.
39 void FastTask(int id) {
40 SignalWorkerDone(id);
41 }
42 void SlowTask(int id) {
43 base::PlatformThread::Sleep(1000);
44 SignalWorkerDone(id);
45 }
46 void BlockOnEventTask(int id, base::WaitableEvent* event) {
47 // Note that this task has started and signal anwaybody waiting for that
jar (doing other things) 2011/12/29 18:15:45 nit: anwaybody-->anybody
48 // to happen.
49 {
50 base::AutoLock lock(lock_);
51 started_events_++;
52 }
53 cond_var_.Signal();
54
jar (doing other things) 2011/12/29 18:15:45 With murphy threads, all the tasks *could* pause a
brettw 2011/12/29 18:23:17 Ah, good catch. I'll fix this.
55 event->Wait();
56 SignalWorkerDone(id);
57 }
58
59 // Waits until the given number of tasks have started executing.
60 void WaitUntilEventTasksBlocked(int count) {
61 base::AutoLock lock(lock_);
62 while (started_events_ < count)
63 cond_var_.Wait();
64 }
65
66 // Blocks the current thread until at least the given number of tasks are in
67 // the completed vector, and then returns a copy.
68 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
69 for (;;) {
70 {
71 base::AutoLock lock(lock_);
72 if (complete_sequence_.size() >= num_tasks)
73 return complete_sequence_;
74 }
75 completed_event_.Wait();
76 }
77 }
78
79 void ClearCompleteSequence() {
80 base::AutoLock lock(lock_);
81 complete_sequence_.clear();
82 started_events_ = 0;
83 }
84
85 private:
86 void SignalWorkerDone(int id) {
87 base::AutoLock lock(lock_);
88 complete_sequence_.push_back(id);
89 completed_event_.Signal();
90 }
91
92 // Protects the complete_sequence.
93 base::Lock lock_;
94
95 base::ConditionVariable cond_var_;
96
97 // Signaled every time something is posted to the complete_sequence_.
98 base::WaitableEvent completed_event_;
99
100 // Protected by lock_.
101 std::vector<int> complete_sequence_;
102
103 int started_events_;
104 };
105
106 class SequencedWorkerPoolTest : public testing::Test,
107 public SequencedWorkerPool::TestingObserver {
108 public:
109 SequencedWorkerPoolTest()
110 : pool_(kNumWorkerThreads, "test"),
111 tracker_(new TestTracker) {
112 pool_.SetTestingObserver(this);
113 }
114 ~SequencedWorkerPoolTest() {
115 }
116
117 virtual void SetUp() {
118 }
119 virtual void TearDown() {
120 pool_.Shutdown();
121 }
122
123 SequencedWorkerPool& pool() { return pool_; }
124 TestTracker* tracker() { return tracker_.get(); }
125
126 // Ensures that the given number of worker threads is created by adding
127 // tasks and waiting until they complete. Worker thread creation is
128 // serialized, can happen on background threads asynchronously, and doesn't
129 // happen any more at shutdown. This means that if a test posts a bunch of
130 // tasks and calls shutdown, fewer workers will be created than the test may
131 // expect.
132 //
133 // This function ensures that this condition can't happen so tests can make
134 // assumptions about the number of workers active. See the comment in
135 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
136 // details.
137 //
138 // It will post tasks to the queue with id -1. It also assumes this is the
139 // first thing called in a test since it will clear the complete_sequence_.
140 void EnsureAllWorkersCreated() {
141 // Create a bunch of threads, all waiting for an event. This will cause
142 // that may workers to be created.
143 base::WaitableEvent event(false, false);
144 for (size_t i = 0; i < kNumWorkerThreads; i++) {
145 pool().PostWorkerTask(FROM_HERE,
146 base::Bind(&TestTracker::BlockOnEventTask,
147 tracker(), -1, &event));
148 }
149 tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads);
150
151 // Now signal all the workers and wait until they're done.
152 for (size_t i = 0; i < kNumWorkerThreads; i++)
153 event.Signal();
jar (doing other things) 2011/12/29 18:15:45 As noted above (see line 54), this won't consisten
154 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
155
156 // Clean up the task IDs we added.
157 tracker()->ClearCompleteSequence();
158 }
159
160 protected:
161 // This closure will be executed right before the pool blocks on shutdown.
162 base::Closure before_wait_for_shutdown_;
163
164 private:
165 // SequencedWorkerPool::TestingObserver implementation.
166 virtual void WillWaitForShutdown() {
167 if (!before_wait_for_shutdown_.is_null())
168 before_wait_for_shutdown_.Run();
169 }
170
171 SequencedWorkerPool pool_;
172 scoped_refptr<TestTracker> tracker_;
173 };
174
175 // Checks that the given number of entries are in the tasks to complete of
176 // the given tracker, and then signals the given event the given number of
177 // times. This is used to wakt up blocked background threads before blocking
178 // on shutdown.
179 void EnsureTasksToCompleteCountAndSignal(scoped_refptr<TestTracker> tracker,
180 size_t expected_tasks_to_complete,
181 base::WaitableEvent* event,
182 size_t times_to_signal_event) {
183 EXPECT_EQ(
184 expected_tasks_to_complete,
185 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
186
187 for (size_t i = 0; i < times_to_signal_event; i++)
188 event->Signal();
jar (doing other things) 2011/12/29 18:15:45 I don't think event contains a counter, and hence
189 }
190
191 } // namespace
192
193 // Tests that same-named tokens have the same ID.
194 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
195 const std::string name1("hello");
196 SequencedWorkerPool::SequenceToken token1 =
197 pool().GetNamedSequenceToken(name1);
198
199 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
200
201 const std::string name3("goodbye");
202 SequencedWorkerPool::SequenceToken token3 =
203 pool().GetNamedSequenceToken(name3);
204
205 // All 3 tokens should be different.
206 EXPECT_FALSE(token1.Equals(token2));
207 EXPECT_FALSE(token1.Equals(token3));
208 EXPECT_FALSE(token2.Equals(token3));
209
210 // Requesting the same name again should give the same value.
211 SequencedWorkerPool::SequenceToken token1again =
212 pool().GetNamedSequenceToken(name1);
213 EXPECT_TRUE(token1.Equals(token1again));
214
215 SequencedWorkerPool::SequenceToken token3again =
216 pool().GetNamedSequenceToken(name3);
217 EXPECT_TRUE(token3.Equals(token3again));
218 }
219
220 // Tests that posting a bunch of tasks (many more than the number of worker
221 // threads) runs them all.
222 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
223 pool().PostWorkerTask(FROM_HERE,
224 base::Bind(&TestTracker::SlowTask, tracker(), 0));
225
226 const size_t kNumTasks = 20;
227 for (size_t i = 1; i < kNumTasks; i++) {
228 pool().PostWorkerTask(FROM_HERE,
229 base::Bind(&TestTracker::FastTask, tracker(), i));
230 }
231
232 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
233 EXPECT_EQ(kNumTasks, result.size());
234 }
235
236 // Test that tasks with the same sequence token are executed in order but don't
237 // affect other tasks.
238 TEST_F(SequencedWorkerPoolTest, Sequence) {
239 // Fill all the worker threads except one.
240 base::WaitableEvent background_event(false, false);
241 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
242 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
243 pool().PostWorkerTask(FROM_HERE,
244 base::Bind(&TestTracker::BlockOnEventTask,
245 tracker(), i, &background_event));
246 }
247 tracker()->WaitUntilEventTasksBlocked(kNumBackgroundTasks);
248
249 // Create two tasks with the same sequence token, one that will block on the
250 // event, and one which will just complete quickly when it's run. Since there
251 // is one worker thread free, the first task will start and then block, and
252 // the second task should be waiting.
253 base::WaitableEvent event1(false, false);
254 SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken();
255 pool().PostSequencedWorkerTask(
256 token1, FROM_HERE,
257 base::Bind(&TestTracker::BlockOnEventTask, tracker(), 100,
258 &event1));
259 pool().PostSequencedWorkerTask(
260 token1, FROM_HERE,
261 base::Bind(&TestTracker::FastTask, tracker(), 101));
262 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
263
264 // Create another two tasks as above with a different token. These will be
265 // blocked since there are no slots to run.
266 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
267 pool().PostSequencedWorkerTask(
268 token2, FROM_HERE,
269 base::Bind(&TestTracker::FastTask, tracker(), 200));
270 pool().PostSequencedWorkerTask(
271 token2, FROM_HERE,
272 base::Bind(&TestTracker::FastTask, tracker(), 201));
273 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
274
275 // Let one background task complete. This should then let both tasks of
276 // token2 run to completion in order. The second task of token1 should still
277 // be blocked.
278 background_event.Signal();
279 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
280 ASSERT_EQ(3u, result.size());
281 EXPECT_EQ(200, result[1]);
282 EXPECT_EQ(201, result[2]);
283
284 // Finish the rest of the background tasks. This should leave some workers
285 // free with the second token1 task still blocked on the first.
286 for (size_t i = 0; i < kNumBackgroundTasks - 1; i++)
287 background_event.Signal();
288 EXPECT_EQ(kNumBackgroundTasks + 2,
289 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
290
291 // Allow the first task of token1 to complete. This should run the second.
292 event1.Signal();
293 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
294 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
295 EXPECT_EQ(100, result[result.size() - 2]);
296 EXPECT_EQ(101, result[result.size() - 1]);
297 }
298
299 // Tests that unrun tasks are discarded properly according to their shutdown
300 // mode.
301 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
302 // Start tasks to take all the threads and block them.
303 EnsureAllWorkersCreated();
304 base::WaitableEvent background_event(false, false);
305 for (size_t i = 0; i < kNumWorkerThreads; i++) {
306 pool().PostWorkerTask(FROM_HERE,
307 base::Bind(&TestTracker::BlockOnEventTask,
308 tracker(), i, &background_event));
309 }
310 tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads);
311
312 // Create some tasks with different shutdown modes.
313 pool().PostWorkerTaskWithShutdownBehavior(
314 FROM_HERE,
315 base::Bind(&TestTracker::FastTask, tracker(), 100),
316 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
317 pool().PostWorkerTaskWithShutdownBehavior(
318 FROM_HERE,
319 base::Bind(&TestTracker::FastTask, tracker(), 101),
320 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
321 pool().PostWorkerTaskWithShutdownBehavior(
322 FROM_HERE,
323 base::Bind(&TestTracker::FastTask, tracker(), 102),
324 SequencedWorkerPool::BLOCK_SHUTDOWN);
325
326 // Shutdown the worker pool. This should discard all non-blocking tasks.
327 before_wait_for_shutdown_ =
328 base::Bind(&EnsureTasksToCompleteCountAndSignal,
329 scoped_refptr<TestTracker>(tracker()), 0,
330 &background_event, kNumWorkerThreads);
331 pool().Shutdown();
332
333 std::vector<int> result = tracker()->WaitUntilTasksComplete(4);
334
335 // Tge kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
336 // one, in no particular order.
337 ASSERT_EQ(4u, result.size());
338 for (size_t i = 0; i < kNumWorkerThreads; i++) {
339 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
340 result.end());
341 }
342 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
343 }
344
345 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
346 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
347 EnsureAllWorkersCreated();
348 base::WaitableEvent background_event(false, false);
349 pool().PostWorkerTaskWithShutdownBehavior(
350 FROM_HERE,
351 base::Bind(&TestTracker::BlockOnEventTask,
352 tracker(), 0, &background_event),
353 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
354 tracker()->WaitUntilEventTasksBlocked(1);
355
356 // This should not block. If this test hangs, it means it failed.
357 pool().Shutdown();
358
359 // The task should not have completed yet.
360 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
361
362 // Posting more tasks should fail.
363 EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior(
364 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
365 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
366
367 // Continue the background thread and make sure the task can complete.
368 background_event.Signal();
369 std::vector<int> result = tracker()->WaitUntilTasksComplete(1);
370 EXPECT_EQ(1u, result.size());
371 }
372
373 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698