Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(196)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl_unittest.cc

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: CR Feedback Continuation Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <memory> 9 #include <memory>
10 #include <unordered_set> 10 #include <unordered_set>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/atomicops.h"
13 #include "base/bind.h" 14 #include "base/bind.h"
14 #include "base/bind_helpers.h" 15 #include "base/bind_helpers.h"
15 #include "base/callback.h" 16 #include "base/callback.h"
16 #include "base/macros.h" 17 #include "base/macros.h"
17 #include "base/memory/ptr_util.h" 18 #include "base/memory/ptr_util.h"
18 #include "base/memory/ref_counted.h" 19 #include "base/memory/ref_counted.h"
19 #include "base/synchronization/condition_variable.h" 20 #include "base/synchronization/condition_variable.h"
20 #include "base/synchronization/lock.h" 21 #include "base/synchronization/lock.h"
21 #include "base/synchronization/waitable_event.h" 22 #include "base/synchronization/waitable_event.h"
22 #include "base/task_runner.h" 23 #include "base/task_runner.h"
23 #include "base/task_scheduler/delayed_task_manager.h" 24 #include "base/task_scheduler/delayed_task_manager.h"
24 #include "base/task_scheduler/scheduler_worker_pool_params.h" 25 #include "base/task_scheduler/scheduler_worker_pool_params.h"
25 #include "base/task_scheduler/sequence.h" 26 #include "base/task_scheduler/sequence.h"
26 #include "base/task_scheduler/sequence_sort_key.h" 27 #include "base/task_scheduler/sequence_sort_key.h"
27 #include "base/task_scheduler/task_tracker.h" 28 #include "base/task_scheduler/task_tracker.h"
28 #include "base/task_scheduler/test_task_factory.h" 29 #include "base/task_scheduler/test_task_factory.h"
29 #include "base/task_scheduler/test_utils.h" 30 #include "base/task_scheduler/test_utils.h"
30 #include "base/threading/platform_thread.h" 31 #include "base/threading/platform_thread.h"
31 #include "base/threading/simple_thread.h" 32 #include "base/threading/simple_thread.h"
33 #include "base/threading/thread_checker_impl.h"
34 #include "base/threading/thread_local_storage.h"
32 #include "base/threading/thread_restrictions.h" 35 #include "base/threading/thread_restrictions.h"
36 #include "base/time/time.h"
33 #include "testing/gtest/include/gtest/gtest.h" 37 #include "testing/gtest/include/gtest/gtest.h"
34 38
35 namespace base { 39 namespace base {
36 namespace internal { 40 namespace internal {
37 namespace { 41 namespace {
38 42
39 const size_t kNumWorkersInWorkerPool = 4; 43 const size_t kNumWorkersInWorkerPool = 4;
40 const size_t kNumThreadsPostingTasks = 4; 44 const size_t kNumThreadsPostingTasks = 4;
41 const size_t kNumTasksPostedPerThread = 150; 45 const size_t kNumTasksPostedPerThread = 150;
46 const TimeDelta kSuggestedReclaimTime = TimeDelta::FromMilliseconds(10);
gab 2016/07/20 01:45:21 "kSuggestedReclaimTimeForDetachTests" or something
gab 2016/07/20 01:45:21 constexpr
robliao 2016/07/20 19:44:01 Done.
42 47
43 using IORestriction = SchedulerWorkerPoolParams::IORestriction; 48 using IORestriction = SchedulerWorkerPoolParams::IORestriction;
44 49
45 class TestDelayedTaskManager : public DelayedTaskManager { 50 class TestDelayedTaskManager : public DelayedTaskManager {
46 public: 51 public:
47 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} 52 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {}
48 53
49 void SetCurrentTime(TimeTicks now) { now_ = now; } 54 void SetCurrentTime(TimeTicks now) { now_ = now; }
50 55
51 // DelayedTaskManager: 56 // DelayedTaskManager:
52 TimeTicks Now() const override { return now_; } 57 TimeTicks Now() const override { return now_; }
53 58
54 private: 59 private:
55 TimeTicks now_ = TimeTicks::Now(); 60 TimeTicks now_ = TimeTicks::Now();
56 61
57 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); 62 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager);
58 }; 63 };
59 64
60 class TaskSchedulerWorkerPoolImplTest 65 class TaskSchedulerWorkerPoolImplTest
61 : public testing::TestWithParam<ExecutionMode> { 66 : public testing::TestWithParam<ExecutionMode> {
62 protected: 67 protected:
63 TaskSchedulerWorkerPoolImplTest() = default; 68 TaskSchedulerWorkerPoolImplTest() = default;
64 69
65 void SetUp() override { 70 void SetUp() override {
66 worker_pool_ = SchedulerWorkerPoolImpl::Create( 71 InitializeWorkerPool(TimeDelta::Max());
67 SchedulerWorkerPoolParams("TestWorkerPoolWithFileIO", 72 worker_pool_->DisallowWorkerDetachmentForTesting();
fdoray 2016/07/20 14:15:43 This is not required given that suggested reclaim
robliao 2016/07/20 19:44:01 Yep. Done.
68 ThreadPriority::NORMAL,
69 IORestriction::ALLOWED,
70 kNumWorkersInWorkerPool),
71 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback,
72 Unretained(this)),
73 &task_tracker_, &delayed_task_manager_);
74 ASSERT_TRUE(worker_pool_);
75 } 73 }
76 74
77 void TearDown() override { 75 void TearDown() override {
78 worker_pool_->WaitForAllWorkersIdleForTesting(); 76 worker_pool_->WaitForAllWorkersIdleForTesting();
79 worker_pool_->JoinForTesting(); 77 worker_pool_->JoinForTesting();
80 } 78 }
81 79
80 void InitializeWorkerPool(const TimeDelta& suggested_reclaim_time) {
81 worker_pool_ = SchedulerWorkerPoolImpl::Create(
82 SchedulerWorkerPoolParams("TestWorkerPoolWithFileIO",
83 ThreadPriority::NORMAL,
84 IORestriction::ALLOWED,
85 kNumWorkersInWorkerPool,
86 suggested_reclaim_time),
87 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback,
88 Unretained(this)),
89 &task_tracker_, &delayed_task_manager_);
90 ASSERT_TRUE(worker_pool_);
91 }
92
82 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; 93 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;
83 94
84 TaskTracker task_tracker_; 95 TaskTracker task_tracker_;
85 TestDelayedTaskManager delayed_task_manager_; 96 TestDelayedTaskManager delayed_task_manager_;
86 97
87 private: 98 private:
88 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { 99 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) {
89 // In production code, this callback would be implemented by the 100 // In production code, this callback would be implemented by the
90 // TaskScheduler which would first determine which PriorityQueue the 101 // TaskScheduler which would first determine which PriorityQueue the
91 // sequence must be re-enqueued. 102 // sequence must be re-enqueued.
(...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after
360 }; 371 };
361 372
362 } // namespace 373 } // namespace
363 374
364 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { 375 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) {
365 TaskTracker task_tracker; 376 TaskTracker task_tracker;
366 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); 377 DelayedTaskManager delayed_task_manager(Bind(&DoNothing));
367 378
368 auto worker_pool = SchedulerWorkerPoolImpl::Create( 379 auto worker_pool = SchedulerWorkerPoolImpl::Create(
369 SchedulerWorkerPoolParams("TestWorkerPoolWithParam", 380 SchedulerWorkerPoolParams("TestWorkerPoolWithParam",
370 ThreadPriority::NORMAL, GetParam(), 1U), 381 ThreadPriority::NORMAL, GetParam(), 1U,
382 TimeDelta::Max()),
371 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, 383 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker,
372 &delayed_task_manager); 384 &delayed_task_manager);
373 ASSERT_TRUE(worker_pool); 385 ASSERT_TRUE(worker_pool);
386 worker_pool->DisallowWorkerDetachmentForTesting();
fdoray 2016/07/20 14:15:43 This is not required given that suggested reclaim
robliao 2016/07/20 19:44:01 Yep. Done.
374 387
375 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, 388 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
376 WaitableEvent::InitialState::NOT_SIGNALED); 389 WaitableEvent::InitialState::NOT_SIGNALED);
377 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) 390 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL)
378 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); 391 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran));
379 task_ran.Wait(); 392 task_ran.Wait();
380 393
381 worker_pool->JoinForTesting(); 394 worker_pool->JoinForTesting();
382 } 395 }
383 396
384 INSTANTIATE_TEST_CASE_P(IOAllowed, 397 INSTANTIATE_TEST_CASE_P(IOAllowed,
385 TaskSchedulerWorkerPoolImplIORestrictionTest, 398 TaskSchedulerWorkerPoolImplIORestrictionTest,
386 ::testing::Values(IORestriction::ALLOWED)); 399 ::testing::Values(IORestriction::ALLOWED));
387 INSTANTIATE_TEST_CASE_P(IODisallowed, 400 INSTANTIATE_TEST_CASE_P(IODisallowed,
388 TaskSchedulerWorkerPoolImplIORestrictionTest, 401 TaskSchedulerWorkerPoolImplIORestrictionTest,
389 ::testing::Values(IORestriction::DISALLOWED)); 402 ::testing::Values(IORestriction::DISALLOWED));
390 403
404 namespace {
405
406 class TaskSchedulerWorkerPoolSingleThreadedTest
407 : public TaskSchedulerWorkerPoolImplTest {
408 public:
409 void InitializeThreadChecker() {
410 thread_checker_.reset(new ThreadCheckerImpl());
411 }
412
413 void CheckValidThread() {
414 EXPECT_TRUE(thread_checker_->CalledOnValidThread());
415 }
416
417 protected:
418 void SetUp() override {
419 InitializeWorkerPool(kSuggestedReclaimTime);
420 }
421
422 TaskSchedulerWorkerPoolSingleThreadedTest() = default;
423
424 private:
425 std::unique_ptr<ThreadCheckerImpl> thread_checker_;
426
427 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest);
428 };
429
430 } // namespace
431
432 // Verify that thread resources for a single thread remain.
433 TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) {
434 auto single_thread_task_runner =
435 worker_pool_->CreateTaskRunnerWithTraits(
436 TaskTraits().
437 WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN),
438 ExecutionMode::SINGLE_THREADED);
439 single_thread_task_runner->PostTask(
440 FROM_HERE,
441 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::InitializeThreadChecker,
442 Unretained(this)));
443 WaitableEvent task_waiter(WaitableEvent::ResetPolicy::AUTOMATIC,
444 WaitableEvent::InitialState::NOT_SIGNALED);
445 single_thread_task_runner->PostTask(
446 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter)));
447 task_waiter.Wait();
448 worker_pool_->WaitForAllWorkersIdleForTesting();
449
450 // Give the worker pool a chance to reclaim its threads.
451 PlatformThread::Sleep(
452 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200));
453
454 worker_pool_->DisallowWorkerDetachmentForTesting();
455
456 single_thread_task_runner->PostTask(
457 FROM_HERE,
458 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckValidThread,
459 Unretained(this)));
460 single_thread_task_runner->PostTask(
461 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter)));
462 task_waiter.Wait();
463 }
464
465 namespace {
466
467 constexpr size_t kMagicTlsValue = 42;
468
469 class TaskSchedulerWorkerPoolCheckTlsReuse
470 : public TaskSchedulerWorkerPoolImplTest {
471 public:
472 void SetTlsValueAndWait() {
473 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
474 waiter_.Wait();
475 }
476
477 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) {
gab 2016/07/20 01:45:21 s/ZeroTlsValue/NoMagicTlsValue/ (here and below fo
robliao 2016/07/20 19:44:01 We're counting zero TLS values here so the name sh
478 if (!slot_.Get())
479 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);
480
481 count_waiter->Signal();
482 waiter_.Wait();
483 }
484
485 protected:
486 TaskSchedulerWorkerPoolCheckTlsReuse() :
487 waiter_(WaitableEvent::ResetPolicy::MANUAL,
488 WaitableEvent::InitialState::NOT_SIGNALED) {}
489
490 void SetUp() override {
491 InitializeWorkerPool(kSuggestedReclaimTime);
492 }
493
494 subtle::Atomic32 zero_tls_values_ = 0;
495
496 WaitableEvent waiter_;
497
498 private:
499 ThreadLocalStorage::Slot slot_;
500
501 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse);
502 };
503
504 } // namespace
505
506 // Checks that at least one thread has detached by checking the TLS.
507 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) {
508 // Saturate the threads and mark each thread with a magic TLS value.
509 std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
510 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
511 factories.push_back(WrapUnique(new test::TestTaskFactory(
512 worker_pool_->CreateTaskRunnerWithTraits(
513 TaskTraits(), ExecutionMode::PARALLEL),
514 ExecutionMode::PARALLEL)));
515 ASSERT_TRUE(factories.back()->PostTask(
516 PostNestedTask::NO,
517 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait,
518 Unretained(this))));
519 factories.back()->WaitForAllTasksToRun();
520 }
521
522 // Release tasks waiting on |waiter_|.
523 waiter_.Signal();
524 worker_pool_->WaitForAllWorkersIdleForTesting();
525
526 // All threads should be done running by now, so reset for the next phase.
527 waiter_.Reset();
528
529 // Give the worker pool a chance to detach its threads.
530 PlatformThread::Sleep(
531 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200));
532
533 worker_pool_->DisallowWorkerDetachmentForTesting();
534
535 // Saturate and count the threads that do not have the magic TLS value. If the
536 // value is not there, that means we're at a new thread.
537 std::vector<std::unique_ptr<WaitableEvent>> count_waiters;
538 for (auto& factory : factories) {
539 count_waiters.push_back(WrapUnique(new WaitableEvent(
540 WaitableEvent::ResetPolicy::MANUAL,
541 WaitableEvent::InitialState::NOT_SIGNALED)));
542 ASSERT_TRUE(factory->PostTask(
543 PostNestedTask::NO,
544 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait,
545 Unretained(this),
546 count_waiters.back().get())));
547 factory->WaitForAllTasksToRun();
548 }
549
550 // Wait for all counters to complete.
551 for (auto& count_waiter : count_waiters)
552 count_waiter->Wait();
553
554 EXPECT_GT(zero_tls_values_, 0);
gab 2016/07/20 01:45:21 Should this use NoBarrier_Load()? On Windows this
fdoray 2016/07/20 14:15:43 atomicops.h:19 It is incorrect to make direct ass
robliao 2016/07/20 19:44:01 The atomicops unittest direct references their var
555
556 // Release tasks waiting on |waiter_|.
557 waiter_.Signal();
558 }
559
391 } // namespace internal 560 } // namespace internal
392 } // namespace base 561 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698