OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <memory> | 9 #include <memory> |
10 #include <unordered_set> | 10 #include <unordered_set> |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
60 | 60 |
61 using StandbyThreadPolicy = SchedulerWorkerPoolParams::StandbyThreadPolicy; | 61 using StandbyThreadPolicy = SchedulerWorkerPoolParams::StandbyThreadPolicy; |
62 | 62 |
63 class TaskSchedulerWorkerPoolImplTest | 63 class TaskSchedulerWorkerPoolImplTest |
64 : public testing::TestWithParam<test::ExecutionMode> { | 64 : public testing::TestWithParam<test::ExecutionMode> { |
65 protected: | 65 protected: |
66 TaskSchedulerWorkerPoolImplTest() | 66 TaskSchedulerWorkerPoolImplTest() |
67 : service_thread_("TaskSchedulerServiceThread") {} | 67 : service_thread_("TaskSchedulerServiceThread") {} |
68 | 68 |
69 void SetUp() override { | 69 void SetUp() override { |
70 InitializeWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); | 70 CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); |
71 } | 71 } |
72 | 72 |
73 void TearDown() override { | 73 void TearDown() override { |
74 service_thread_.Stop(); | 74 service_thread_.Stop(); |
75 worker_pool_->WaitForAllWorkersIdleForTesting(); | 75 worker_pool_->WaitForAllWorkersIdleForTesting(); |
76 worker_pool_->JoinForTesting(); | 76 worker_pool_->JoinForTesting(); |
77 } | 77 } |
78 | 78 |
79 void InitializeWorkerPool(TimeDelta suggested_reclaim_time, | 79 void CreateWorkerPool() { |
80 size_t num_workers) { | |
81 ASSERT_FALSE(worker_pool_); | 80 ASSERT_FALSE(worker_pool_); |
82 ASSERT_FALSE(delayed_task_manager_); | 81 ASSERT_FALSE(delayed_task_manager_); |
83 service_thread_.Start(); | 82 service_thread_.Start(); |
84 delayed_task_manager_ = | 83 delayed_task_manager_ = |
85 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner()); | 84 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner()); |
86 worker_pool_ = SchedulerWorkerPoolImpl::Create( | 85 worker_pool_ = MakeUnique<SchedulerWorkerPoolImpl>( |
87 SchedulerWorkerPoolParams("TestWorkerPool", ThreadPriority::NORMAL, | 86 "TestWorkerPool", ThreadPriority::NORMAL, |
88 StandbyThreadPolicy::LAZY, num_workers, | |
89 suggested_reclaim_time), | |
90 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | 87 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, |
91 Unretained(this)), | 88 Unretained(this)), |
92 &task_tracker_, delayed_task_manager_.get()); | 89 &task_tracker_, delayed_task_manager_.get()); |
93 ASSERT_TRUE(worker_pool_); | 90 ASSERT_TRUE(worker_pool_); |
94 } | 91 } |
95 | 92 |
93 void StartWorkerPool(TimeDelta suggested_reclaim_time, size_t num_workers) { | |
94 ASSERT_TRUE(worker_pool_); | |
95 worker_pool_->Start(SchedulerWorkerPoolParams( | |
96 "TestWorkerPool", ThreadPriority::NORMAL, StandbyThreadPolicy::LAZY, | |
97 num_workers, suggested_reclaim_time)); | |
98 } | |
99 | |
100 void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time, | |
101 size_t num_workers) { | |
102 CreateWorkerPool(); | |
103 StartWorkerPool(suggested_reclaim_time, num_workers); | |
104 } | |
105 | |
96 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; | 106 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; |
97 | 107 |
98 TaskTracker task_tracker_; | 108 TaskTracker task_tracker_; |
99 Thread service_thread_; | 109 Thread service_thread_; |
100 std::unique_ptr<DelayedTaskManager> delayed_task_manager_; | 110 std::unique_ptr<DelayedTaskManager> delayed_task_manager_; |
101 | 111 |
102 private: | 112 private: |
103 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | 113 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { |
104 // In production code, this callback would be implemented by the | 114 // In production code, this callback would be implemented by the |
105 // TaskScheduler which would first determine which PriorityQueue the | 115 // TaskScheduler which would first determine which PriorityQueue the |
(...skipping 264 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
370 | 380 |
371 INSTANTIATE_TEST_CASE_P(Parallel, | 381 INSTANTIATE_TEST_CASE_P(Parallel, |
372 TaskSchedulerWorkerPoolImplTest, | 382 TaskSchedulerWorkerPoolImplTest, |
373 ::testing::Values(test::ExecutionMode::PARALLEL)); | 383 ::testing::Values(test::ExecutionMode::PARALLEL)); |
374 INSTANTIATE_TEST_CASE_P(Sequenced, | 384 INSTANTIATE_TEST_CASE_P(Sequenced, |
375 TaskSchedulerWorkerPoolImplTest, | 385 TaskSchedulerWorkerPoolImplTest, |
376 ::testing::Values(test::ExecutionMode::SEQUENCED)); | 386 ::testing::Values(test::ExecutionMode::SEQUENCED)); |
377 | 387 |
378 namespace { | 388 namespace { |
379 | 389 |
390 class TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest | |
gab
2017/04/10 16:16:25
Why even bother with the fixture? I'd favor inlini
| |
391 : public TaskSchedulerWorkerPoolImplTest { | |
392 public: | |
393 void SetUp() override { | |
394 CreateWorkerPool(); | |
395 // Let the test start the worker pool. | |
396 } | |
397 }; | |
398 | |
399 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref, | |
400 WaitableEvent* task_scheduled, | |
401 WaitableEvent* barrier) { | |
402 *platform_thread_ref = PlatformThread::CurrentRef(); | |
403 task_scheduled->Signal(); | |
404 barrier->Wait(); | |
405 } | |
406 | |
407 } // namespace | |
408 | |
409 // Verify that 2 tasks posted before Start() to a SchedulerWorkerPoolImpl with | |
410 // more than 2 workers are scheduled on different workers when Start() is | |
411 // called. | |
412 TEST_F(TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest, | |
413 PostTasksBeforeStart) { | |
414 PlatformThreadRef task_1_thread_ref; | |
415 PlatformThreadRef task_2_thread_ref; | |
416 WaitableEvent task_1_scheduled(WaitableEvent::ResetPolicy::MANUAL, | |
417 WaitableEvent::InitialState::NOT_SIGNALED); | |
418 WaitableEvent task_2_scheduled(WaitableEvent::ResetPolicy::MANUAL, | |
419 WaitableEvent::InitialState::NOT_SIGNALED); | |
420 | |
421 // This event is used to prevent a task from completing before the other task | |
422 // is scheduled. If that happened, both tasks could run on the same worker and | |
423 // this test couldn't verify that the correct number of workers were woken up. | |
424 WaitableEvent barrier(WaitableEvent::ResetPolicy::MANUAL, | |
425 WaitableEvent::InitialState::NOT_SIGNALED); | |
426 | |
427 worker_pool_ | |
428 ->CreateTaskRunnerWithTraits(TaskTraits().WithBaseSyncPrimitives()) | |
429 ->PostTask(FROM_HERE, | |
430 Bind(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref), | |
431 Unretained(&task_1_scheduled), Unretained(&barrier))); | |
432 worker_pool_ | |
433 ->CreateTaskRunnerWithTraits(TaskTraits().WithBaseSyncPrimitives()) | |
434 ->PostTask(FROM_HERE, | |
435 Bind(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref), | |
436 Unretained(&task_2_scheduled), Unretained(&barrier))); | |
437 | |
438 // Workers should not be created and tasks should not run before the pool is | |
439 // started. | |
gab
2017/04/10 19:09:07
PlatformThread::Sleep(TestTimeouts::tiny_timeout()
| |
440 EXPECT_EQ(0U, worker_pool_->NumberOfAliveWorkersForTesting()); | |
441 EXPECT_FALSE(task_1_scheduled.IsSignaled()); | |
442 EXPECT_FALSE(task_2_scheduled.IsSignaled()); | |
443 | |
444 StartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); | |
445 | |
446 // Tasks should be scheduled shortly after the pool is started. | |
447 task_1_scheduled.Wait(); | |
448 task_2_scheduled.Wait(); | |
449 | |
450 // Tasks should be scheduled on different threads. | |
451 EXPECT_NE(task_1_thread_ref, task_2_thread_ref); | |
452 | |
453 barrier.Signal(); | |
454 task_tracker_.Flush(); | |
455 } | |
456 | |
457 namespace { | |
458 | |
380 constexpr size_t kMagicTlsValue = 42; | 459 constexpr size_t kMagicTlsValue = 42; |
381 | 460 |
382 class TaskSchedulerWorkerPoolCheckTlsReuse | 461 class TaskSchedulerWorkerPoolCheckTlsReuse |
383 : public TaskSchedulerWorkerPoolImplTest { | 462 : public TaskSchedulerWorkerPoolImplTest { |
384 public: | 463 public: |
385 void SetTlsValueAndWait() { | 464 void SetTlsValueAndWait() { |
386 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); | 465 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); |
387 waiter_.Wait(); | 466 waiter_.Wait(); |
388 } | 467 } |
389 | 468 |
390 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { | 469 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { |
391 if (!slot_.Get()) | 470 if (!slot_.Get()) |
392 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); | 471 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); |
393 | 472 |
394 count_waiter->Signal(); | 473 count_waiter->Signal(); |
395 waiter_.Wait(); | 474 waiter_.Wait(); |
396 } | 475 } |
397 | 476 |
398 protected: | 477 protected: |
399 TaskSchedulerWorkerPoolCheckTlsReuse() : | 478 TaskSchedulerWorkerPoolCheckTlsReuse() : |
400 waiter_(WaitableEvent::ResetPolicy::MANUAL, | 479 waiter_(WaitableEvent::ResetPolicy::MANUAL, |
401 WaitableEvent::InitialState::NOT_SIGNALED) {} | 480 WaitableEvent::InitialState::NOT_SIGNALED) {} |
402 | 481 |
403 void SetUp() override { | 482 void SetUp() override { |
404 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); | 483 CreateAndStartWorkerPool(kReclaimTimeForDetachTests, |
484 kNumWorkersInWorkerPool); | |
405 } | 485 } |
406 | 486 |
407 subtle::Atomic32 zero_tls_values_ = 0; | 487 subtle::Atomic32 zero_tls_values_ = 0; |
408 | 488 |
409 WaitableEvent waiter_; | 489 WaitableEvent waiter_; |
410 | 490 |
411 private: | 491 private: |
412 ThreadLocalStorage::Slot slot_; | 492 ThreadLocalStorage::Slot slot_; |
413 | 493 |
414 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); | 494 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
486 StatisticsRecorder::CreateTemporaryForTesting(); | 566 StatisticsRecorder::CreateTemporaryForTesting(); |
487 | 567 |
488 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest); | 568 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest); |
489 }; | 569 }; |
490 | 570 |
491 } // namespace | 571 } // namespace |
492 | 572 |
493 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) { | 573 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) { |
494 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, | 574 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, |
495 WaitableEvent::InitialState::NOT_SIGNALED); | 575 WaitableEvent::InitialState::NOT_SIGNALED); |
496 InitializeWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); | 576 CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); |
497 auto task_runner = worker_pool_->CreateSequencedTaskRunnerWithTraits( | 577 auto task_runner = worker_pool_->CreateSequencedTaskRunnerWithTraits( |
498 TaskTraits().WithBaseSyncPrimitives()); | 578 TaskTraits().WithBaseSyncPrimitives()); |
499 | 579 |
500 // Post a task. | 580 // Post a task. |
501 task_runner->PostTask(FROM_HERE, | 581 task_runner->PostTask(FROM_HERE, |
502 Bind(&WaitableEvent::Wait, Unretained(&event))); | 582 Bind(&WaitableEvent::Wait, Unretained(&event))); |
503 | 583 |
504 // Post 2 more tasks while the first task hasn't completed its execution. It | 584 // Post 2 more tasks while the first task hasn't completed its execution. It |
505 // is guaranteed that these tasks will run immediately after the first task, | 585 // is guaranteed that these tasks will run immediately after the first task, |
506 // without allowing the worker to sleep. | 586 // without allowing the worker to sleep. |
(...skipping 23 matching lines...) Expand all Loading... | |
530 WaitableEvent* wait_event) { | 610 WaitableEvent* wait_event) { |
531 signal_event->Signal(); | 611 signal_event->Signal(); |
532 wait_event->Wait(); | 612 wait_event->Wait(); |
533 } | 613 } |
534 | 614 |
535 } // namespace | 615 } // namespace |
536 | 616 |
537 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaitsWithDetach) { | 617 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaitsWithDetach) { |
538 WaitableEvent tasks_can_exit_event(WaitableEvent::ResetPolicy::MANUAL, | 618 WaitableEvent tasks_can_exit_event(WaitableEvent::ResetPolicy::MANUAL, |
539 WaitableEvent::InitialState::NOT_SIGNALED); | 619 WaitableEvent::InitialState::NOT_SIGNALED); |
540 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); | 620 CreateAndStartWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); |
541 auto task_runner = worker_pool_->CreateTaskRunnerWithTraits( | 621 auto task_runner = worker_pool_->CreateTaskRunnerWithTraits( |
542 TaskTraits().WithBaseSyncPrimitives()); | 622 TaskTraits().WithBaseSyncPrimitives()); |
543 | 623 |
544 // Post tasks to saturate the pool. | 624 // Post tasks to saturate the pool. |
545 std::vector<std::unique_ptr<WaitableEvent>> task_started_events; | 625 std::vector<std::unique_ptr<WaitableEvent>> task_started_events; |
546 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { | 626 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { |
547 task_started_events.push_back( | 627 task_started_events.push_back( |
548 MakeUnique<WaitableEvent>(WaitableEvent::ResetPolicy::MANUAL, | 628 MakeUnique<WaitableEvent>(WaitableEvent::ResetPolicy::MANUAL, |
549 WaitableEvent::InitialState::NOT_SIGNALED)); | 629 WaitableEvent::InitialState::NOT_SIGNALED)); |
550 task_runner->PostTask( | 630 task_runner->PostTask( |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
592 EXPECT_EQ(static_cast<int>(kNumWorkersInWorkerPool), | 672 EXPECT_EQ(static_cast<int>(kNumWorkersInWorkerPool), |
593 histogram->SnapshotSamples()->GetCount(1)); | 673 histogram->SnapshotSamples()->GetCount(1)); |
594 EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10)); | 674 EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10)); |
595 | 675 |
596 tasks_can_exit_event.Signal(); | 676 tasks_can_exit_event.Signal(); |
597 worker_pool_->WaitForAllWorkersIdleForTesting(); | 677 worker_pool_->WaitForAllWorkersIdleForTesting(); |
598 worker_pool_->DisallowWorkerDetachmentForTesting(); | 678 worker_pool_->DisallowWorkerDetachmentForTesting(); |
599 } | 679 } |
600 | 680 |
601 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeDetach) { | 681 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeDetach) { |
602 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); | 682 CreateAndStartWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); |
603 | 683 |
604 auto histogrammed_thread_task_runner = | 684 auto histogrammed_thread_task_runner = |
605 worker_pool_->CreateSequencedTaskRunnerWithTraits( | 685 worker_pool_->CreateSequencedTaskRunnerWithTraits( |
606 TaskTraits().WithBaseSyncPrimitives()); | 686 TaskTraits().WithBaseSyncPrimitives()); |
607 | 687 |
608 // Post 3 tasks and hold the thread for idle thread stack ordering. | 688 // Post 3 tasks and hold the thread for idle thread stack ordering. |
609 // This test assumes |histogrammed_thread_task_runner| gets assigned the same | 689 // This test assumes |histogrammed_thread_task_runner| gets assigned the same |
610 // thread for each of its tasks. | 690 // thread for each of its tasks. |
611 PlatformThreadRef thread_ref; | 691 PlatformThreadRef thread_ref; |
612 histogrammed_thread_task_runner->PostTask( | 692 histogrammed_thread_task_runner->PostTask( |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
707 ADD_FAILURE() | 787 ADD_FAILURE() |
708 << "Unexpected invocation of NotReachedReEnqueueSequenceCallback."; | 788 << "Unexpected invocation of NotReachedReEnqueueSequenceCallback."; |
709 } | 789 } |
710 | 790 |
711 } // namespace | 791 } // namespace |
712 | 792 |
713 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitLazy) { | 793 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitLazy) { |
714 TaskTracker task_tracker; | 794 TaskTracker task_tracker; |
715 DelayedTaskManager delayed_task_manager( | 795 DelayedTaskManager delayed_task_manager( |
716 make_scoped_refptr(new TestSimpleTaskRunner)); | 796 make_scoped_refptr(new TestSimpleTaskRunner)); |
717 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 797 auto worker_pool = MakeUnique<SchedulerWorkerPoolImpl>( |
718 SchedulerWorkerPoolParams("LazyPolicyWorkerPool", ThreadPriority::NORMAL, | 798 "LazyPolicyWorkerPool", ThreadPriority::NORMAL, |
719 StandbyThreadPolicy::LAZY, 8U, | |
720 TimeDelta::Max()), | |
721 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 799 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, |
722 &delayed_task_manager); | 800 &delayed_task_manager); |
801 worker_pool->Start(SchedulerWorkerPoolParams(StandbyThreadPolicy::LAZY, 8U, | |
802 TimeDelta::Max())); | |
723 ASSERT_TRUE(worker_pool); | 803 ASSERT_TRUE(worker_pool); |
724 EXPECT_EQ(0U, worker_pool->NumberOfAliveWorkersForTesting()); | 804 EXPECT_EQ(0U, worker_pool->NumberOfAliveWorkersForTesting()); |
725 worker_pool->JoinForTesting(); | 805 worker_pool->JoinForTesting(); |
726 } | 806 } |
727 | 807 |
728 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) { | 808 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) { |
729 TaskTracker task_tracker; | 809 TaskTracker task_tracker; |
730 DelayedTaskManager delayed_task_manager( | 810 DelayedTaskManager delayed_task_manager( |
731 make_scoped_refptr(new TestSimpleTaskRunner)); | 811 make_scoped_refptr(new TestSimpleTaskRunner)); |
732 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 812 auto worker_pool = MakeUnique<SchedulerWorkerPoolImpl>( |
733 SchedulerWorkerPoolParams("LazyPolicyWorkerPool", ThreadPriority::NORMAL, | 813 "OnePolicyWorkerPool", ThreadPriority::NORMAL, |
734 StandbyThreadPolicy::ONE, 8U, TimeDelta::Max()), | |
735 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 814 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, |
736 &delayed_task_manager); | 815 &delayed_task_manager); |
816 worker_pool->Start(SchedulerWorkerPoolParams(StandbyThreadPolicy::ONE, 8U, | |
817 TimeDelta::Max())); | |
737 ASSERT_TRUE(worker_pool); | 818 ASSERT_TRUE(worker_pool); |
738 EXPECT_EQ(1U, worker_pool->NumberOfAliveWorkersForTesting()); | 819 EXPECT_EQ(1U, worker_pool->NumberOfAliveWorkersForTesting()); |
739 worker_pool->JoinForTesting(); | 820 worker_pool->JoinForTesting(); |
740 } | 821 } |
741 | 822 |
742 } // namespace internal | 823 } // namespace internal |
743 } // namespace base | 824 } // namespace base |
OLD | NEW |