OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include <algorithm> |
| 6 |
| 7 #include "base/bind.h" |
| 8 #include "base/memory/ref_counted.h" |
| 9 #include "base/synchronization/condition_variable.h" |
| 10 #include "base/synchronization/lock.h" |
| 11 #include "base/threading/platform_thread.h" |
| 12 #include "base/threading/sequenced_worker_pool.h" |
| 13 #include "testing/gtest/include/gtest/gtest.h" |
| 14 |
| 15 namespace base { |
| 16 |
| 17 // IMPORTANT NOTE: |
| 18 // |
| 19 // Many of these tests have failure modes where they'll hang forever. These |
| 20 // tests should not be flaky, and hangling indicates a type of failure. Do not |
| 21 // mark as flaky if they're hanging, it's likely an actual bug. |
| 22 |
| 23 namespace { |
| 24 |
| 25 const size_t kNumWorkerThreads = 3; |
| 26 |
| 27 // Allows a number of threads to all be blocked on the same event, and |
| 28 // provides a way to unblock a certain number of them. |
| 29 class ThreadBlocker { |
| 30 public: |
| 31 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) { |
| 32 } |
| 33 |
| 34 void Block() { |
| 35 { |
| 36 base::AutoLock lock(lock_); |
| 37 while (unblock_counter_ == 0) |
| 38 cond_var_.Wait(); |
| 39 unblock_counter_--; |
| 40 } |
| 41 cond_var_.Signal(); |
| 42 } |
| 43 |
| 44 void Unblock(size_t count) { |
| 45 { |
| 46 base::AutoLock lock(lock_); |
| 47 DCHECK(unblock_counter_ == 0); |
| 48 unblock_counter_ = count; |
| 49 } |
| 50 cond_var_.Signal(); |
| 51 } |
| 52 |
| 53 private: |
| 54 base::Lock lock_; |
| 55 base::ConditionVariable cond_var_; |
| 56 |
| 57 size_t unblock_counter_; |
| 58 }; |
| 59 |
| 60 class TestTracker : public base::RefCountedThreadSafe<TestTracker> { |
| 61 public: |
| 62 TestTracker() |
| 63 : lock_(), |
| 64 cond_var_(&lock_), |
| 65 started_events_(0) { |
| 66 } |
| 67 |
| 68 // Each of these tasks appends the argument to the complete sequence vector |
| 69 // so calling code can see what order they finished in. |
| 70 void FastTask(int id) { |
| 71 SignalWorkerDone(id); |
| 72 } |
| 73 void SlowTask(int id) { |
| 74 base::PlatformThread::Sleep(1000); |
| 75 SignalWorkerDone(id); |
| 76 } |
| 77 |
| 78 void BlockTask(int id, ThreadBlocker* blocker) { |
| 79 // Note that this task has started and signal anybody waiting for that |
| 80 // to happen. |
| 81 { |
| 82 base::AutoLock lock(lock_); |
| 83 started_events_++; |
| 84 } |
| 85 cond_var_.Signal(); |
| 86 |
| 87 blocker->Block(); |
| 88 SignalWorkerDone(id); |
| 89 } |
| 90 |
| 91 // Waits until the given number of tasks have started executing. |
| 92 void WaitUntilTasksBlocked(size_t count) { |
| 93 { |
| 94 base::AutoLock lock(lock_); |
| 95 while (started_events_ < count) |
| 96 cond_var_.Wait(); |
| 97 } |
| 98 cond_var_.Signal(); |
| 99 } |
| 100 |
| 101 // Blocks the current thread until at least the given number of tasks are in |
| 102 // the completed vector, and then returns a copy. |
| 103 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { |
| 104 std::vector<int> ret; |
| 105 { |
| 106 base::AutoLock lock(lock_); |
| 107 while (complete_sequence_.size() < num_tasks) |
| 108 cond_var_.Wait(); |
| 109 ret = complete_sequence_; |
| 110 } |
| 111 cond_var_.Signal(); |
| 112 return ret; |
| 113 } |
| 114 |
| 115 void ClearCompleteSequence() { |
| 116 base::AutoLock lock(lock_); |
| 117 complete_sequence_.clear(); |
| 118 started_events_ = 0; |
| 119 } |
| 120 |
| 121 private: |
| 122 void SignalWorkerDone(int id) { |
| 123 { |
| 124 base::AutoLock lock(lock_); |
| 125 complete_sequence_.push_back(id); |
| 126 } |
| 127 cond_var_.Signal(); |
| 128 } |
| 129 |
| 130 // Protects the complete_sequence. |
| 131 base::Lock lock_; |
| 132 |
| 133 base::ConditionVariable cond_var_; |
| 134 |
| 135 // Protected by lock_. |
| 136 std::vector<int> complete_sequence_; |
| 137 |
| 138 // Counter of the number of "block" workers that have started. |
| 139 size_t started_events_; |
| 140 }; |
| 141 |
| 142 class SequencedWorkerPoolTest : public testing::Test, |
| 143 public SequencedWorkerPool::TestingObserver { |
| 144 public: |
| 145 SequencedWorkerPoolTest() |
| 146 : pool_(kNumWorkerThreads, "test"), |
| 147 tracker_(new TestTracker) { |
| 148 pool_.SetTestingObserver(this); |
| 149 } |
| 150 ~SequencedWorkerPoolTest() { |
| 151 } |
| 152 |
| 153 virtual void SetUp() { |
| 154 } |
| 155 virtual void TearDown() { |
| 156 pool_.Shutdown(); |
| 157 } |
| 158 |
| 159 SequencedWorkerPool& pool() { return pool_; } |
| 160 TestTracker* tracker() { return tracker_.get(); } |
| 161 |
| 162 // Ensures that the given number of worker threads is created by adding |
| 163 // tasks and waiting until they complete. Worker thread creation is |
| 164 // serialized, can happen on background threads asynchronously, and doesn't |
| 165 // happen any more at shutdown. This means that if a test posts a bunch of |
| 166 // tasks and calls shutdown, fewer workers will be created than the test may |
| 167 // expect. |
| 168 // |
| 169 // This function ensures that this condition can't happen so tests can make |
| 170 // assumptions about the number of workers active. See the comment in |
| 171 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more |
| 172 // details. |
| 173 // |
| 174 // It will post tasks to the queue with id -1. It also assumes this is the |
| 175 // first thing called in a test since it will clear the complete_sequence_. |
| 176 void EnsureAllWorkersCreated() { |
| 177 // Create a bunch of threads, all waiting. This will cause that may |
| 178 // workers to be created. |
| 179 ThreadBlocker blocker; |
| 180 for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| 181 pool().PostWorkerTask(FROM_HERE, |
| 182 base::Bind(&TestTracker::BlockTask, |
| 183 tracker(), -1, &blocker)); |
| 184 } |
| 185 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| 186 |
| 187 // Now wake them up and wait until they're done. |
| 188 blocker.Unblock(kNumWorkerThreads); |
| 189 tracker()->WaitUntilTasksComplete(kNumWorkerThreads); |
| 190 |
| 191 // Clean up the task IDs we added. |
| 192 tracker()->ClearCompleteSequence(); |
| 193 } |
| 194 |
| 195 protected: |
| 196 // This closure will be executed right before the pool blocks on shutdown. |
| 197 base::Closure before_wait_for_shutdown_; |
| 198 |
| 199 private: |
| 200 // SequencedWorkerPool::TestingObserver implementation. |
| 201 virtual void WillWaitForShutdown() { |
| 202 if (!before_wait_for_shutdown_.is_null()) |
| 203 before_wait_for_shutdown_.Run(); |
| 204 } |
| 205 |
| 206 SequencedWorkerPool pool_; |
| 207 scoped_refptr<TestTracker> tracker_; |
| 208 }; |
| 209 |
| 210 // Checks that the given number of entries are in the tasks to complete of |
| 211 // the given tracker, and then signals the given event the given number of |
| 212 // times. This is used to wakt up blocked background threads before blocking |
| 213 // on shutdown. |
| 214 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker, |
| 215 size_t expected_tasks_to_complete, |
| 216 ThreadBlocker* blocker, |
| 217 size_t threads_to_awake) { |
| 218 EXPECT_EQ( |
| 219 expected_tasks_to_complete, |
| 220 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); |
| 221 |
| 222 blocker->Unblock(threads_to_awake); |
| 223 } |
| 224 |
| 225 } // namespace |
| 226 |
| 227 // Tests that same-named tokens have the same ID. |
| 228 TEST_F(SequencedWorkerPoolTest, NamedTokens) { |
| 229 const std::string name1("hello"); |
| 230 SequencedWorkerPool::SequenceToken token1 = |
| 231 pool().GetNamedSequenceToken(name1); |
| 232 |
| 233 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); |
| 234 |
| 235 const std::string name3("goodbye"); |
| 236 SequencedWorkerPool::SequenceToken token3 = |
| 237 pool().GetNamedSequenceToken(name3); |
| 238 |
| 239 // All 3 tokens should be different. |
| 240 EXPECT_FALSE(token1.Equals(token2)); |
| 241 EXPECT_FALSE(token1.Equals(token3)); |
| 242 EXPECT_FALSE(token2.Equals(token3)); |
| 243 |
| 244 // Requesting the same name again should give the same value. |
| 245 SequencedWorkerPool::SequenceToken token1again = |
| 246 pool().GetNamedSequenceToken(name1); |
| 247 EXPECT_TRUE(token1.Equals(token1again)); |
| 248 |
| 249 SequencedWorkerPool::SequenceToken token3again = |
| 250 pool().GetNamedSequenceToken(name3); |
| 251 EXPECT_TRUE(token3.Equals(token3again)); |
| 252 } |
| 253 |
| 254 // Tests that posting a bunch of tasks (many more than the number of worker |
| 255 // threads) runs them all. |
| 256 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { |
| 257 pool().PostWorkerTask(FROM_HERE, |
| 258 base::Bind(&TestTracker::SlowTask, tracker(), 0)); |
| 259 |
| 260 const size_t kNumTasks = 20; |
| 261 for (size_t i = 1; i < kNumTasks; i++) { |
| 262 pool().PostWorkerTask(FROM_HERE, |
| 263 base::Bind(&TestTracker::FastTask, tracker(), i)); |
| 264 } |
| 265 |
| 266 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); |
| 267 EXPECT_EQ(kNumTasks, result.size()); |
| 268 } |
| 269 |
| 270 // Test that tasks with the same sequence token are executed in order but don't |
| 271 // affect other tasks. |
| 272 TEST_F(SequencedWorkerPoolTest, Sequence) { |
| 273 // Fill all the worker threads except one. |
| 274 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; |
| 275 ThreadBlocker background_blocker; |
| 276 for (size_t i = 0; i < kNumBackgroundTasks; i++) { |
| 277 pool().PostWorkerTask(FROM_HERE, |
| 278 base::Bind(&TestTracker::BlockTask, |
| 279 tracker(), i, &background_blocker)); |
| 280 } |
| 281 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); |
| 282 |
| 283 // Create two tasks with the same sequence token, one that will block on the |
| 284 // event, and one which will just complete quickly when it's run. Since there |
| 285 // is one worker thread free, the first task will start and then block, and |
| 286 // the second task should be waiting. |
| 287 ThreadBlocker blocker; |
| 288 SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken(); |
| 289 pool().PostSequencedWorkerTask( |
| 290 token1, FROM_HERE, |
| 291 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); |
| 292 pool().PostSequencedWorkerTask( |
| 293 token1, FROM_HERE, |
| 294 base::Bind(&TestTracker::FastTask, tracker(), 101)); |
| 295 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| 296 |
| 297 // Create another two tasks as above with a different token. These will be |
| 298 // blocked since there are no slots to run. |
| 299 SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); |
| 300 pool().PostSequencedWorkerTask( |
| 301 token2, FROM_HERE, |
| 302 base::Bind(&TestTracker::FastTask, tracker(), 200)); |
| 303 pool().PostSequencedWorkerTask( |
| 304 token2, FROM_HERE, |
| 305 base::Bind(&TestTracker::FastTask, tracker(), 201)); |
| 306 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| 307 |
| 308 // Let one background task complete. This should then let both tasks of |
| 309 // token2 run to completion in order. The second task of token1 should still |
| 310 // be blocked. |
| 311 background_blocker.Unblock(1); |
| 312 std::vector<int> result = tracker()->WaitUntilTasksComplete(3); |
| 313 ASSERT_EQ(3u, result.size()); |
| 314 EXPECT_EQ(200, result[1]); |
| 315 EXPECT_EQ(201, result[2]); |
| 316 |
| 317 // Finish the rest of the background tasks. This should leave some workers |
| 318 // free with the second token1 task still blocked on the first. |
| 319 background_blocker.Unblock(kNumBackgroundTasks - 1); |
| 320 EXPECT_EQ(kNumBackgroundTasks + 2, |
| 321 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); |
| 322 |
| 323 // Allow the first task of token1 to complete. This should run the second. |
| 324 blocker.Unblock(1); |
| 325 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); |
| 326 ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); |
| 327 EXPECT_EQ(100, result[result.size() - 2]); |
| 328 EXPECT_EQ(101, result[result.size() - 1]); |
| 329 } |
| 330 |
| 331 // Tests that unrun tasks are discarded properly according to their shutdown |
| 332 // mode. |
| 333 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { |
| 334 // Start tasks to take all the threads and block them. |
| 335 EnsureAllWorkersCreated(); |
| 336 ThreadBlocker blocker; |
| 337 for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| 338 pool().PostWorkerTask(FROM_HERE, |
| 339 base::Bind(&TestTracker::BlockTask, |
| 340 tracker(), i, &blocker)); |
| 341 } |
| 342 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| 343 |
| 344 // Create some tasks with different shutdown modes. |
| 345 pool().PostWorkerTaskWithShutdownBehavior( |
| 346 FROM_HERE, |
| 347 base::Bind(&TestTracker::FastTask, tracker(), 100), |
| 348 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
| 349 pool().PostWorkerTaskWithShutdownBehavior( |
| 350 FROM_HERE, |
| 351 base::Bind(&TestTracker::FastTask, tracker(), 101), |
| 352 SequencedWorkerPool::SKIP_ON_SHUTDOWN); |
| 353 pool().PostWorkerTaskWithShutdownBehavior( |
| 354 FROM_HERE, |
| 355 base::Bind(&TestTracker::FastTask, tracker(), 102), |
| 356 SequencedWorkerPool::BLOCK_SHUTDOWN); |
| 357 |
| 358 // Shutdown the worker pool. This should discard all non-blocking tasks. |
| 359 before_wait_for_shutdown_ = |
| 360 base::Bind(&EnsureTasksToCompleteCountAndUnblock, |
| 361 scoped_refptr<TestTracker>(tracker()), 0, |
| 362 &blocker, kNumWorkerThreads); |
| 363 pool().Shutdown(); |
| 364 |
| 365 std::vector<int> result = tracker()->WaitUntilTasksComplete(4); |
| 366 |
| 367 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN |
| 368 // one, in no particular order. |
| 369 ASSERT_EQ(4u, result.size()); |
| 370 for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| 371 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != |
| 372 result.end()); |
| 373 } |
| 374 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); |
| 375 } |
| 376 |
| 377 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. |
| 378 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { |
| 379 EnsureAllWorkersCreated(); |
| 380 ThreadBlocker blocker; |
| 381 pool().PostWorkerTaskWithShutdownBehavior( |
| 382 FROM_HERE, |
| 383 base::Bind(&TestTracker::BlockTask, |
| 384 tracker(), 0, &blocker), |
| 385 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
| 386 tracker()->WaitUntilTasksBlocked(1); |
| 387 |
| 388 // This should not block. If this test hangs, it means it failed. |
| 389 pool().Shutdown(); |
| 390 |
| 391 // The task should not have completed yet. |
| 392 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| 393 |
| 394 // Posting more tasks should fail. |
| 395 EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior( |
| 396 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), |
| 397 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| 398 |
| 399 // Continue the background thread and make sure the task can complete. |
| 400 blocker.Unblock(1); |
| 401 std::vector<int> result = tracker()->WaitUntilTasksComplete(1); |
| 402 EXPECT_EQ(1u, result.size()); |
| 403 } |
| 404 |
| 405 } // namespace base |
OLD | NEW |