Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include <algorithm> | |
| 6 | |
| 7 #include "base/bind.h" | |
| 8 #include "base/memory/ref_counted.h" | |
| 9 #include "base/synchronization/condition_variable.h" | |
| 10 #include "base/synchronization/lock.h" | |
| 11 #include "base/synchronization/waitable_event.h" | |
| 12 #include "base/threading/platform_thread.h" | |
| 13 #include "base/threading/sequenced_worker_pool.h" | |
| 14 #include "testing/gtest/include/gtest/gtest.h" | |
| 15 | |
| 16 namespace base { | |
| 17 | |
| 18 // IMPORTANT NOTE: | |
| 19 // | |
| 20 // Many of these tests have failure modes where they'll hang forever. These | |
| 21 // tests should not be flaky, and hangling indicates a type of failure. Do not | |
| 22 // mark as flaky if they're hanging, it's likely an actual bug. | |
| 23 | |
| 24 namespace { | |
| 25 | |
| 26 const size_t kNumWorkerThreads = 3; | |
| 27 | |
| 28 class TestTracker : public base::RefCountedThreadSafe<TestTracker> { | |
| 29 public: | |
| 30 TestTracker() | |
| 31 : lock_(), | |
| 32 cond_var_(&lock_), | |
| 33 completed_event_(false, false), | |
| 34 started_events_(0) { | |
| 35 } | |
| 36 | |
| 37 // Each of these tasks appends the argument to the complete sequence vector | |
| 38 // so calling code can see what order they finished in. | |
| 39 void FastTask(int id) { | |
| 40 SignalWorkerDone(id); | |
| 41 } | |
| 42 void SlowTask(int id) { | |
| 43 base::PlatformThread::Sleep(1000); | |
| 44 SignalWorkerDone(id); | |
| 45 } | |
| 46 void BlockOnEventTask(int id, base::WaitableEvent* event) { | |
| 47 // Note that this task has started and signal anwaybody waiting for that | |
|
jar (doing other things)
2011/12/29 18:15:45
nit: anwaybody-->anybody
| |
| 48 // to happen. | |
| 49 { | |
| 50 base::AutoLock lock(lock_); | |
| 51 started_events_++; | |
| 52 } | |
| 53 cond_var_.Signal(); | |
| 54 | |
|
jar (doing other things)
2011/12/29 18:15:45
With murphy threads, all the tasks *could* pause a
brettw
2011/12/29 18:23:17
Ah, good catch. I'll fix this.
| |
| 55 event->Wait(); | |
| 56 SignalWorkerDone(id); | |
| 57 } | |
| 58 | |
| 59 // Waits until the given number of tasks have started executing. | |
| 60 void WaitUntilEventTasksBlocked(int count) { | |
| 61 base::AutoLock lock(lock_); | |
| 62 while (started_events_ < count) | |
| 63 cond_var_.Wait(); | |
| 64 } | |
| 65 | |
| 66 // Blocks the current thread until at least the given number of tasks are in | |
| 67 // the completed vector, and then returns a copy. | |
| 68 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { | |
| 69 for (;;) { | |
| 70 { | |
| 71 base::AutoLock lock(lock_); | |
| 72 if (complete_sequence_.size() >= num_tasks) | |
| 73 return complete_sequence_; | |
| 74 } | |
| 75 completed_event_.Wait(); | |
| 76 } | |
| 77 } | |
| 78 | |
| 79 void ClearCompleteSequence() { | |
| 80 base::AutoLock lock(lock_); | |
| 81 complete_sequence_.clear(); | |
| 82 started_events_ = 0; | |
| 83 } | |
| 84 | |
| 85 private: | |
| 86 void SignalWorkerDone(int id) { | |
| 87 base::AutoLock lock(lock_); | |
| 88 complete_sequence_.push_back(id); | |
| 89 completed_event_.Signal(); | |
| 90 } | |
| 91 | |
| 92 // Protects the complete_sequence. | |
| 93 base::Lock lock_; | |
| 94 | |
| 95 base::ConditionVariable cond_var_; | |
| 96 | |
| 97 // Signaled every time something is posted to the complete_sequence_. | |
| 98 base::WaitableEvent completed_event_; | |
| 99 | |
| 100 // Protected by lock_. | |
| 101 std::vector<int> complete_sequence_; | |
| 102 | |
| 103 int started_events_; | |
| 104 }; | |
| 105 | |
| 106 class SequencedWorkerPoolTest : public testing::Test, | |
| 107 public SequencedWorkerPool::TestingObserver { | |
| 108 public: | |
| 109 SequencedWorkerPoolTest() | |
| 110 : pool_(kNumWorkerThreads, "test"), | |
| 111 tracker_(new TestTracker) { | |
| 112 pool_.SetTestingObserver(this); | |
| 113 } | |
| 114 ~SequencedWorkerPoolTest() { | |
| 115 } | |
| 116 | |
| 117 virtual void SetUp() { | |
| 118 } | |
| 119 virtual void TearDown() { | |
| 120 pool_.Shutdown(); | |
| 121 } | |
| 122 | |
| 123 SequencedWorkerPool& pool() { return pool_; } | |
| 124 TestTracker* tracker() { return tracker_.get(); } | |
| 125 | |
| 126 // Ensures that the given number of worker threads is created by adding | |
| 127 // tasks and waiting until they complete. Worker thread creation is | |
| 128 // serialized, can happen on background threads asynchronously, and doesn't | |
| 129 // happen any more at shutdown. This means that if a test posts a bunch of | |
| 130 // tasks and calls shutdown, fewer workers will be created than the test may | |
| 131 // expect. | |
| 132 // | |
| 133 // This function ensures that this condition can't happen so tests can make | |
| 134 // assumptions about the number of workers active. See the comment in | |
| 135 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more | |
| 136 // details. | |
| 137 // | |
| 138 // It will post tasks to the queue with id -1. It also assumes this is the | |
| 139 // first thing called in a test since it will clear the complete_sequence_. | |
| 140 void EnsureAllWorkersCreated() { | |
| 141 // Create a bunch of threads, all waiting for an event. This will cause | |
| 142 // that may workers to be created. | |
| 143 base::WaitableEvent event(false, false); | |
| 144 for (size_t i = 0; i < kNumWorkerThreads; i++) { | |
| 145 pool().PostWorkerTask(FROM_HERE, | |
| 146 base::Bind(&TestTracker::BlockOnEventTask, | |
| 147 tracker(), -1, &event)); | |
| 148 } | |
| 149 tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads); | |
| 150 | |
| 151 // Now signal all the workers and wait until they're done. | |
| 152 for (size_t i = 0; i < kNumWorkerThreads; i++) | |
| 153 event.Signal(); | |
|
jar (doing other things)
2011/12/29 18:15:45
As noted above (see line 54), this won't consisten
| |
| 154 tracker()->WaitUntilTasksComplete(kNumWorkerThreads); | |
| 155 | |
| 156 // Clean up the task IDs we added. | |
| 157 tracker()->ClearCompleteSequence(); | |
| 158 } | |
| 159 | |
| 160 protected: | |
| 161 // This closure will be executed right before the pool blocks on shutdown. | |
| 162 base::Closure before_wait_for_shutdown_; | |
| 163 | |
| 164 private: | |
| 165 // SequencedWorkerPool::TestingObserver implementation. | |
| 166 virtual void WillWaitForShutdown() { | |
| 167 if (!before_wait_for_shutdown_.is_null()) | |
| 168 before_wait_for_shutdown_.Run(); | |
| 169 } | |
| 170 | |
| 171 SequencedWorkerPool pool_; | |
| 172 scoped_refptr<TestTracker> tracker_; | |
| 173 }; | |
| 174 | |
| 175 // Checks that the given number of entries are in the tasks to complete of | |
| 176 // the given tracker, and then signals the given event the given number of | |
| 177 // times. This is used to wakt up blocked background threads before blocking | |
| 178 // on shutdown. | |
| 179 void EnsureTasksToCompleteCountAndSignal(scoped_refptr<TestTracker> tracker, | |
| 180 size_t expected_tasks_to_complete, | |
| 181 base::WaitableEvent* event, | |
| 182 size_t times_to_signal_event) { | |
| 183 EXPECT_EQ( | |
| 184 expected_tasks_to_complete, | |
| 185 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); | |
| 186 | |
| 187 for (size_t i = 0; i < times_to_signal_event; i++) | |
| 188 event->Signal(); | |
|
jar (doing other things)
2011/12/29 18:15:45
I don't think event contains a counter, and hence
| |
| 189 } | |
| 190 | |
| 191 } // namespace | |
| 192 | |
| 193 // Tests that same-named tokens have the same ID. | |
| 194 TEST_F(SequencedWorkerPoolTest, NamedTokens) { | |
| 195 const std::string name1("hello"); | |
| 196 SequencedWorkerPool::SequenceToken token1 = | |
| 197 pool().GetNamedSequenceToken(name1); | |
| 198 | |
| 199 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); | |
| 200 | |
| 201 const std::string name3("goodbye"); | |
| 202 SequencedWorkerPool::SequenceToken token3 = | |
| 203 pool().GetNamedSequenceToken(name3); | |
| 204 | |
| 205 // All 3 tokens should be different. | |
| 206 EXPECT_FALSE(token1.Equals(token2)); | |
| 207 EXPECT_FALSE(token1.Equals(token3)); | |
| 208 EXPECT_FALSE(token2.Equals(token3)); | |
| 209 | |
| 210 // Requesting the same name again should give the same value. | |
| 211 SequencedWorkerPool::SequenceToken token1again = | |
| 212 pool().GetNamedSequenceToken(name1); | |
| 213 EXPECT_TRUE(token1.Equals(token1again)); | |
| 214 | |
| 215 SequencedWorkerPool::SequenceToken token3again = | |
| 216 pool().GetNamedSequenceToken(name3); | |
| 217 EXPECT_TRUE(token3.Equals(token3again)); | |
| 218 } | |
| 219 | |
| 220 // Tests that posting a bunch of tasks (many more than the number of worker | |
| 221 // threads) runs them all. | |
| 222 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { | |
| 223 pool().PostWorkerTask(FROM_HERE, | |
| 224 base::Bind(&TestTracker::SlowTask, tracker(), 0)); | |
| 225 | |
| 226 const size_t kNumTasks = 20; | |
| 227 for (size_t i = 1; i < kNumTasks; i++) { | |
| 228 pool().PostWorkerTask(FROM_HERE, | |
| 229 base::Bind(&TestTracker::FastTask, tracker(), i)); | |
| 230 } | |
| 231 | |
| 232 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); | |
| 233 EXPECT_EQ(kNumTasks, result.size()); | |
| 234 } | |
| 235 | |
| 236 // Test that tasks with the same sequence token are executed in order but don't | |
| 237 // affect other tasks. | |
| 238 TEST_F(SequencedWorkerPoolTest, Sequence) { | |
| 239 // Fill all the worker threads except one. | |
| 240 base::WaitableEvent background_event(false, false); | |
| 241 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; | |
| 242 for (size_t i = 0; i < kNumBackgroundTasks; i++) { | |
| 243 pool().PostWorkerTask(FROM_HERE, | |
| 244 base::Bind(&TestTracker::BlockOnEventTask, | |
| 245 tracker(), i, &background_event)); | |
| 246 } | |
| 247 tracker()->WaitUntilEventTasksBlocked(kNumBackgroundTasks); | |
| 248 | |
| 249 // Create two tasks with the same sequence token, one that will block on the | |
| 250 // event, and one which will just complete quickly when it's run. Since there | |
| 251 // is one worker thread free, the first task will start and then block, and | |
| 252 // the second task should be waiting. | |
| 253 base::WaitableEvent event1(false, false); | |
| 254 SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken(); | |
| 255 pool().PostSequencedWorkerTask( | |
| 256 token1, FROM_HERE, | |
| 257 base::Bind(&TestTracker::BlockOnEventTask, tracker(), 100, | |
| 258 &event1)); | |
| 259 pool().PostSequencedWorkerTask( | |
| 260 token1, FROM_HERE, | |
| 261 base::Bind(&TestTracker::FastTask, tracker(), 101)); | |
| 262 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); | |
| 263 | |
| 264 // Create another two tasks as above with a different token. These will be | |
| 265 // blocked since there are no slots to run. | |
| 266 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); | |
| 267 pool().PostSequencedWorkerTask( | |
| 268 token2, FROM_HERE, | |
| 269 base::Bind(&TestTracker::FastTask, tracker(), 200)); | |
| 270 pool().PostSequencedWorkerTask( | |
| 271 token2, FROM_HERE, | |
| 272 base::Bind(&TestTracker::FastTask, tracker(), 201)); | |
| 273 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); | |
| 274 | |
| 275 // Let one background task complete. This should then let both tasks of | |
| 276 // token2 run to completion in order. The second task of token1 should still | |
| 277 // be blocked. | |
| 278 background_event.Signal(); | |
| 279 std::vector<int> result = tracker()->WaitUntilTasksComplete(3); | |
| 280 ASSERT_EQ(3u, result.size()); | |
| 281 EXPECT_EQ(200, result[1]); | |
| 282 EXPECT_EQ(201, result[2]); | |
| 283 | |
| 284 // Finish the rest of the background tasks. This should leave some workers | |
| 285 // free with the second token1 task still blocked on the first. | |
| 286 for (size_t i = 0; i < kNumBackgroundTasks - 1; i++) | |
| 287 background_event.Signal(); | |
| 288 EXPECT_EQ(kNumBackgroundTasks + 2, | |
| 289 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); | |
| 290 | |
| 291 // Allow the first task of token1 to complete. This should run the second. | |
| 292 event1.Signal(); | |
| 293 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); | |
| 294 ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); | |
| 295 EXPECT_EQ(100, result[result.size() - 2]); | |
| 296 EXPECT_EQ(101, result[result.size() - 1]); | |
| 297 } | |
| 298 | |
| 299 // Tests that unrun tasks are discarded properly according to their shutdown | |
| 300 // mode. | |
| 301 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { | |
| 302 // Start tasks to take all the threads and block them. | |
| 303 EnsureAllWorkersCreated(); | |
| 304 base::WaitableEvent background_event(false, false); | |
| 305 for (size_t i = 0; i < kNumWorkerThreads; i++) { | |
| 306 pool().PostWorkerTask(FROM_HERE, | |
| 307 base::Bind(&TestTracker::BlockOnEventTask, | |
| 308 tracker(), i, &background_event)); | |
| 309 } | |
| 310 tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads); | |
| 311 | |
| 312 // Create some tasks with different shutdown modes. | |
| 313 pool().PostWorkerTaskWithShutdownBehavior( | |
| 314 FROM_HERE, | |
| 315 base::Bind(&TestTracker::FastTask, tracker(), 100), | |
| 316 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); | |
| 317 pool().PostWorkerTaskWithShutdownBehavior( | |
| 318 FROM_HERE, | |
| 319 base::Bind(&TestTracker::FastTask, tracker(), 101), | |
| 320 SequencedWorkerPool::SKIP_ON_SHUTDOWN); | |
| 321 pool().PostWorkerTaskWithShutdownBehavior( | |
| 322 FROM_HERE, | |
| 323 base::Bind(&TestTracker::FastTask, tracker(), 102), | |
| 324 SequencedWorkerPool::BLOCK_SHUTDOWN); | |
| 325 | |
| 326 // Shutdown the worker pool. This should discard all non-blocking tasks. | |
| 327 before_wait_for_shutdown_ = | |
| 328 base::Bind(&EnsureTasksToCompleteCountAndSignal, | |
| 329 scoped_refptr<TestTracker>(tracker()), 0, | |
| 330 &background_event, kNumWorkerThreads); | |
| 331 pool().Shutdown(); | |
| 332 | |
| 333 std::vector<int> result = tracker()->WaitUntilTasksComplete(4); | |
| 334 | |
| 335 // Tge kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN | |
| 336 // one, in no particular order. | |
| 337 ASSERT_EQ(4u, result.size()); | |
| 338 for (size_t i = 0; i < kNumWorkerThreads; i++) { | |
| 339 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != | |
| 340 result.end()); | |
| 341 } | |
| 342 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); | |
| 343 } | |
| 344 | |
| 345 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. | |
| 346 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { | |
| 347 EnsureAllWorkersCreated(); | |
| 348 base::WaitableEvent background_event(false, false); | |
| 349 pool().PostWorkerTaskWithShutdownBehavior( | |
| 350 FROM_HERE, | |
| 351 base::Bind(&TestTracker::BlockOnEventTask, | |
| 352 tracker(), 0, &background_event), | |
| 353 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); | |
| 354 tracker()->WaitUntilEventTasksBlocked(1); | |
| 355 | |
| 356 // This should not block. If this test hangs, it means it failed. | |
| 357 pool().Shutdown(); | |
| 358 | |
| 359 // The task should not have completed yet. | |
| 360 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); | |
| 361 | |
| 362 // Posting more tasks should fail. | |
| 363 EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior( | |
| 364 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), | |
| 365 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); | |
| 366 | |
| 367 // Continue the background thread and make sure the task can complete. | |
| 368 background_event.Signal(); | |
| 369 std::vector<int> result = tracker()->WaitUntilTasksComplete(1); | |
| 370 EXPECT_EQ(1u, result.size()); | |
| 371 } | |
| 372 | |
| 373 } // namespace base | |
| OLD | NEW |