Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(308)

Side by Side Diff: base/threading/sequenced_worker_pool_unittest.cc

Issue 1647803004: Move base to DEPS (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.cc ('k') | base/threading/simple_thread.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/sequenced_worker_pool.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/synchronization/condition_variable.h"
15 #include "base/synchronization/lock.h"
16 #include "base/test/sequenced_task_runner_test_template.h"
17 #include "base/test/sequenced_worker_pool_owner.h"
18 #include "base/test/task_runner_test_template.h"
19 #include "base/test/test_timeouts.h"
20 #include "base/threading/platform_thread.h"
21 #include "base/time/time.h"
22 #include "base/tracked_objects.h"
23 #include "testing/gtest/include/gtest/gtest.h"
24
25 namespace base {
26
27 // IMPORTANT NOTE:
28 //
29 // Many of these tests have failure modes where they'll hang forever. These
30 // tests should not be flaky, and hanging indicates a type of failure. Do not
31 // mark as flaky if they're hanging, it's likely an actual bug.
32
33 namespace {
34
35 const size_t kNumWorkerThreads = 3;
36
37 // Allows a number of threads to all be blocked on the same event, and
38 // provides a way to unblock a certain number of them.
39 class ThreadBlocker {
40 public:
41 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
42
43 void Block() {
44 {
45 base::AutoLock lock(lock_);
46 while (unblock_counter_ == 0)
47 cond_var_.Wait();
48 unblock_counter_--;
49 }
50 cond_var_.Signal();
51 }
52
53 void Unblock(size_t count) {
54 {
55 base::AutoLock lock(lock_);
56 DCHECK_EQ(unblock_counter_, 0u);
57 unblock_counter_ = count;
58 }
59 cond_var_.Signal();
60 }
61
62 private:
63 base::Lock lock_;
64 base::ConditionVariable cond_var_;
65
66 size_t unblock_counter_;
67 };
68
69 class DestructionDeadlockChecker
70 : public base::RefCountedThreadSafe<DestructionDeadlockChecker> {
71 public:
72 DestructionDeadlockChecker(const scoped_refptr<SequencedWorkerPool>& pool)
73 : pool_(pool) {}
74
75 protected:
76 virtual ~DestructionDeadlockChecker() {
77 // This method should not deadlock.
78 pool_->RunsTasksOnCurrentThread();
79 }
80
81 private:
82 scoped_refptr<SequencedWorkerPool> pool_;
83 friend class base::RefCountedThreadSafe<DestructionDeadlockChecker>;
84 };
85
86 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
87 public:
88 TestTracker()
89 : lock_(),
90 cond_var_(&lock_),
91 started_events_(0) {
92 }
93
94 // Each of these tasks appends the argument to the complete sequence vector
95 // so calling code can see what order they finished in.
96 void FastTask(int id) {
97 SignalWorkerDone(id);
98 }
99
100 void SlowTask(int id) {
101 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
102 SignalWorkerDone(id);
103 }
104
105 void BlockTask(int id, ThreadBlocker* blocker) {
106 // Note that this task has started and signal anybody waiting for that
107 // to happen.
108 {
109 base::AutoLock lock(lock_);
110 started_events_++;
111 }
112 cond_var_.Signal();
113
114 blocker->Block();
115 SignalWorkerDone(id);
116 }
117
118 void PostAdditionalTasks(
119 int id, SequencedWorkerPool* pool,
120 bool expected_return_value) {
121 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
122 EXPECT_EQ(expected_return_value,
123 pool->PostWorkerTaskWithShutdownBehavior(
124 FROM_HERE, fast_task,
125 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
126 EXPECT_EQ(expected_return_value,
127 pool->PostWorkerTaskWithShutdownBehavior(
128 FROM_HERE, fast_task,
129 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
130 pool->PostWorkerTaskWithShutdownBehavior(
131 FROM_HERE, fast_task,
132 SequencedWorkerPool::BLOCK_SHUTDOWN);
133 SignalWorkerDone(id);
134 }
135
136 // This task posts itself back onto the SequencedWorkerPool before it
137 // finishes running. Each instance of the task maintains a strong reference
138 // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only
139 // destroyed when the task is destroyed without being run, which only happens
140 // during destruction of the SequencedWorkerPool.
141 void PostRepostingTask(
142 const scoped_refptr<SequencedWorkerPool>& pool,
143 const scoped_refptr<DestructionDeadlockChecker>& checker) {
144 Closure reposting_task =
145 base::Bind(&TestTracker::PostRepostingTask, this, pool, checker);
146 pool->PostWorkerTaskWithShutdownBehavior(
147 FROM_HERE, reposting_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN);
148 }
149
150 // This task reposts itself back onto the SequencedWorkerPool before it
151 // finishes running.
152 void PostRepostingBlockingTask(
153 const scoped_refptr<SequencedWorkerPool>& pool,
154 const SequencedWorkerPool::SequenceToken& token) {
155 Closure reposting_task =
156 base::Bind(&TestTracker::PostRepostingBlockingTask, this, pool, token);
157 pool->PostSequencedWorkerTaskWithShutdownBehavior(token,
158 FROM_HERE, reposting_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
159 }
160
161 void PostBlockingTaskThenUnblockThreads(
162 const scoped_refptr<SequencedWorkerPool>& pool,
163 ThreadBlocker* blocker,
164 size_t threads_to_wake) {
165 Closure arbitrary_task = base::Bind(&TestTracker::FastTask, this, 0);
166 pool->PostWorkerTaskWithShutdownBehavior(
167 FROM_HERE, arbitrary_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
168 blocker->Unblock(threads_to_wake);
169 }
170
171 // Waits until the given number of tasks have started executing.
172 void WaitUntilTasksBlocked(size_t count) {
173 {
174 base::AutoLock lock(lock_);
175 while (started_events_ < count)
176 cond_var_.Wait();
177 }
178 cond_var_.Signal();
179 }
180
181 // Blocks the current thread until at least the given number of tasks are in
182 // the completed vector, and then returns a copy.
183 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
184 std::vector<int> ret;
185 {
186 base::AutoLock lock(lock_);
187 while (complete_sequence_.size() < num_tasks)
188 cond_var_.Wait();
189 ret = complete_sequence_;
190 }
191 cond_var_.Signal();
192 return ret;
193 }
194
195 size_t GetTasksCompletedCount() {
196 base::AutoLock lock(lock_);
197 return complete_sequence_.size();
198 }
199
200 void ClearCompleteSequence() {
201 base::AutoLock lock(lock_);
202 complete_sequence_.clear();
203 started_events_ = 0;
204 }
205
206 private:
207 friend class base::RefCountedThreadSafe<TestTracker>;
208 ~TestTracker() {}
209
210 void SignalWorkerDone(int id) {
211 {
212 base::AutoLock lock(lock_);
213 complete_sequence_.push_back(id);
214 }
215 cond_var_.Signal();
216 }
217
218 // Protects the complete_sequence.
219 base::Lock lock_;
220
221 base::ConditionVariable cond_var_;
222
223 // Protected by lock_.
224 std::vector<int> complete_sequence_;
225
226 // Counter of the number of "block" workers that have started.
227 size_t started_events_;
228 };
229
230 class SequencedWorkerPoolTest : public testing::Test {
231 public:
232 SequencedWorkerPoolTest()
233 : tracker_(new TestTracker) {
234 ResetPool();
235 }
236
237 void TearDown() override { pool()->Shutdown(); }
238
239 const scoped_refptr<SequencedWorkerPool>& pool() {
240 return pool_owner_->pool();
241 }
242 TestTracker* tracker() { return tracker_.get(); }
243
244 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
245 // down, and creates a new instance.
246 void ResetPool() {
247 pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
248 }
249
250 void SetWillWaitForShutdownCallback(const Closure& callback) {
251 pool_owner_->SetWillWaitForShutdownCallback(callback);
252 }
253
254 // Ensures that the given number of worker threads is created by adding
255 // tasks and waiting until they complete. Worker thread creation is
256 // serialized, can happen on background threads asynchronously, and doesn't
257 // happen any more at shutdown. This means that if a test posts a bunch of
258 // tasks and calls shutdown, fewer workers will be created than the test may
259 // expect.
260 //
261 // This function ensures that this condition can't happen so tests can make
262 // assumptions about the number of workers active. See the comment in
263 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
264 // details.
265 //
266 // It will post tasks to the queue with id -1. It also assumes this is the
267 // first thing called in a test since it will clear the complete_sequence_.
268 void EnsureAllWorkersCreated() {
269 // Create a bunch of threads, all waiting. This will cause that may
270 // workers to be created.
271 ThreadBlocker blocker;
272 for (size_t i = 0; i < kNumWorkerThreads; i++) {
273 pool()->PostWorkerTask(FROM_HERE,
274 base::Bind(&TestTracker::BlockTask,
275 tracker(), -1, &blocker));
276 }
277 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
278
279 // Now wake them up and wait until they're done.
280 blocker.Unblock(kNumWorkerThreads);
281 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
282
283 // Clean up the task IDs we added.
284 tracker()->ClearCompleteSequence();
285 }
286
287 int has_work_call_count() const {
288 return pool_owner_->has_work_call_count();
289 }
290
291 private:
292 MessageLoop message_loop_;
293 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
294 const scoped_refptr<TestTracker> tracker_;
295 };
296
297 // Checks that the given number of entries are in the tasks to complete of
298 // the given tracker, and then signals the given event the given number of
299 // times. This is used to wake up blocked background threads before blocking
300 // on shutdown.
301 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
302 size_t expected_tasks_to_complete,
303 ThreadBlocker* blocker,
304 size_t threads_to_awake) {
305 EXPECT_EQ(
306 expected_tasks_to_complete,
307 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
308
309 blocker->Unblock(threads_to_awake);
310 }
311
312 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
313 public:
314 explicit DeletionHelper(
315 const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
316 : deleted_flag_(deleted_flag) {
317 }
318
319 private:
320 friend class base::RefCountedThreadSafe<DeletionHelper>;
321 virtual ~DeletionHelper() { deleted_flag_->data = true; }
322
323 const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
324 DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
325 };
326
327 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
328 const scoped_refptr<DeletionHelper>& helper) {
329 ADD_FAILURE() << "Should never run";
330 }
331
332 // Tests that delayed tasks are deleted upon shutdown of the pool.
333 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
334 // Post something to verify the pool is started up.
335 EXPECT_TRUE(pool()->PostTask(
336 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
337
338 scoped_refptr<base::RefCountedData<bool> > deleted_flag(
339 new base::RefCountedData<bool>(false));
340
341 base::Time posted_at(base::Time::Now());
342 // Post something that shouldn't run.
343 EXPECT_TRUE(pool()->PostDelayedTask(
344 FROM_HERE,
345 base::Bind(&HoldPoolReference,
346 pool(),
347 make_scoped_refptr(new DeletionHelper(deleted_flag))),
348 TestTimeouts::action_timeout()));
349
350 std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
351 ASSERT_EQ(1u, completion_sequence.size());
352 ASSERT_EQ(1, completion_sequence[0]);
353
354 pool()->Shutdown();
355 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
356 // fully destroyed (and thus shut down).
357 ResetPool();
358
359 // Verify that we didn't block until the task was due.
360 ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
361
362 // Verify that the deferred task has not only not run, but has also been
363 // destroyed.
364 ASSERT_TRUE(deleted_flag->data);
365 }
366
367 // Tests that same-named tokens have the same ID.
368 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
369 const std::string name1("hello");
370 SequencedWorkerPool::SequenceToken token1 =
371 pool()->GetNamedSequenceToken(name1);
372
373 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
374
375 const std::string name3("goodbye");
376 SequencedWorkerPool::SequenceToken token3 =
377 pool()->GetNamedSequenceToken(name3);
378
379 // All 3 tokens should be different.
380 EXPECT_FALSE(token1.Equals(token2));
381 EXPECT_FALSE(token1.Equals(token3));
382 EXPECT_FALSE(token2.Equals(token3));
383
384 // Requesting the same name again should give the same value.
385 SequencedWorkerPool::SequenceToken token1again =
386 pool()->GetNamedSequenceToken(name1);
387 EXPECT_TRUE(token1.Equals(token1again));
388
389 SequencedWorkerPool::SequenceToken token3again =
390 pool()->GetNamedSequenceToken(name3);
391 EXPECT_TRUE(token3.Equals(token3again));
392 }
393
394 // Tests that posting a bunch of tasks (many more than the number of worker
395 // threads) runs them all.
396 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
397 pool()->PostWorkerTask(FROM_HERE,
398 base::Bind(&TestTracker::SlowTask, tracker(), 0));
399
400 const size_t kNumTasks = 20;
401 for (size_t i = 1; i < kNumTasks; i++) {
402 pool()->PostWorkerTask(FROM_HERE,
403 base::Bind(&TestTracker::FastTask, tracker(), i));
404 }
405
406 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
407 EXPECT_EQ(kNumTasks, result.size());
408 }
409
410 // Tests that posting a bunch of tasks (many more than the number of
411 // worker threads) to two pools simultaneously runs them all twice.
412 // This test is meant to shake out any concurrency issues between
413 // pools (like histograms).
414 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
415 SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
416 SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
417
418 base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
419 pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
420 pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
421
422 const size_t kNumTasks = 20;
423 for (size_t i = 1; i < kNumTasks; i++) {
424 base::Closure fast_task =
425 base::Bind(&TestTracker::FastTask, tracker(), i);
426 pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
427 pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
428 }
429
430 std::vector<int> result =
431 tracker()->WaitUntilTasksComplete(2*kNumTasks);
432 EXPECT_EQ(2 * kNumTasks, result.size());
433
434 pool2.pool()->Shutdown();
435 pool1.pool()->Shutdown();
436 }
437
438 // Test that tasks with the same sequence token are executed in order but don't
439 // affect other tasks.
440 TEST_F(SequencedWorkerPoolTest, Sequence) {
441 // Fill all the worker threads except one.
442 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
443 ThreadBlocker background_blocker;
444 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
445 pool()->PostWorkerTask(FROM_HERE,
446 base::Bind(&TestTracker::BlockTask,
447 tracker(), i, &background_blocker));
448 }
449 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
450
451 // Create two tasks with the same sequence token, one that will block on the
452 // event, and one which will just complete quickly when it's run. Since there
453 // is one worker thread free, the first task will start and then block, and
454 // the second task should be waiting.
455 ThreadBlocker blocker;
456 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
457 pool()->PostSequencedWorkerTask(
458 token1, FROM_HERE,
459 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
460 pool()->PostSequencedWorkerTask(
461 token1, FROM_HERE,
462 base::Bind(&TestTracker::FastTask, tracker(), 101));
463 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
464
465 // Create another two tasks as above with a different token. These will be
466 // blocked since there are no slots to run.
467 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
468 pool()->PostSequencedWorkerTask(
469 token2, FROM_HERE,
470 base::Bind(&TestTracker::FastTask, tracker(), 200));
471 pool()->PostSequencedWorkerTask(
472 token2, FROM_HERE,
473 base::Bind(&TestTracker::FastTask, tracker(), 201));
474 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
475
476 // Let one background task complete. This should then let both tasks of
477 // token2 run to completion in order. The second task of token1 should still
478 // be blocked.
479 background_blocker.Unblock(1);
480 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
481 ASSERT_EQ(3u, result.size());
482 EXPECT_EQ(200, result[1]);
483 EXPECT_EQ(201, result[2]);
484
485 // Finish the rest of the background tasks. This should leave some workers
486 // free with the second token1 task still blocked on the first.
487 background_blocker.Unblock(kNumBackgroundTasks - 1);
488 EXPECT_EQ(kNumBackgroundTasks + 2,
489 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
490
491 // Allow the first task of token1 to complete. This should run the second.
492 blocker.Unblock(1);
493 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
494 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
495 EXPECT_EQ(100, result[result.size() - 2]);
496 EXPECT_EQ(101, result[result.size() - 1]);
497 }
498
499 // Tests that any tasks posted after Shutdown are ignored.
500 // Disabled for flakiness. See http://crbug.com/166451.
501 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
502 // Start tasks to take all the threads and block them.
503 EnsureAllWorkersCreated();
504 ThreadBlocker blocker;
505 for (size_t i = 0; i < kNumWorkerThreads; i++) {
506 pool()->PostWorkerTask(FROM_HERE,
507 base::Bind(&TestTracker::BlockTask,
508 tracker(), i, &blocker));
509 }
510 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
511
512 SetWillWaitForShutdownCallback(
513 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
514 scoped_refptr<TestTracker>(tracker()), 0,
515 &blocker, kNumWorkerThreads));
516
517 // Shutdown the worker pool. This should discard all non-blocking tasks.
518 const int kMaxNewBlockingTasksAfterShutdown = 100;
519 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
520
521 int old_has_work_call_count = has_work_call_count();
522
523 std::vector<int> result =
524 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
525
526 // The kNumWorkerThread items should have completed, in no particular order.
527 ASSERT_EQ(kNumWorkerThreads, result.size());
528 for (size_t i = 0; i < kNumWorkerThreads; i++) {
529 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
530 result.end());
531 }
532
533 // No further tasks, regardless of shutdown mode, should be allowed.
534 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
535 FROM_HERE,
536 base::Bind(&TestTracker::FastTask, tracker(), 100),
537 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
538 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
539 FROM_HERE,
540 base::Bind(&TestTracker::FastTask, tracker(), 101),
541 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
542 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
543 FROM_HERE,
544 base::Bind(&TestTracker::FastTask, tracker(), 102),
545 SequencedWorkerPool::BLOCK_SHUTDOWN));
546
547 ASSERT_EQ(old_has_work_call_count, has_work_call_count());
548 }
549
550 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
551 // Test that <n> new blocking tasks are allowed provided they're posted
552 // by a running tasks.
553 EnsureAllWorkersCreated();
554 ThreadBlocker blocker;
555
556 // Start tasks to take all the threads and block them.
557 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
558 for (int i = 0; i < kNumBlockTasks; ++i) {
559 EXPECT_TRUE(pool()->PostWorkerTask(
560 FROM_HERE,
561 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
562 }
563 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
564
565 // Queue up shutdown blocking tasks behind those which will attempt to post
566 // additional tasks when run, PostAdditionalTasks attemtps to post 3
567 // new FastTasks, one for each shutdown_behavior.
568 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
569 for (int i = 0; i < kNumQueuedTasks; ++i) {
570 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
571 FROM_HERE,
572 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
573 false),
574 SequencedWorkerPool::BLOCK_SHUTDOWN));
575 }
576
577 // Setup to open the floodgates from within Shutdown().
578 SetWillWaitForShutdownCallback(
579 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
580 scoped_refptr<TestTracker>(tracker()),
581 0, &blocker, kNumBlockTasks));
582
583 // Allow half of the additional blocking tasks thru.
584 const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
585 pool()->Shutdown(kNumNewBlockingTasksToAllow);
586
587 // Ensure that the correct number of tasks actually got run.
588 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
589 kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
590
591 // Clean up the task IDs we added and go home.
592 tracker()->ClearCompleteSequence();
593 }
594
595 // Tests that blocking tasks can still be posted during shutdown, as long as
596 // the task is not being posted within the context of a running task.
597 TEST_F(SequencedWorkerPoolTest,
598 AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) {
599 EnsureAllWorkersCreated();
600 ThreadBlocker blocker;
601
602 // Start tasks to take all the threads and block them.
603 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
604 for (int i = 0; i < kNumBlockTasks; ++i) {
605 EXPECT_TRUE(pool()->PostWorkerTask(
606 FROM_HERE,
607 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
608 }
609 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
610
611 // Setup to open the floodgates from within Shutdown().
612 SetWillWaitForShutdownCallback(
613 base::Bind(&TestTracker::PostBlockingTaskThenUnblockThreads,
614 scoped_refptr<TestTracker>(tracker()), pool(), &blocker,
615 kNumWorkerThreads));
616 pool()->Shutdown(kNumWorkerThreads + 1);
617
618 // Ensure that the correct number of tasks actually got run.
619 tracker()->WaitUntilTasksComplete(static_cast<size_t>(kNumWorkerThreads + 1));
620 tracker()->ClearCompleteSequence();
621 }
622
623 // Tests that unrun tasks are discarded properly according to their shutdown
624 // mode.
625 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
626 // Start tasks to take all the threads and block them.
627 EnsureAllWorkersCreated();
628 ThreadBlocker blocker;
629 for (size_t i = 0; i < kNumWorkerThreads; i++) {
630 pool()->PostWorkerTask(FROM_HERE,
631 base::Bind(&TestTracker::BlockTask,
632 tracker(), i, &blocker));
633 }
634 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
635
636 // Create some tasks with different shutdown modes.
637 pool()->PostWorkerTaskWithShutdownBehavior(
638 FROM_HERE,
639 base::Bind(&TestTracker::FastTask, tracker(), 100),
640 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
641 pool()->PostWorkerTaskWithShutdownBehavior(
642 FROM_HERE,
643 base::Bind(&TestTracker::FastTask, tracker(), 101),
644 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
645 pool()->PostWorkerTaskWithShutdownBehavior(
646 FROM_HERE,
647 base::Bind(&TestTracker::FastTask, tracker(), 102),
648 SequencedWorkerPool::BLOCK_SHUTDOWN);
649
650 // Shutdown the worker pool. This should discard all non-blocking tasks.
651 SetWillWaitForShutdownCallback(
652 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
653 scoped_refptr<TestTracker>(tracker()), 0,
654 &blocker, kNumWorkerThreads));
655 pool()->Shutdown();
656
657 std::vector<int> result =
658 tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
659
660 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
661 // one, in no particular order.
662 ASSERT_EQ(kNumWorkerThreads + 1, result.size());
663 for (size_t i = 0; i < kNumWorkerThreads; i++) {
664 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
665 result.end());
666 }
667 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
668 }
669
670 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
671 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
672 scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
673 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
674 scoped_refptr<SequencedTaskRunner> sequenced_runner(
675 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
676 pool()->GetSequenceToken(),
677 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
678 EnsureAllWorkersCreated();
679 ThreadBlocker blocker;
680 pool()->PostWorkerTaskWithShutdownBehavior(
681 FROM_HERE,
682 base::Bind(&TestTracker::BlockTask,
683 tracker(), 0, &blocker),
684 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
685 runner->PostTask(
686 FROM_HERE,
687 base::Bind(&TestTracker::BlockTask,
688 tracker(), 1, &blocker));
689 sequenced_runner->PostTask(
690 FROM_HERE,
691 base::Bind(&TestTracker::BlockTask,
692 tracker(), 2, &blocker));
693
694 tracker()->WaitUntilTasksBlocked(3);
695
696 // This should not block. If this test hangs, it means it failed.
697 pool()->Shutdown();
698
699 // The task should not have completed yet.
700 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
701
702 // Posting more tasks should fail.
703 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
704 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
705 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
706 EXPECT_FALSE(runner->PostTask(
707 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
708 EXPECT_FALSE(sequenced_runner->PostTask(
709 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
710
711 // Continue the background thread and make sure the tasks can complete.
712 blocker.Unblock(3);
713 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
714 EXPECT_EQ(3u, result.size());
715 }
716
717 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
718 // until they stop, but tasks not yet started do not.
719 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
720 // Start tasks to take all the threads and block them.
721 EnsureAllWorkersCreated();
722 ThreadBlocker blocker;
723
724 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
725 // return until these tasks have completed.
726 for (size_t i = 0; i < kNumWorkerThreads; i++) {
727 pool()->PostWorkerTaskWithShutdownBehavior(
728 FROM_HERE,
729 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
730 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
731 }
732 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
733
734 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
735 // executed once Shutdown() has been called.
736 pool()->PostWorkerTaskWithShutdownBehavior(
737 FROM_HERE,
738 base::Bind(&TestTracker::BlockTask,
739 tracker(), 0, &blocker),
740 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
741
742 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
743 // been started block shutdown.
744 SetWillWaitForShutdownCallback(
745 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
746 scoped_refptr<TestTracker>(tracker()), 0,
747 &blocker, kNumWorkerThreads));
748
749 // No tasks should have completed yet.
750 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
751
752 // This should not block. If this test hangs, it means it failed.
753 pool()->Shutdown();
754
755 // Shutdown should not return until all of the tasks have completed.
756 std::vector<int> result =
757 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
758
759 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
760 // allowed to complete. No additional non-blocking tasks should have been
761 // started.
762 ASSERT_EQ(kNumWorkerThreads, result.size());
763 for (size_t i = 0; i < kNumWorkerThreads; i++) {
764 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
765 result.end());
766 }
767 }
768
769 // Ensure all worker threads are created, and then trigger a spurious
770 // work signal. This shouldn't cause any other work signals to be
771 // triggered. This is a regression test for http://crbug.com/117469.
772 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
773 EnsureAllWorkersCreated();
774 int old_has_work_call_count = has_work_call_count();
775 pool()->SignalHasWorkForTesting();
776 // This is inherently racy, but can only produce false positives.
777 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
778 EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
779 }
780
781 void IsRunningOnCurrentThreadTask(
782 SequencedWorkerPool::SequenceToken test_positive_token,
783 SequencedWorkerPool::SequenceToken test_negative_token,
784 SequencedWorkerPool* pool,
785 SequencedWorkerPool* unused_pool) {
786 EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
787 EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
788 EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
789 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
790 EXPECT_FALSE(
791 unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
792 EXPECT_FALSE(
793 unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
794 }
795
796 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
797 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
798 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
799 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
800 SequencedWorkerPool::SequenceToken unsequenced_token;
801
802 scoped_refptr<SequencedWorkerPool> unused_pool =
803 new SequencedWorkerPool(2, "unused_pool");
804
805 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
806 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
807 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
808 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
809 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
810 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
811 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
812 EXPECT_FALSE(
813 unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
814
815 pool()->PostSequencedWorkerTask(
816 token1, FROM_HERE,
817 base::Bind(&IsRunningOnCurrentThreadTask,
818 token1, token2, pool(), unused_pool));
819 pool()->PostSequencedWorkerTask(
820 token2, FROM_HERE,
821 base::Bind(&IsRunningOnCurrentThreadTask,
822 token2, unsequenced_token, pool(), unused_pool));
823 pool()->PostWorkerTask(
824 FROM_HERE,
825 base::Bind(&IsRunningOnCurrentThreadTask,
826 unsequenced_token, token1, pool(), unused_pool));
827 pool()->Shutdown();
828 unused_pool->Shutdown();
829 }
830
831 // Checks that tasks are destroyed in the right context during shutdown. If a
832 // task is destroyed while SequencedWorkerPool's global lock is held,
833 // SequencedWorkerPool might deadlock.
834 TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) {
835 for (int i = 0; i < 4; ++i) {
836 scoped_refptr<DestructionDeadlockChecker> checker(
837 new DestructionDeadlockChecker(pool()));
838 tracker()->PostRepostingTask(pool(), checker);
839 }
840
841 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
842 // which in turn should not deadlock in their destructors.
843 pool()->Shutdown();
844 }
845
846 // Similar to the test AvoidsDeadlockOnShutdown, but there are now also
847 // sequenced, blocking tasks in the queue during shutdown.
848 TEST_F(SequencedWorkerPoolTest,
849 AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) {
850 const std::string sequence_token_name("name");
851 for (int i = 0; i < 4; ++i) {
852 scoped_refptr<DestructionDeadlockChecker> checker(
853 new DestructionDeadlockChecker(pool()));
854 tracker()->PostRepostingTask(pool(), checker);
855
856 SequencedWorkerPool::SequenceToken token1 =
857 pool()->GetNamedSequenceToken(sequence_token_name);
858 tracker()->PostRepostingBlockingTask(pool(), token1);
859 }
860
861 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
862 // which in turn should not deadlock in their destructors.
863 pool()->Shutdown();
864 }
865
866 // Verify that FlushForTesting works as intended.
867 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
868 // Should be fine to call on a new instance.
869 pool()->FlushForTesting();
870
871 // Queue up a bunch of work, including a long delayed task and
872 // a task that produces additional tasks as an artifact.
873 pool()->PostDelayedWorkerTask(
874 FROM_HERE,
875 base::Bind(&TestTracker::FastTask, tracker(), 0),
876 TimeDelta::FromMinutes(5));
877 pool()->PostWorkerTask(FROM_HERE,
878 base::Bind(&TestTracker::SlowTask, tracker(), 0));
879 const size_t kNumFastTasks = 20;
880 for (size_t i = 0; i < kNumFastTasks; i++) {
881 pool()->PostWorkerTask(FROM_HERE,
882 base::Bind(&TestTracker::FastTask, tracker(), 0));
883 }
884 pool()->PostWorkerTask(
885 FROM_HERE,
886 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
887 true));
888
889 // We expect all except the delayed task to have been run. We verify all
890 // closures have been deleted by looking at the refcount of the
891 // tracker.
892 EXPECT_FALSE(tracker()->HasOneRef());
893 pool()->FlushForTesting();
894 EXPECT_TRUE(tracker()->HasOneRef());
895 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
896
897 // Should be fine to call on an idle instance with all threads created, and
898 // spamming the method shouldn't deadlock or confuse the class.
899 pool()->FlushForTesting();
900 pool()->FlushForTesting();
901
902 // Should be fine to call after shutdown too.
903 pool()->Shutdown();
904 pool()->FlushForTesting();
905 }
906
907 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
908 MessageLoop loop;
909 scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
910 scoped_refptr<SequencedTaskRunner> task_runner =
911 pool->GetSequencedTaskRunnerWithShutdownBehavior(
912 pool->GetSequenceToken(),
913 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
914
915 // Upon test exit, should shut down without hanging.
916 pool->Shutdown();
917 }
918
919 class SequencedWorkerPoolTaskRunnerTestDelegate {
920 public:
921 SequencedWorkerPoolTaskRunnerTestDelegate() {}
922
923 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
924
925 void StartTaskRunner() {
926 pool_owner_.reset(
927 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
928 }
929
930 scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
931 return pool_owner_->pool();
932 }
933
934 void StopTaskRunner() {
935 // Make sure all tasks are run before shutting down. Delayed tasks are
936 // not run, they're simply deleted.
937 pool_owner_->pool()->FlushForTesting();
938 pool_owner_->pool()->Shutdown();
939 // Don't reset |pool_owner_| here, as the test may still hold a
940 // reference to the pool.
941 }
942
943 private:
944 MessageLoop message_loop_;
945 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
946 };
947
948 INSTANTIATE_TYPED_TEST_CASE_P(
949 SequencedWorkerPool, TaskRunnerTest,
950 SequencedWorkerPoolTaskRunnerTestDelegate);
951
952 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
953 public:
954 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
955
956 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
957 }
958
959 void StartTaskRunner() {
960 pool_owner_.reset(
961 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
962 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
963 SequencedWorkerPool::BLOCK_SHUTDOWN);
964 }
965
966 scoped_refptr<TaskRunner> GetTaskRunner() {
967 return task_runner_;
968 }
969
970 void StopTaskRunner() {
971 // Make sure all tasks are run before shutting down. Delayed tasks are
972 // not run, they're simply deleted.
973 pool_owner_->pool()->FlushForTesting();
974 pool_owner_->pool()->Shutdown();
975 // Don't reset |pool_owner_| here, as the test may still hold a
976 // reference to the pool.
977 }
978
979 private:
980 MessageLoop message_loop_;
981 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
982 scoped_refptr<TaskRunner> task_runner_;
983 };
984
985 INSTANTIATE_TYPED_TEST_CASE_P(
986 SequencedWorkerPoolTaskRunner, TaskRunnerTest,
987 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
988
989 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
990 public:
991 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
992
993 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
994 }
995
996 void StartTaskRunner() {
997 pool_owner_.reset(new SequencedWorkerPoolOwner(
998 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
999 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
1000 pool_owner_->pool()->GetSequenceToken());
1001 }
1002
1003 scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
1004 return task_runner_;
1005 }
1006
1007 void StopTaskRunner() {
1008 // Make sure all tasks are run before shutting down. Delayed tasks are
1009 // not run, they're simply deleted.
1010 pool_owner_->pool()->FlushForTesting();
1011 pool_owner_->pool()->Shutdown();
1012 // Don't reset |pool_owner_| here, as the test may still hold a
1013 // reference to the pool.
1014 }
1015
1016 private:
1017 MessageLoop message_loop_;
1018 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
1019 scoped_refptr<SequencedTaskRunner> task_runner_;
1020 };
1021
1022 INSTANTIATE_TYPED_TEST_CASE_P(
1023 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
1024 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1025
1026 INSTANTIATE_TYPED_TEST_CASE_P(
1027 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
1028 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1029
1030 } // namespace
1031
1032 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.cc ('k') | base/threading/simple_thread.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698