OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <memory> | 9 #include <memory> |
10 #include <unordered_set> | 10 #include <unordered_set> |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
60 | 60 |
61 using StandbyThreadPolicy = SchedulerWorkerPoolParams::StandbyThreadPolicy; | 61 using StandbyThreadPolicy = SchedulerWorkerPoolParams::StandbyThreadPolicy; |
62 | 62 |
63 class TaskSchedulerWorkerPoolImplTest | 63 class TaskSchedulerWorkerPoolImplTest |
64 : public testing::TestWithParam<test::ExecutionMode> { | 64 : public testing::TestWithParam<test::ExecutionMode> { |
65 protected: | 65 protected: |
66 TaskSchedulerWorkerPoolImplTest() | 66 TaskSchedulerWorkerPoolImplTest() |
67 : service_thread_("TaskSchedulerServiceThread") {} | 67 : service_thread_("TaskSchedulerServiceThread") {} |
68 | 68 |
69 void SetUp() override { | 69 void SetUp() override { |
70 InitializeWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); | 70 CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); |
71 } | 71 } |
72 | 72 |
73 void TearDown() override { | 73 void TearDown() override { |
74 service_thread_.Stop(); | 74 service_thread_.Stop(); |
75 worker_pool_->WaitForAllWorkersIdleForTesting(); | 75 worker_pool_->WaitForAllWorkersIdleForTesting(); |
76 worker_pool_->JoinForTesting(); | 76 worker_pool_->JoinForTesting(); |
77 } | 77 } |
78 | 78 |
79 void InitializeWorkerPool(TimeDelta suggested_reclaim_time, | 79 void CreateWorkerPool() { |
80 size_t num_workers) { | |
81 ASSERT_FALSE(worker_pool_); | 80 ASSERT_FALSE(worker_pool_); |
82 ASSERT_FALSE(delayed_task_manager_); | 81 ASSERT_FALSE(delayed_task_manager_); |
83 service_thread_.Start(); | 82 service_thread_.Start(); |
84 delayed_task_manager_ = | 83 delayed_task_manager_ = |
85 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner()); | 84 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner()); |
86 worker_pool_ = SchedulerWorkerPoolImpl::Create( | 85 worker_pool_ = MakeUnique<SchedulerWorkerPoolImpl>( |
87 SchedulerWorkerPoolParams("TestWorkerPool", ThreadPriority::NORMAL, | 86 "TestWorkerPool", ThreadPriority::NORMAL, |
88 StandbyThreadPolicy::LAZY, num_workers, | |
89 suggested_reclaim_time), | |
90 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | 87 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, |
91 Unretained(this)), | 88 Unretained(this)), |
92 &task_tracker_, delayed_task_manager_.get()); | 89 &task_tracker_, delayed_task_manager_.get()); |
93 ASSERT_TRUE(worker_pool_); | 90 ASSERT_TRUE(worker_pool_); |
94 } | 91 } |
95 | 92 |
93 void StartWorkerPool(TimeDelta suggested_reclaim_time, size_t num_workers) { | |
94 ASSERT_TRUE(worker_pool_); | |
95 worker_pool_->Start(SchedulerWorkerPoolParams( | |
96 "TestWorkerPool", ThreadPriority::NORMAL, StandbyThreadPolicy::LAZY, | |
97 num_workers, suggested_reclaim_time)); | |
98 } | |
99 | |
100 void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time, | |
101 size_t num_workers) { | |
102 CreateWorkerPool(); | |
103 StartWorkerPool(suggested_reclaim_time, num_workers); | |
104 } | |
105 | |
96 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; | 106 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; |
97 | 107 |
98 TaskTracker task_tracker_; | 108 TaskTracker task_tracker_; |
99 Thread service_thread_; | 109 Thread service_thread_; |
100 std::unique_ptr<DelayedTaskManager> delayed_task_manager_; | 110 std::unique_ptr<DelayedTaskManager> delayed_task_manager_; |
101 | 111 |
102 private: | 112 private: |
103 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | 113 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { |
104 // In production code, this callback would be implemented by the | 114 // In production code, this callback would be implemented by the |
105 // TaskScheduler which would first determine which PriorityQueue the | 115 // TaskScheduler which would first determine which PriorityQueue the |
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
369 } | 379 } |
370 | 380 |
371 INSTANTIATE_TEST_CASE_P(Parallel, | 381 INSTANTIATE_TEST_CASE_P(Parallel, |
372 TaskSchedulerWorkerPoolImplTest, | 382 TaskSchedulerWorkerPoolImplTest, |
373 ::testing::Values(test::ExecutionMode::PARALLEL)); | 383 ::testing::Values(test::ExecutionMode::PARALLEL)); |
374 INSTANTIATE_TEST_CASE_P(Sequenced, | 384 INSTANTIATE_TEST_CASE_P(Sequenced, |
375 TaskSchedulerWorkerPoolImplTest, | 385 TaskSchedulerWorkerPoolImplTest, |
376 ::testing::Values(test::ExecutionMode::SEQUENCED)); | 386 ::testing::Values(test::ExecutionMode::SEQUENCED)); |
377 | 387 |
378 namespace { | 388 namespace { |
389 class TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest | |
robliao
2017/04/05 23:29:23
Nit: Linebreak above here.
fdoray
2017/04/06 15:07:12
Done.
| |
390 : public TaskSchedulerWorkerPoolImplTest { | |
391 public: | |
392 void SetUp() override { | |
393 CreateWorkerPool(); | |
394 // Let the test start the worker pool. | |
395 } | |
396 }; | |
397 | |
398 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref, | |
399 WaitableEvent* task_scheduled, | |
400 WaitableEvent* barrier) { | |
401 *platform_thread_ref = PlatformThread::CurrentRef(); | |
402 task_scheduled->Signal(); | |
403 barrier->Wait(); | |
404 } | |
405 | |
406 } // namespace | |
407 | |
408 // Verify that 2 tasks posted before Start() to a SchedulerWorkerPoolImpl with | |
409 // more than 2 workers are scheduled on different workers when Start() is | |
410 // called. | |
411 TEST_F(TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest, | |
412 PostTasksBeforeStart) { | |
413 PlatformThreadRef task_1_thread_ref; | |
414 PlatformThreadRef task_2_thread_ref; | |
415 WaitableEvent task_1_scheduled(WaitableEvent::ResetPolicy::MANUAL, | |
416 WaitableEvent::InitialState::NOT_SIGNALED); | |
417 WaitableEvent task_2_scheduled(WaitableEvent::ResetPolicy::MANUAL, | |
418 WaitableEvent::InitialState::NOT_SIGNALED); | |
419 | |
420 // This event is used to prevent a task from completing before the other task | |
421 // is scheduled. If that happened, both tasks could run on the same worker and | |
422 // this test couldn't verify that the correct number of workers were woken up. | |
423 WaitableEvent barrier(WaitableEvent::ResetPolicy::MANUAL, | |
424 WaitableEvent::InitialState::NOT_SIGNALED); | |
425 | |
426 worker_pool_ | |
427 ->CreateTaskRunnerWithTraits(TaskTraits().WithBaseSyncPrimitives()) | |
428 ->PostTask(FROM_HERE, | |
429 Bind(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref), | |
430 Unretained(&task_1_scheduled), Unretained(&barrier))); | |
431 worker_pool_ | |
432 ->CreateTaskRunnerWithTraits(TaskTraits().WithBaseSyncPrimitives()) | |
433 ->PostTask(FROM_HERE, | |
434 Bind(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref), | |
435 Unretained(&task_2_scheduled), Unretained(&barrier))); | |
436 | |
437 // Tasks should not be scheduled before the pool is started. | |
438 EXPECT_FALSE(task_1_scheduled.IsSignaled()); | |
robliao
2017/04/05 23:29:23
There is an inherent observibility problem here. I
fdoray
2017/04/06 15:07:12
Done.
Added NumberOfAliveWorkersForTesting() chec
| |
439 EXPECT_FALSE(task_2_scheduled.IsSignaled()); | |
440 | |
441 StartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); | |
442 | |
443 // Tasks should be scheduled shortly after the pool is started. | |
444 task_1_scheduled.Wait(); | |
445 task_2_scheduled.Wait(); | |
446 | |
447 // Tasks should be scheduled on different threads. | |
448 EXPECT_NE(task_1_thread_ref, task_2_thread_ref); | |
449 | |
450 barrier.Signal(); | |
451 task_tracker_.Flush(); | |
452 } | |
453 | |
454 namespace { | |
379 | 455 |
380 constexpr size_t kMagicTlsValue = 42; | 456 constexpr size_t kMagicTlsValue = 42; |
381 | 457 |
382 class TaskSchedulerWorkerPoolCheckTlsReuse | 458 class TaskSchedulerWorkerPoolCheckTlsReuse |
383 : public TaskSchedulerWorkerPoolImplTest { | 459 : public TaskSchedulerWorkerPoolImplTest { |
384 public: | 460 public: |
385 void SetTlsValueAndWait() { | 461 void SetTlsValueAndWait() { |
386 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); | 462 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); |
387 waiter_.Wait(); | 463 waiter_.Wait(); |
388 } | 464 } |
389 | 465 |
390 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { | 466 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { |
391 if (!slot_.Get()) | 467 if (!slot_.Get()) |
392 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); | 468 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); |
393 | 469 |
394 count_waiter->Signal(); | 470 count_waiter->Signal(); |
395 waiter_.Wait(); | 471 waiter_.Wait(); |
396 } | 472 } |
397 | 473 |
398 protected: | 474 protected: |
399 TaskSchedulerWorkerPoolCheckTlsReuse() : | 475 TaskSchedulerWorkerPoolCheckTlsReuse() : |
400 waiter_(WaitableEvent::ResetPolicy::MANUAL, | 476 waiter_(WaitableEvent::ResetPolicy::MANUAL, |
401 WaitableEvent::InitialState::NOT_SIGNALED) {} | 477 WaitableEvent::InitialState::NOT_SIGNALED) {} |
402 | 478 |
403 void SetUp() override { | 479 void SetUp() override { |
404 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); | 480 CreateAndStartWorkerPool(kReclaimTimeForDetachTests, |
481 kNumWorkersInWorkerPool); | |
405 } | 482 } |
406 | 483 |
407 subtle::Atomic32 zero_tls_values_ = 0; | 484 subtle::Atomic32 zero_tls_values_ = 0; |
408 | 485 |
409 WaitableEvent waiter_; | 486 WaitableEvent waiter_; |
410 | 487 |
411 private: | 488 private: |
412 ThreadLocalStorage::Slot slot_; | 489 ThreadLocalStorage::Slot slot_; |
413 | 490 |
414 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); | 491 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
486 StatisticsRecorder::CreateTemporaryForTesting(); | 563 StatisticsRecorder::CreateTemporaryForTesting(); |
487 | 564 |
488 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest); | 565 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest); |
489 }; | 566 }; |
490 | 567 |
491 } // namespace | 568 } // namespace |
492 | 569 |
493 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) { | 570 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) { |
494 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, | 571 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, |
495 WaitableEvent::InitialState::NOT_SIGNALED); | 572 WaitableEvent::InitialState::NOT_SIGNALED); |
496 InitializeWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); | 573 CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); |
497 auto task_runner = worker_pool_->CreateSequencedTaskRunnerWithTraits( | 574 auto task_runner = worker_pool_->CreateSequencedTaskRunnerWithTraits( |
498 TaskTraits().WithBaseSyncPrimitives()); | 575 TaskTraits().WithBaseSyncPrimitives()); |
499 | 576 |
500 // Post a task. | 577 // Post a task. |
501 task_runner->PostTask(FROM_HERE, | 578 task_runner->PostTask(FROM_HERE, |
502 Bind(&WaitableEvent::Wait, Unretained(&event))); | 579 Bind(&WaitableEvent::Wait, Unretained(&event))); |
503 | 580 |
504 // Post 2 more tasks while the first task hasn't completed its execution. It | 581 // Post 2 more tasks while the first task hasn't completed its execution. It |
505 // is guaranteed that these tasks will run immediately after the first task, | 582 // is guaranteed that these tasks will run immediately after the first task, |
506 // without allowing the worker to sleep. | 583 // without allowing the worker to sleep. |
(...skipping 23 matching lines...) Expand all Loading... | |
530 WaitableEvent* wait_event) { | 607 WaitableEvent* wait_event) { |
531 signal_event->Signal(); | 608 signal_event->Signal(); |
532 wait_event->Wait(); | 609 wait_event->Wait(); |
533 } | 610 } |
534 | 611 |
535 } // namespace | 612 } // namespace |
536 | 613 |
537 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaitsWithDetach) { | 614 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaitsWithDetach) { |
538 WaitableEvent tasks_can_exit_event(WaitableEvent::ResetPolicy::MANUAL, | 615 WaitableEvent tasks_can_exit_event(WaitableEvent::ResetPolicy::MANUAL, |
539 WaitableEvent::InitialState::NOT_SIGNALED); | 616 WaitableEvent::InitialState::NOT_SIGNALED); |
540 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); | 617 CreateAndStartWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); |
541 auto task_runner = worker_pool_->CreateTaskRunnerWithTraits( | 618 auto task_runner = worker_pool_->CreateTaskRunnerWithTraits( |
542 TaskTraits().WithBaseSyncPrimitives()); | 619 TaskTraits().WithBaseSyncPrimitives()); |
543 | 620 |
544 // Post tasks to saturate the pool. | 621 // Post tasks to saturate the pool. |
545 std::vector<std::unique_ptr<WaitableEvent>> task_started_events; | 622 std::vector<std::unique_ptr<WaitableEvent>> task_started_events; |
546 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { | 623 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { |
547 task_started_events.push_back( | 624 task_started_events.push_back( |
548 MakeUnique<WaitableEvent>(WaitableEvent::ResetPolicy::MANUAL, | 625 MakeUnique<WaitableEvent>(WaitableEvent::ResetPolicy::MANUAL, |
549 WaitableEvent::InitialState::NOT_SIGNALED)); | 626 WaitableEvent::InitialState::NOT_SIGNALED)); |
550 task_runner->PostTask( | 627 task_runner->PostTask( |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
592 EXPECT_EQ(static_cast<int>(kNumWorkersInWorkerPool), | 669 EXPECT_EQ(static_cast<int>(kNumWorkersInWorkerPool), |
593 histogram->SnapshotSamples()->GetCount(1)); | 670 histogram->SnapshotSamples()->GetCount(1)); |
594 EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10)); | 671 EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10)); |
595 | 672 |
596 tasks_can_exit_event.Signal(); | 673 tasks_can_exit_event.Signal(); |
597 worker_pool_->WaitForAllWorkersIdleForTesting(); | 674 worker_pool_->WaitForAllWorkersIdleForTesting(); |
598 worker_pool_->DisallowWorkerDetachmentForTesting(); | 675 worker_pool_->DisallowWorkerDetachmentForTesting(); |
599 } | 676 } |
600 | 677 |
601 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeDetach) { | 678 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeDetach) { |
602 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); | 679 CreateAndStartWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); |
603 | 680 |
604 auto histogrammed_thread_task_runner = | 681 auto histogrammed_thread_task_runner = |
605 worker_pool_->CreateSequencedTaskRunnerWithTraits( | 682 worker_pool_->CreateSequencedTaskRunnerWithTraits( |
606 TaskTraits().WithBaseSyncPrimitives()); | 683 TaskTraits().WithBaseSyncPrimitives()); |
607 | 684 |
608 // Post 3 tasks and hold the thread for idle thread stack ordering. | 685 // Post 3 tasks and hold the thread for idle thread stack ordering. |
609 // This test assumes |histogrammed_thread_task_runner| gets assigned the same | 686 // This test assumes |histogrammed_thread_task_runner| gets assigned the same |
610 // thread for each of its tasks. | 687 // thread for each of its tasks. |
611 PlatformThreadRef thread_ref; | 688 PlatformThreadRef thread_ref; |
612 histogrammed_thread_task_runner->PostTask( | 689 histogrammed_thread_task_runner->PostTask( |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
707 ADD_FAILURE() | 784 ADD_FAILURE() |
708 << "Unexpected invocation of NotReachedReEnqueueSequenceCallback."; | 785 << "Unexpected invocation of NotReachedReEnqueueSequenceCallback."; |
709 } | 786 } |
710 | 787 |
711 } // namespace | 788 } // namespace |
712 | 789 |
713 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitLazy) { | 790 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitLazy) { |
714 TaskTracker task_tracker; | 791 TaskTracker task_tracker; |
715 DelayedTaskManager delayed_task_manager( | 792 DelayedTaskManager delayed_task_manager( |
716 make_scoped_refptr(new TestSimpleTaskRunner)); | 793 make_scoped_refptr(new TestSimpleTaskRunner)); |
717 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 794 auto worker_pool = MakeUnique<SchedulerWorkerPoolImpl>( |
718 SchedulerWorkerPoolParams("LazyPolicyWorkerPool", ThreadPriority::NORMAL, | 795 "LazyPolicyWorkerPool", ThreadPriority::NORMAL, |
719 StandbyThreadPolicy::LAZY, 8U, | |
720 TimeDelta::Max()), | |
721 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 796 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, |
722 &delayed_task_manager); | 797 &delayed_task_manager); |
798 worker_pool->Start(SchedulerWorkerPoolParams(StandbyThreadPolicy::LAZY, 8U, | |
799 TimeDelta::Max())); | |
723 ASSERT_TRUE(worker_pool); | 800 ASSERT_TRUE(worker_pool); |
724 EXPECT_EQ(0U, worker_pool->NumberOfAliveWorkersForTesting()); | 801 EXPECT_EQ(0U, worker_pool->NumberOfAliveWorkersForTesting()); |
725 worker_pool->JoinForTesting(); | 802 worker_pool->JoinForTesting(); |
726 } | 803 } |
727 | 804 |
728 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) { | 805 TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) { |
729 TaskTracker task_tracker; | 806 TaskTracker task_tracker; |
730 DelayedTaskManager delayed_task_manager( | 807 DelayedTaskManager delayed_task_manager( |
731 make_scoped_refptr(new TestSimpleTaskRunner)); | 808 make_scoped_refptr(new TestSimpleTaskRunner)); |
732 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 809 auto worker_pool = MakeUnique<SchedulerWorkerPoolImpl>( |
733 SchedulerWorkerPoolParams("LazyPolicyWorkerPool", ThreadPriority::NORMAL, | 810 "OnePolicyWorkerPool", ThreadPriority::NORMAL, |
734 StandbyThreadPolicy::ONE, 8U, TimeDelta::Max()), | |
735 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 811 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, |
736 &delayed_task_manager); | 812 &delayed_task_manager); |
813 worker_pool->Start(SchedulerWorkerPoolParams(StandbyThreadPolicy::ONE, 8U, | |
814 TimeDelta::Max())); | |
737 ASSERT_TRUE(worker_pool); | 815 ASSERT_TRUE(worker_pool); |
738 EXPECT_EQ(1U, worker_pool->NumberOfAliveWorkersForTesting()); | 816 EXPECT_EQ(1U, worker_pool->NumberOfAliveWorkersForTesting()); |
739 worker_pool->JoinForTesting(); | 817 worker_pool->JoinForTesting(); |
740 } | 818 } |
741 | 819 |
742 } // namespace internal | 820 } // namespace internal |
743 } // namespace base | 821 } // namespace base |
OLD | NEW |