OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include <algorithm> | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/memory/ref_counted.h" | |
9 #include "base/synchronization/condition_variable.h" | |
10 #include "base/synchronization/lock.h" | |
11 #include "base/synchronization/waitable_event.h" | |
12 #include "base/threading/platform_thread.h" | |
13 #include "base/threading/sequenced_worker_pool.h" | |
14 #include "testing/gtest/include/gtest/gtest.h" | |
15 | |
16 namespace base { | |
17 | |
18 // IMPORTANT NOTE: | |
19 // | |
20 // Many of these tests have failure modes where they'll hang forever. These | |
21 // tests should not be flaky, and hangling indicates a type of failure. Do not | |
22 // mark as flaky if they're hanging, it's likely an actual bug. | |
23 | |
24 namespace { | |
25 | |
26 const size_t kNumWorkerThreads = 3; | |
27 | |
28 class TestTracker : public base::RefCountedThreadSafe<TestTracker> { | |
29 public: | |
30 TestTracker() | |
31 : lock_(), | |
32 cond_var_(&lock_), | |
33 completed_event_(false, false), | |
34 started_events_(0) { | |
35 } | |
36 | |
37 // Each of these tasks appends the argument to the complete sequence vector | |
38 // so calling code can see what order they finished in. | |
39 void FastTask(int id) { | |
40 SignalWorkerDone(id); | |
41 } | |
42 void SlowTask(int id) { | |
43 base::PlatformThread::Sleep(1000); | |
44 SignalWorkerDone(id); | |
45 } | |
46 void BlockOnEventTask(int id, base::WaitableEvent* event) { | |
47 // Note that this task has started and signal anwaybody waiting for that | |
jar (doing other things)
2011/12/29 18:15:45
nit: anwaybody-->anybody
| |
48 // to happen. | |
49 { | |
50 base::AutoLock lock(lock_); | |
51 started_events_++; | |
52 } | |
53 cond_var_.Signal(); | |
54 | |
jar (doing other things)
2011/12/29 18:15:45
With murphy threads, all the tasks *could* pause a
brettw
2011/12/29 18:23:17
Ah, good catch. I'll fix this.
| |
55 event->Wait(); | |
56 SignalWorkerDone(id); | |
57 } | |
58 | |
59 // Waits until the given number of tasks have started executing. | |
60 void WaitUntilEventTasksBlocked(int count) { | |
61 base::AutoLock lock(lock_); | |
62 while (started_events_ < count) | |
63 cond_var_.Wait(); | |
64 } | |
65 | |
66 // Blocks the current thread until at least the given number of tasks are in | |
67 // the completed vector, and then returns a copy. | |
68 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { | |
69 for (;;) { | |
70 { | |
71 base::AutoLock lock(lock_); | |
72 if (complete_sequence_.size() >= num_tasks) | |
73 return complete_sequence_; | |
74 } | |
75 completed_event_.Wait(); | |
76 } | |
77 } | |
78 | |
79 void ClearCompleteSequence() { | |
80 base::AutoLock lock(lock_); | |
81 complete_sequence_.clear(); | |
82 started_events_ = 0; | |
83 } | |
84 | |
85 private: | |
86 void SignalWorkerDone(int id) { | |
87 base::AutoLock lock(lock_); | |
88 complete_sequence_.push_back(id); | |
89 completed_event_.Signal(); | |
90 } | |
91 | |
92 // Protects the complete_sequence. | |
93 base::Lock lock_; | |
94 | |
95 base::ConditionVariable cond_var_; | |
96 | |
97 // Signaled every time something is posted to the complete_sequence_. | |
98 base::WaitableEvent completed_event_; | |
99 | |
100 // Protected by lock_. | |
101 std::vector<int> complete_sequence_; | |
102 | |
103 int started_events_; | |
104 }; | |
105 | |
106 class SequencedWorkerPoolTest : public testing::Test, | |
107 public SequencedWorkerPool::TestingObserver { | |
108 public: | |
109 SequencedWorkerPoolTest() | |
110 : pool_(kNumWorkerThreads, "test"), | |
111 tracker_(new TestTracker) { | |
112 pool_.SetTestingObserver(this); | |
113 } | |
114 ~SequencedWorkerPoolTest() { | |
115 } | |
116 | |
117 virtual void SetUp() { | |
118 } | |
119 virtual void TearDown() { | |
120 pool_.Shutdown(); | |
121 } | |
122 | |
123 SequencedWorkerPool& pool() { return pool_; } | |
124 TestTracker* tracker() { return tracker_.get(); } | |
125 | |
126 // Ensures that the given number of worker threads is created by adding | |
127 // tasks and waiting until they complete. Worker thread creation is | |
128 // serialized, can happen on background threads asynchronously, and doesn't | |
129 // happen any more at shutdown. This means that if a test posts a bunch of | |
130 // tasks and calls shutdown, fewer workers will be created than the test may | |
131 // expect. | |
132 // | |
133 // This function ensures that this condition can't happen so tests can make | |
134 // assumptions about the number of workers active. See the comment in | |
135 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more | |
136 // details. | |
137 // | |
138 // It will post tasks to the queue with id -1. It also assumes this is the | |
139 // first thing called in a test since it will clear the complete_sequence_. | |
140 void EnsureAllWorkersCreated() { | |
141 // Create a bunch of threads, all waiting for an event. This will cause | |
142 // that may workers to be created. | |
143 base::WaitableEvent event(false, false); | |
144 for (size_t i = 0; i < kNumWorkerThreads; i++) { | |
145 pool().PostWorkerTask(FROM_HERE, | |
146 base::Bind(&TestTracker::BlockOnEventTask, | |
147 tracker(), -1, &event)); | |
148 } | |
149 tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads); | |
150 | |
151 // Now signal all the workers and wait until they're done. | |
152 for (size_t i = 0; i < kNumWorkerThreads; i++) | |
153 event.Signal(); | |
jar (doing other things)
2011/12/29 18:15:45
As noted above (see line 54), this won't consisten
| |
154 tracker()->WaitUntilTasksComplete(kNumWorkerThreads); | |
155 | |
156 // Clean up the task IDs we added. | |
157 tracker()->ClearCompleteSequence(); | |
158 } | |
159 | |
160 protected: | |
161 // This closure will be executed right before the pool blocks on shutdown. | |
162 base::Closure before_wait_for_shutdown_; | |
163 | |
164 private: | |
165 // SequencedWorkerPool::TestingObserver implementation. | |
166 virtual void WillWaitForShutdown() { | |
167 if (!before_wait_for_shutdown_.is_null()) | |
168 before_wait_for_shutdown_.Run(); | |
169 } | |
170 | |
171 SequencedWorkerPool pool_; | |
172 scoped_refptr<TestTracker> tracker_; | |
173 }; | |
174 | |
175 // Checks that the given number of entries are in the tasks to complete of | |
176 // the given tracker, and then signals the given event the given number of | |
177 // times. This is used to wakt up blocked background threads before blocking | |
178 // on shutdown. | |
179 void EnsureTasksToCompleteCountAndSignal(scoped_refptr<TestTracker> tracker, | |
180 size_t expected_tasks_to_complete, | |
181 base::WaitableEvent* event, | |
182 size_t times_to_signal_event) { | |
183 EXPECT_EQ( | |
184 expected_tasks_to_complete, | |
185 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); | |
186 | |
187 for (size_t i = 0; i < times_to_signal_event; i++) | |
188 event->Signal(); | |
jar (doing other things)
2011/12/29 18:15:45
I don't think event contains a counter, and hence
| |
189 } | |
190 | |
191 } // namespace | |
192 | |
193 // Tests that same-named tokens have the same ID. | |
194 TEST_F(SequencedWorkerPoolTest, NamedTokens) { | |
195 const std::string name1("hello"); | |
196 SequencedWorkerPool::SequenceToken token1 = | |
197 pool().GetNamedSequenceToken(name1); | |
198 | |
199 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); | |
200 | |
201 const std::string name3("goodbye"); | |
202 SequencedWorkerPool::SequenceToken token3 = | |
203 pool().GetNamedSequenceToken(name3); | |
204 | |
205 // All 3 tokens should be different. | |
206 EXPECT_FALSE(token1.Equals(token2)); | |
207 EXPECT_FALSE(token1.Equals(token3)); | |
208 EXPECT_FALSE(token2.Equals(token3)); | |
209 | |
210 // Requesting the same name again should give the same value. | |
211 SequencedWorkerPool::SequenceToken token1again = | |
212 pool().GetNamedSequenceToken(name1); | |
213 EXPECT_TRUE(token1.Equals(token1again)); | |
214 | |
215 SequencedWorkerPool::SequenceToken token3again = | |
216 pool().GetNamedSequenceToken(name3); | |
217 EXPECT_TRUE(token3.Equals(token3again)); | |
218 } | |
219 | |
220 // Tests that posting a bunch of tasks (many more than the number of worker | |
221 // threads) runs them all. | |
222 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { | |
223 pool().PostWorkerTask(FROM_HERE, | |
224 base::Bind(&TestTracker::SlowTask, tracker(), 0)); | |
225 | |
226 const size_t kNumTasks = 20; | |
227 for (size_t i = 1; i < kNumTasks; i++) { | |
228 pool().PostWorkerTask(FROM_HERE, | |
229 base::Bind(&TestTracker::FastTask, tracker(), i)); | |
230 } | |
231 | |
232 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); | |
233 EXPECT_EQ(kNumTasks, result.size()); | |
234 } | |
235 | |
236 // Test that tasks with the same sequence token are executed in order but don't | |
237 // affect other tasks. | |
238 TEST_F(SequencedWorkerPoolTest, Sequence) { | |
239 // Fill all the worker threads except one. | |
240 base::WaitableEvent background_event(false, false); | |
241 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; | |
242 for (size_t i = 0; i < kNumBackgroundTasks; i++) { | |
243 pool().PostWorkerTask(FROM_HERE, | |
244 base::Bind(&TestTracker::BlockOnEventTask, | |
245 tracker(), i, &background_event)); | |
246 } | |
247 tracker()->WaitUntilEventTasksBlocked(kNumBackgroundTasks); | |
248 | |
249 // Create two tasks with the same sequence token, one that will block on the | |
250 // event, and one which will just complete quickly when it's run. Since there | |
251 // is one worker thread free, the first task will start and then block, and | |
252 // the second task should be waiting. | |
253 base::WaitableEvent event1(false, false); | |
254 SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken(); | |
255 pool().PostSequencedWorkerTask( | |
256 token1, FROM_HERE, | |
257 base::Bind(&TestTracker::BlockOnEventTask, tracker(), 100, | |
258 &event1)); | |
259 pool().PostSequencedWorkerTask( | |
260 token1, FROM_HERE, | |
261 base::Bind(&TestTracker::FastTask, tracker(), 101)); | |
262 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); | |
263 | |
264 // Create another two tasks as above with a different token. These will be | |
265 // blocked since there are no slots to run. | |
266 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); | |
267 pool().PostSequencedWorkerTask( | |
268 token2, FROM_HERE, | |
269 base::Bind(&TestTracker::FastTask, tracker(), 200)); | |
270 pool().PostSequencedWorkerTask( | |
271 token2, FROM_HERE, | |
272 base::Bind(&TestTracker::FastTask, tracker(), 201)); | |
273 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); | |
274 | |
275 // Let one background task complete. This should then let both tasks of | |
276 // token2 run to completion in order. The second task of token1 should still | |
277 // be blocked. | |
278 background_event.Signal(); | |
279 std::vector<int> result = tracker()->WaitUntilTasksComplete(3); | |
280 ASSERT_EQ(3u, result.size()); | |
281 EXPECT_EQ(200, result[1]); | |
282 EXPECT_EQ(201, result[2]); | |
283 | |
284 // Finish the rest of the background tasks. This should leave some workers | |
285 // free with the second token1 task still blocked on the first. | |
286 for (size_t i = 0; i < kNumBackgroundTasks - 1; i++) | |
287 background_event.Signal(); | |
288 EXPECT_EQ(kNumBackgroundTasks + 2, | |
289 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); | |
290 | |
291 // Allow the first task of token1 to complete. This should run the second. | |
292 event1.Signal(); | |
293 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); | |
294 ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); | |
295 EXPECT_EQ(100, result[result.size() - 2]); | |
296 EXPECT_EQ(101, result[result.size() - 1]); | |
297 } | |
298 | |
299 // Tests that unrun tasks are discarded properly according to their shutdown | |
300 // mode. | |
301 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { | |
302 // Start tasks to take all the threads and block them. | |
303 EnsureAllWorkersCreated(); | |
304 base::WaitableEvent background_event(false, false); | |
305 for (size_t i = 0; i < kNumWorkerThreads; i++) { | |
306 pool().PostWorkerTask(FROM_HERE, | |
307 base::Bind(&TestTracker::BlockOnEventTask, | |
308 tracker(), i, &background_event)); | |
309 } | |
310 tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads); | |
311 | |
312 // Create some tasks with different shutdown modes. | |
313 pool().PostWorkerTaskWithShutdownBehavior( | |
314 FROM_HERE, | |
315 base::Bind(&TestTracker::FastTask, tracker(), 100), | |
316 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); | |
317 pool().PostWorkerTaskWithShutdownBehavior( | |
318 FROM_HERE, | |
319 base::Bind(&TestTracker::FastTask, tracker(), 101), | |
320 SequencedWorkerPool::SKIP_ON_SHUTDOWN); | |
321 pool().PostWorkerTaskWithShutdownBehavior( | |
322 FROM_HERE, | |
323 base::Bind(&TestTracker::FastTask, tracker(), 102), | |
324 SequencedWorkerPool::BLOCK_SHUTDOWN); | |
325 | |
326 // Shutdown the worker pool. This should discard all non-blocking tasks. | |
327 before_wait_for_shutdown_ = | |
328 base::Bind(&EnsureTasksToCompleteCountAndSignal, | |
329 scoped_refptr<TestTracker>(tracker()), 0, | |
330 &background_event, kNumWorkerThreads); | |
331 pool().Shutdown(); | |
332 | |
333 std::vector<int> result = tracker()->WaitUntilTasksComplete(4); | |
334 | |
335 // Tge kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN | |
336 // one, in no particular order. | |
337 ASSERT_EQ(4u, result.size()); | |
338 for (size_t i = 0; i < kNumWorkerThreads; i++) { | |
339 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != | |
340 result.end()); | |
341 } | |
342 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); | |
343 } | |
344 | |
345 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. | |
346 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { | |
347 EnsureAllWorkersCreated(); | |
348 base::WaitableEvent background_event(false, false); | |
349 pool().PostWorkerTaskWithShutdownBehavior( | |
350 FROM_HERE, | |
351 base::Bind(&TestTracker::BlockOnEventTask, | |
352 tracker(), 0, &background_event), | |
353 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); | |
354 tracker()->WaitUntilEventTasksBlocked(1); | |
355 | |
356 // This should not block. If this test hangs, it means it failed. | |
357 pool().Shutdown(); | |
358 | |
359 // The task should not have completed yet. | |
360 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); | |
361 | |
362 // Posting more tasks should fail. | |
363 EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior( | |
364 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), | |
365 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); | |
366 | |
367 // Continue the background thread and make sure the task can complete. | |
368 background_event.Signal(); | |
369 std::vector<int> result = tracker()->WaitUntilTasksComplete(1); | |
370 EXPECT_EQ(1u, result.size()); | |
371 } | |
372 | |
373 } // namespace base | |
OLD | NEW |