Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(355)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl_unittest.cc

Issue 2650383007: Move Task Scheduler Single Thread Task Runners to Dedicated Threads (Closed)
Patch Set: CR Feedback Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <memory> 9 #include <memory>
10 #include <unordered_set> 10 #include <unordered_set>
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
60 60
61 using StandbyThreadPolicy = SchedulerWorkerPoolParams::StandbyThreadPolicy; 61 using StandbyThreadPolicy = SchedulerWorkerPoolParams::StandbyThreadPolicy;
62 62
63 class TaskSchedulerWorkerPoolImplTest 63 class TaskSchedulerWorkerPoolImplTest
64 : public testing::TestWithParam<test::ExecutionMode> { 64 : public testing::TestWithParam<test::ExecutionMode> {
65 protected: 65 protected:
66 TaskSchedulerWorkerPoolImplTest() 66 TaskSchedulerWorkerPoolImplTest()
67 : service_thread_("TaskSchedulerServiceThread") {} 67 : service_thread_("TaskSchedulerServiceThread") {}
68 68
69 void SetUp() override { 69 void SetUp() override {
70 InitializeWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool); 70 if (GetParam() == test::ExecutionMode::SINGLE_THREADED)
71 InitializeSingleThreadWorkerPool(TimeDelta::Max());
72 else
73 InitializeWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);
71 } 74 }
72 75
73 void TearDown() override { 76 void TearDown() override {
74 service_thread_.Stop(); 77 service_thread_.Stop();
75 worker_pool_->WaitForAllWorkersIdleForTesting(); 78 worker_pool_->WaitForAllWorkersIdleForTesting();
76 worker_pool_->JoinForTesting(); 79 worker_pool_->JoinForTesting();
77 } 80 }
78 81
79 void InitializeWorkerPool(TimeDelta suggested_reclaim_time, 82 void InitializeWorkerPool(TimeDelta suggested_reclaim_time,
80 size_t num_workers) { 83 size_t num_workers) {
81 ASSERT_FALSE(worker_pool_); 84 ASSERT_FALSE(worker_pool_);
82 ASSERT_FALSE(delayed_task_manager_); 85 ASSERT_FALSE(delayed_task_manager_);
83 service_thread_.Start(); 86 service_thread_.Start();
84 delayed_task_manager_ = 87 delayed_task_manager_ =
85 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner()); 88 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner());
86 worker_pool_ = SchedulerWorkerPoolImpl::Create( 89 worker_pool_ = SchedulerWorkerPoolImpl::Create(
87 SchedulerWorkerPoolParams("TestWorkerPool", ThreadPriority::NORMAL, 90 SchedulerWorkerPoolParams("TestWorkerPool", ThreadPriority::NORMAL,
88 StandbyThreadPolicy::LAZY, num_workers, 91 StandbyThreadPolicy::LAZY, num_workers,
89 suggested_reclaim_time), 92 suggested_reclaim_time),
90 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, 93 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback,
91 Unretained(this)), 94 Unretained(this)),
92 &task_tracker_, delayed_task_manager_.get()); 95 &task_tracker_, delayed_task_manager_.get());
93 ASSERT_TRUE(worker_pool_); 96 ASSERT_TRUE(worker_pool_);
94 } 97 }
95 98
99 void InitializeSingleThreadWorkerPool(TimeDelta suggested_reclaim_time) {
100 ASSERT_FALSE(worker_pool_);
101 ASSERT_FALSE(delayed_task_manager_);
102 service_thread_.Start();
103 delayed_task_manager_ =
104 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner());
105 worker_pool_ = SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
106 SchedulerWorkerPoolParams("TestWorkerPool", ThreadPriority::NORMAL,
107 StandbyThreadPolicy::LAZY, 1,
108 suggested_reclaim_time),
109 Bind(&TaskSchedulerWorkerPoolImplTest::
110 UnregisterSingleThreadWorkerPoolCallback,
111 Unretained(this)),
112 &task_tracker_, delayed_task_manager_.get());
113 ASSERT_TRUE(worker_pool_);
114 }
115
96 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; 116 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;
97 117
98 TaskTracker task_tracker_; 118 TaskTracker task_tracker_;
99 Thread service_thread_; 119 Thread service_thread_;
100 std::unique_ptr<DelayedTaskManager> delayed_task_manager_; 120 std::unique_ptr<DelayedTaskManager> delayed_task_manager_;
101 121
102 private: 122 private:
103 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { 123 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) {
104 // In production code, this callback would be implemented by the 124 // In production code, this callback would be implemented by the
105 // TaskScheduler which would first determine which PriorityQueue the 125 // TaskScheduler which would first determine which PriorityQueue the
106 // sequence must be re-enqueued. 126 // sequence must be re-enqueued.
107 const SequenceSortKey sort_key(sequence->GetSortKey()); 127 const SequenceSortKey sort_key(sequence->GetSortKey());
108 worker_pool_->ReEnqueueSequence(std::move(sequence), sort_key); 128 worker_pool_->ReEnqueueSequence(std::move(sequence), sort_key);
109 } 129 }
110 130
131 void UnregisterSingleThreadWorkerPoolCallback(
132 const SchedulerWorkerPoolImpl* scheduler_worker_pool) {
133 EXPECT_EQ(worker_pool_.get(), scheduler_worker_pool);
134 }
135
111 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTest); 136 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTest);
112 }; 137 };
113 138
114 scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode( 139 scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode(
115 SchedulerWorkerPoolImpl* worker_pool, 140 SchedulerWorkerPoolImpl* worker_pool,
116 test::ExecutionMode execution_mode) { 141 test::ExecutionMode execution_mode) {
117 // Allow tasks posted to the returned TaskRunner to wait on a WaitableEvent. 142 // Allow tasks posted to the returned TaskRunner to wait on a WaitableEvent.
118 const TaskTraits traits = TaskTraits().WithBaseSyncPrimitives(); 143 const TaskTraits traits = TaskTraits().WithBaseSyncPrimitives();
119 switch (execution_mode) { 144 switch (execution_mode) {
120 case test::ExecutionMode::PARALLEL: 145 case test::ExecutionMode::PARALLEL:
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
250 275
251 // Wait until all workers are idle to be sure that no task accesses its 276 // Wait until all workers are idle to be sure that no task accesses its
252 // TestTaskFactory after |thread_posting_tasks| is destroyed. 277 // TestTaskFactory after |thread_posting_tasks| is destroyed.
253 worker_pool_->WaitForAllWorkersIdleForTesting(); 278 worker_pool_->WaitForAllWorkersIdleForTesting();
254 } 279 }
255 280
256 TEST_P(TaskSchedulerWorkerPoolImplTest, PostTasksWithOneAvailableWorker) { 281 TEST_P(TaskSchedulerWorkerPoolImplTest, PostTasksWithOneAvailableWorker) {
257 // Post blocking tasks to keep all workers busy except one until |event| is 282 // Post blocking tasks to keep all workers busy except one until |event| is
258 // signaled. Use different factories so that tasks are added to different 283 // signaled. Use different factories so that tasks are added to different
259 // sequences and can run simultaneously when the execution mode is SEQUENCED. 284 // sequences and can run simultaneously when the execution mode is SEQUENCED.
285 const size_t num_workers_in_pool =
286 (GetParam() == test::ExecutionMode::SINGLE_THREADED)
287 ? 1
288 : kNumWorkersInWorkerPool;
260 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, 289 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
261 WaitableEvent::InitialState::NOT_SIGNALED); 290 WaitableEvent::InitialState::NOT_SIGNALED);
262 std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories; 291 std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
263 for (size_t i = 0; i < (kNumWorkersInWorkerPool - 1); ++i) { 292 for (size_t i = 0; i < (num_workers_in_pool - 1); ++i) {
264 blocked_task_factories.push_back(MakeUnique<test::TestTaskFactory>( 293 blocked_task_factories.push_back(MakeUnique<test::TestTaskFactory>(
265 CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()), 294 CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
266 GetParam())); 295 GetParam()));
267 EXPECT_TRUE(blocked_task_factories.back()->PostTask( 296 EXPECT_TRUE(blocked_task_factories.back()->PostTask(
268 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event)))); 297 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
269 blocked_task_factories.back()->WaitForAllTasksToRun(); 298 blocked_task_factories.back()->WaitForAllTasksToRun();
270 } 299 }
271 300
272 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact 301 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
273 // that only one worker in |worker_pool_| isn't busy. 302 // that only one worker in |worker_pool_| isn't busy.
(...skipping 10 matching lines...) Expand all
284 // Wait until all workers are idle to be sure that no task accesses 313 // Wait until all workers are idle to be sure that no task accesses
285 // its TestTaskFactory after it is destroyed. 314 // its TestTaskFactory after it is destroyed.
286 worker_pool_->WaitForAllWorkersIdleForTesting(); 315 worker_pool_->WaitForAllWorkersIdleForTesting();
287 } 316 }
288 317
289 TEST_P(TaskSchedulerWorkerPoolImplTest, Saturate) { 318 TEST_P(TaskSchedulerWorkerPoolImplTest, Saturate) {
290 // Verify that it is possible to have |kNumWorkersInWorkerPool| 319 // Verify that it is possible to have |kNumWorkersInWorkerPool|
291 // tasks/sequences running simultaneously. Use different factories so that the 320 // tasks/sequences running simultaneously. Use different factories so that the
292 // blocking tasks are added to different sequences and can run simultaneously 321 // blocking tasks are added to different sequences and can run simultaneously
293 // when the execution mode is SEQUENCED. 322 // when the execution mode is SEQUENCED.
323 const size_t num_workers_in_pool =
324 (GetParam() == test::ExecutionMode::SINGLE_THREADED)
325 ? 1
326 : kNumWorkersInWorkerPool;
294 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, 327 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
295 WaitableEvent::InitialState::NOT_SIGNALED); 328 WaitableEvent::InitialState::NOT_SIGNALED);
296 std::vector<std::unique_ptr<test::TestTaskFactory>> factories; 329 std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
297 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { 330 for (size_t i = 0; i < num_workers_in_pool; ++i) {
298 factories.push_back(MakeUnique<test::TestTaskFactory>( 331 factories.push_back(MakeUnique<test::TestTaskFactory>(
299 CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()), 332 CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
300 GetParam())); 333 GetParam()));
301 EXPECT_TRUE(factories.back()->PostTask( 334 EXPECT_TRUE(factories.back()->PostTask(
302 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event)))); 335 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
303 factories.back()->WaitForAllTasksToRun(); 336 factories.back()->WaitForAllTasksToRun();
304 } 337 }
305 338
306 // Release tasks waiting on |event|. 339 // Release tasks waiting on |event|.
307 event.Signal(); 340 event.Signal();
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
381 namespace { 414 namespace {
382 415
383 // Same as TaskSchedulerWorkerPoolImplTest but its SchedulerWorkerPoolImpl 416 // Same as TaskSchedulerWorkerPoolImplTest but its SchedulerWorkerPoolImpl
384 // instance uses |max_threads == 1|. 417 // instance uses |max_threads == 1|.
385 class TaskSchedulerWorkerPoolImplSingleWorkerTest 418 class TaskSchedulerWorkerPoolImplSingleWorkerTest
386 : public TaskSchedulerWorkerPoolImplTest { 419 : public TaskSchedulerWorkerPoolImplTest {
387 public: 420 public:
388 TaskSchedulerWorkerPoolImplSingleWorkerTest() = default; 421 TaskSchedulerWorkerPoolImplSingleWorkerTest() = default;
389 422
390 protected: 423 protected:
391 void SetUp() override { 424 void SetUp() override { InitializeSingleThreadWorkerPool(TimeDelta::Max()); }
392 InitializeWorkerPool(TimeDelta::Max(), 1);
393 }
394 425
395 private: 426 private:
396 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplSingleWorkerTest); 427 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplSingleWorkerTest);
397 }; 428 };
398 429
399 } // namespace 430 } // namespace
400 431
401 // Verify that the RunsTasksOnCurrentThread() method of a 432 // Verify that the RunsTasksOnCurrentThread() method of a
402 // SchedulerSingleThreadTaskRunner returns false when called from a task that 433 // SchedulerSingleThreadTaskRunner returns false when called from a task that
403 // isn't part of its sequence even though it's running on that 434 // isn't part of its sequence even though it's running on that
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
445 void InitializeThreadChecker() { 476 void InitializeThreadChecker() {
446 thread_checker_.reset(new ThreadCheckerImpl()); 477 thread_checker_.reset(new ThreadCheckerImpl());
447 } 478 }
448 479
449 void CheckValidThread() { 480 void CheckValidThread() {
450 EXPECT_TRUE(thread_checker_->CalledOnValidThread()); 481 EXPECT_TRUE(thread_checker_->CalledOnValidThread());
451 } 482 }
452 483
453 protected: 484 protected:
454 void SetUp() override { 485 void SetUp() override {
455 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); 486 InitializeSingleThreadWorkerPool(kReclaimTimeForDetachTests);
456 } 487 }
457 488
458 TaskSchedulerWorkerPoolSingleThreadedTest() = default; 489 TaskSchedulerWorkerPoolSingleThreadedTest() = default;
459 490
460 private: 491 private:
461 std::unique_ptr<ThreadCheckerImpl> thread_checker_; 492 std::unique_ptr<ThreadCheckerImpl> thread_checker_;
462 493
463 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest); 494 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest);
464 }; 495 };
465 496
(...skipping 265 matching lines...) Expand 10 before | Expand all | Expand 10 after
731 *thread_id = PlatformThread::CurrentId(); 762 *thread_id = PlatformThread::CurrentId();
732 } 763 }
733 764
734 void VerifyThreadIdIsNot(PlatformThreadId thread_id) { 765 void VerifyThreadIdIsNot(PlatformThreadId thread_id) {
735 EXPECT_NE(thread_id, PlatformThread::CurrentId()); 766 EXPECT_NE(thread_id, PlatformThread::CurrentId());
736 } 767 }
737 768
738 } // namespace 769 } // namespace
739 770
740 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeDetach) { 771 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeDetach) {
741 InitializeWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool); 772 InitializeSingleThreadWorkerPool(kReclaimTimeForDetachTests);
742 773
743 // This test assumes that the TaskRunners aren't assigned to the same worker. 774 // This test assumes that the TaskRunners aren't assigned to the same worker.
744 auto task_runner = 775 auto task_runner =
745 worker_pool_->CreateSingleThreadTaskRunnerWithTraits(TaskTraits()); 776 worker_pool_->CreateSingleThreadTaskRunnerWithTraits(TaskTraits());
746 auto other_task_runner =
747 worker_pool_->CreateSingleThreadTaskRunnerWithTraits(TaskTraits());
748 777
749 // Post 3 tasks and wait until they run. 778 // Post 3 tasks and wait until they run.
750 PlatformThreadId thread_id; 779 PlatformThreadId thread_id;
751 task_runner->PostTask(FROM_HERE, 780 task_runner->PostTask(FROM_HERE,
752 Bind(&CaptureThreadId, Unretained(&thread_id))); 781 Bind(&CaptureThreadId, Unretained(&thread_id)));
753 task_runner->PostTask(FROM_HERE, Bind(&DoNothing)); 782 task_runner->PostTask(FROM_HERE, Bind(&DoNothing));
754 task_runner->PostTask(FROM_HERE, Bind(&DoNothing)); 783 task_runner->PostTask(FROM_HERE, Bind(&DoNothing));
755 worker_pool_->WaitForAllWorkersIdleForTesting(); 784 worker_pool_->WaitForAllWorkersIdleForTesting();
756 785
757 // To allow the SchedulerWorker associated with |task_runner| to detach: 786 // To allow the SchedulerWorker associated with |task_runner| to detach:
758 // - Make sure it isn't on top of the idle stack by waking up another 787 // - Make sure it isn't on top of the idle stack by waking up another
759 // SchedulerWorker and waiting until it goes back to sleep. 788 // SchedulerWorker and waiting until it goes back to sleep.
760 // - Release |task_runner|. 789 // - Release |task_runner|.
761 other_task_runner->PostTask(FROM_HERE, Bind(&VerifyThreadIdIsNot, thread_id));
762 worker_pool_->WaitForAllWorkersIdleForTesting(); 790 worker_pool_->WaitForAllWorkersIdleForTesting();
763 task_runner = nullptr; 791 task_runner = nullptr;
764 792
765 // Allow the SchedulerWorker that was associated with |task_runner| to detach. 793 // Allow the SchedulerWorker that was associated with |task_runner| to detach.
766 PlatformThread::Sleep(kReclaimTimeForDetachTests + kExtraTimeToWaitForDetach); 794 PlatformThread::Sleep(kReclaimTimeForDetachTests + kExtraTimeToWaitForDetach);
767 worker_pool_->DisallowWorkerDetachmentForTesting(); 795 worker_pool_->DisallowWorkerDetachmentForTesting();
768 796
769 // Verify that counts were recorded to the histogram as expected. 797 // Verify that counts were recorded to the histogram as expected.
770 const auto* histogram = worker_pool_->num_tasks_before_detach_histogram(); 798 const auto* histogram = worker_pool_->num_tasks_before_detach_histogram();
771 EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0)); 799 EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
(...skipping 25 matching lines...) Expand all
797 StandbyThreadPolicy::ONE, 8U, TimeDelta::Max()), 825 StandbyThreadPolicy::ONE, 8U, TimeDelta::Max()),
798 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, 826 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker,
799 &delayed_task_manager); 827 &delayed_task_manager);
800 ASSERT_TRUE(worker_pool); 828 ASSERT_TRUE(worker_pool);
801 EXPECT_EQ(1U, worker_pool->NumberOfAliveWorkersForTesting()); 829 EXPECT_EQ(1U, worker_pool->NumberOfAliveWorkersForTesting());
802 worker_pool->JoinForTesting(); 830 worker_pool->JoinForTesting();
803 } 831 }
804 832
805 } // namespace internal 833 } // namespace internal
806 } // namespace base 834 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_worker_pool_impl.cc ('k') | base/task_scheduler/scheduler_worker_pool_params.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698