| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_thread_pool.h" | 5 #include "base/task_scheduler/scheduler_thread_pool.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <memory> | 9 #include <memory> |
| 10 #include <unordered_set> | 10 #include <unordered_set> |
| 11 #include <vector> | 11 #include <vector> |
| 12 | 12 |
| 13 #include "base/bind.h" | 13 #include "base/bind.h" |
| 14 #include "base/bind_helpers.h" | 14 #include "base/bind_helpers.h" |
| 15 #include "base/macros.h" | 15 #include "base/macros.h" |
| 16 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
| 17 #include "base/memory/ref_counted.h" | 17 #include "base/memory/ref_counted.h" |
| 18 #include "base/synchronization/condition_variable.h" | 18 #include "base/synchronization/condition_variable.h" |
| 19 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
| 20 #include "base/synchronization/waitable_event.h" | 20 #include "base/synchronization/waitable_event.h" |
| 21 #include "base/task_runner.h" | 21 #include "base/task_runner.h" |
| 22 #include "base/task_scheduler/delayed_task_manager.h" | 22 #include "base/task_scheduler/delayed_task_manager.h" |
| 23 #include "base/task_scheduler/sequence.h" | 23 #include "base/task_scheduler/sequence.h" |
| 24 #include "base/task_scheduler/sequence_sort_key.h" | 24 #include "base/task_scheduler/sequence_sort_key.h" |
| 25 #include "base/task_scheduler/task_tracker.h" | 25 #include "base/task_scheduler/task_tracker.h" |
| 26 #include "base/threading/platform_thread.h" | 26 #include "base/threading/platform_thread.h" |
| 27 #include "base/threading/simple_thread.h" | 27 #include "base/threading/simple_thread.h" |
| 28 #include "base/threading/thread_checker_impl.h" |
| 28 #include "testing/gtest/include/gtest/gtest.h" | 29 #include "testing/gtest/include/gtest/gtest.h" |
| 29 | 30 |
| 30 namespace base { | 31 namespace base { |
| 31 namespace internal { | 32 namespace internal { |
| 32 namespace { | 33 namespace { |
| 33 | 34 |
| 34 const size_t kNumThreadsInThreadPool = 4; | 35 const size_t kNumThreadsInThreadPool = 4; |
| 35 const size_t kNumThreadsPostingTasks = 4; | 36 const size_t kNumThreadsPostingTasks = 4; |
| 36 const size_t kNumTasksPostedPerThread = 150; | 37 const size_t kNumTasksPostedPerThread = 150; |
| 37 | 38 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 72 }; | 73 }; |
| 73 | 74 |
| 74 class TaskFactory { | 75 class TaskFactory { |
| 75 public: | 76 public: |
| 76 // Constructs a TaskFactory that posts tasks with |execution_mode| to | 77 // Constructs a TaskFactory that posts tasks with |execution_mode| to |
| 77 // |thread_pool|. | 78 // |thread_pool|. |
| 78 TaskFactory(SchedulerThreadPool* thread_pool, ExecutionMode execution_mode) | 79 TaskFactory(SchedulerThreadPool* thread_pool, ExecutionMode execution_mode) |
| 79 : cv_(&lock_), | 80 : cv_(&lock_), |
| 80 task_runner_(thread_pool->CreateTaskRunnerWithTraits(TaskTraits(), | 81 task_runner_(thread_pool->CreateTaskRunnerWithTraits(TaskTraits(), |
| 81 execution_mode)), | 82 execution_mode)), |
| 82 execution_mode_(execution_mode) {} | 83 execution_mode_(execution_mode) { |
| 84 // Detach |thread_checker_| from the current thread. It will be attached to |
| 85 // the first thread that calls ThreadCheckerImpl::CalledOnValidThread(). |
| 86 thread_checker_.DetachFromThread(); |
| 87 } |
| 83 | 88 |
| 84 // Posts a task through |task_runner_|. If |post_nested_task| is true, the | 89 // Posts a task through |task_runner_|. If |post_nested_task| is true, the |
| 85 // task will post a new task when it runs. If |event| is set, the task will | 90 // task will post a new task when it runs. If |event| is set, the task will |
| 86 // block until it is signaled. | 91 // block until it is signaled. |
| 87 void PostTestTask(bool post_nested_task, WaitableEvent* event) { | 92 void PostTestTask(bool post_nested_task, WaitableEvent* event) { |
| 88 AutoLock auto_lock(lock_); | 93 AutoLock auto_lock(lock_); |
| 89 EXPECT_TRUE(task_runner_->PostTask( | 94 EXPECT_TRUE(task_runner_->PostTask( |
| 90 FROM_HERE, | 95 FROM_HERE, |
| 91 Bind(&TaskFactory::RunTaskCallback, Unretained(this), | 96 Bind(&TaskFactory::RunTaskCallback, Unretained(this), |
| 92 num_created_tasks_++, post_nested_task, Unretained(event)))); | 97 num_created_tasks_++, post_nested_task, Unretained(event)))); |
| (...skipping 19 matching lines...) Expand all Loading... |
| 112 bool post_nested_task, | 117 bool post_nested_task, |
| 113 WaitableEvent* event) { | 118 WaitableEvent* event) { |
| 114 if (post_nested_task) | 119 if (post_nested_task) |
| 115 PostTestTask(false, nullptr); | 120 PostTestTask(false, nullptr); |
| 116 | 121 |
| 117 EXPECT_TRUE(task_runner_->RunsTasksOnCurrentThread()); | 122 EXPECT_TRUE(task_runner_->RunsTasksOnCurrentThread()); |
| 118 | 123 |
| 119 { | 124 { |
| 120 AutoLock auto_lock(lock_); | 125 AutoLock auto_lock(lock_); |
| 121 | 126 |
| 122 if (execution_mode_ == ExecutionMode::SEQUENCED && | 127 if ((execution_mode_ == ExecutionMode::SEQUENCED || |
| 128 execution_mode_ == ExecutionMode::SINGLE_THREADED) && |
| 123 task_index != ran_tasks_.size()) { | 129 task_index != ran_tasks_.size()) { |
| 124 ADD_FAILURE() << "A SEQUENCED task didn't run in the expected order."; | 130 ADD_FAILURE() << "A task didn't run in the expected order."; |
| 125 } | 131 } |
| 126 | 132 |
| 133 if (execution_mode_ == ExecutionMode::SINGLE_THREADED) |
| 134 EXPECT_TRUE(thread_checker_.CalledOnValidThread()); |
| 135 |
| 127 if (ran_tasks_.find(task_index) != ran_tasks_.end()) | 136 if (ran_tasks_.find(task_index) != ran_tasks_.end()) |
| 128 ADD_FAILURE() << "A task ran more than once."; | 137 ADD_FAILURE() << "A task ran more than once."; |
| 129 ran_tasks_.insert(task_index); | 138 ran_tasks_.insert(task_index); |
| 130 | 139 |
| 131 cv_.Signal(); | 140 cv_.Signal(); |
| 132 } | 141 } |
| 133 | 142 |
| 134 if (event) | 143 if (event) |
| 135 event->Wait(); | 144 event->Wait(); |
| 136 } | 145 } |
| 137 | 146 |
| 138 // Synchronizes access to all members below. | 147 // Synchronizes access to all members below. |
| 139 mutable Lock lock_; | 148 mutable Lock lock_; |
| 140 | 149 |
| 141 // Condition variable signaled when a task runs. | 150 // Condition variable signaled when a task runs. |
| 142 mutable ConditionVariable cv_; | 151 mutable ConditionVariable cv_; |
| 143 | 152 |
| 144 // Task runner through which this factory posts tasks. | 153 // Task runner through which this factory posts tasks. |
| 145 const scoped_refptr<TaskRunner> task_runner_; | 154 const scoped_refptr<TaskRunner> task_runner_; |
| 146 | 155 |
| 147 // Execution mode of |task_runner_|. | 156 // Execution mode of |task_runner_|. |
| 148 const ExecutionMode execution_mode_; | 157 const ExecutionMode execution_mode_; |
| 149 | 158 |
| 150 // Number of tasks posted by PostTestTask(). | 159 // Number of tasks posted by PostTestTask(). |
| 151 size_t num_created_tasks_ = 0; | 160 size_t num_created_tasks_ = 0; |
| 152 | 161 |
| 153 // Indexes of tasks that ran. | 162 // Indexes of tasks that ran. |
| 154 std::unordered_set<size_t> ran_tasks_; | 163 std::unordered_set<size_t> ran_tasks_; |
| 155 | 164 |
| 165 // Used to verify that all tasks run on the same thread when |execution_mode_| |
| 166 // is SINGLE_THREADED. |
| 167 ThreadCheckerImpl thread_checker_; |
| 168 |
| 156 DISALLOW_COPY_AND_ASSIGN(TaskFactory); | 169 DISALLOW_COPY_AND_ASSIGN(TaskFactory); |
| 157 }; | 170 }; |
| 158 | 171 |
| 159 class ThreadPostingTasks : public SimpleThread { | 172 class ThreadPostingTasks : public SimpleThread { |
| 160 public: | 173 public: |
| 161 // Constructs a thread that posts tasks to |thread_pool| through an | 174 // Constructs a thread that posts tasks to |thread_pool| through an |
| 162 // |execution_mode| task runner. If |wait_for_all_threads_idle| is true, the | 175 // |execution_mode| task runner. If |wait_for_all_threads_idle| is true, the |
| 163 // thread wait until all worker threads in |thread_pool| are idle before | 176 // thread wait until all worker threads in |thread_pool| are idle before |
| 164 // posting a new task. If |post_nested_task| is true, each task posted by this | 177 // posting a new task. If |post_nested_task| is true, each task posted by this |
| 165 // thread posts another task when it runs. | 178 // thread posts another task when it runs. |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 270 thread_posting_tasks->factory()->NumRunTasks()); | 283 thread_posting_tasks->factory()->NumRunTasks()); |
| 271 } | 284 } |
| 272 | 285 |
| 273 // Wait until all worker threads are idle to be sure that no task accesses | 286 // Wait until all worker threads are idle to be sure that no task accesses |
| 274 // its TaskFactory after |thread_posting_tasks| is destroyed. | 287 // its TaskFactory after |thread_posting_tasks| is destroyed. |
| 275 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | 288 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); |
| 276 } | 289 } |
| 277 | 290 |
| 278 TEST_P(TaskSchedulerThreadPoolTest, PostTasksWithOneAvailableThread) { | 291 TEST_P(TaskSchedulerThreadPoolTest, PostTasksWithOneAvailableThread) { |
| 279 // Post tasks to keep all threads busy except one until |event| is signaled. | 292 // Post tasks to keep all threads busy except one until |event| is signaled. |
| 280 // Use different factories so that tasks are added to different sequences and | 293 // Use different factories so that tasks are assigned to different sequences |
| 281 // can run simultaneously when the execution mode is SEQUENCED. | 294 // or threads when the execution mode is SEQUENCED or SINGLE_THREADED. |
| 282 WaitableEvent event(true, false); | 295 WaitableEvent event(true, false); |
| 283 std::vector<std::unique_ptr<TaskFactory>> blocked_task_factories; | 296 std::vector<std::unique_ptr<TaskFactory>> blocked_task_factories; |
| 284 for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) { | 297 for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) { |
| 285 blocked_task_factories.push_back( | 298 blocked_task_factories.push_back( |
| 286 WrapUnique(new TaskFactory(thread_pool_.get(), GetParam()))); | 299 WrapUnique(new TaskFactory(thread_pool_.get(), GetParam()))); |
| 287 blocked_task_factories.back()->PostTestTask(false, &event); | 300 blocked_task_factories.back()->PostTestTask(false, &event); |
| 288 blocked_task_factories.back()->WaitForAllTasksToRun(); | 301 blocked_task_factories.back()->WaitForAllTasksToRun(); |
| 289 } | 302 } |
| 290 | 303 |
| 291 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact | 304 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact |
| 292 // that only one thread in |thread_pool_| isn't busy. | 305 // that only one thread in |thread_pool_| isn't busy. |
| 293 TaskFactory short_task_factory(thread_pool_.get(), GetParam()); | 306 TaskFactory short_task_factory(thread_pool_.get(), GetParam()); |
| 294 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) | 307 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) |
| 295 short_task_factory.PostTestTask(false, nullptr); | 308 short_task_factory.PostTestTask(false, nullptr); |
| 296 short_task_factory.WaitForAllTasksToRun(); | 309 short_task_factory.WaitForAllTasksToRun(); |
| 297 | 310 |
| 298 // Release tasks waiting on |event|. | 311 // Release tasks waiting on |event|. |
| 299 event.Signal(); | 312 event.Signal(); |
| 300 | 313 |
| 301 // Wait until all worker threads are idle to be sure that no task accesses | 314 // Wait until all worker threads are idle to be sure that no task accesses |
| 302 // its TaskFactory after it is destroyed. | 315 // its TaskFactory after it is destroyed. |
| 303 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | 316 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); |
| 304 } | 317 } |
| 305 | 318 |
| 306 TEST_P(TaskSchedulerThreadPoolTest, Saturate) { | 319 TEST_P(TaskSchedulerThreadPoolTest, Saturate) { |
| 307 // Verify that it is possible to have |kNumThreadsInThreadPool| | 320 // Verify that it is possible to have |kNumThreadsInThreadPool| |
| 308 // tasks/sequences running simultaneously. Use different factories so that | 321 // tasks/sequences running simultaneously. Use different factories so that |
| 309 // tasks are added to different sequences and can run simultaneously when the | 322 // tasks are assigned to different sequences or threads when the execution |
| 310 // execution mode is SEQUENCED. | 323 // mode is SEQUENCED or SINGLE_THREADED. |
| 311 WaitableEvent event(true, false); | 324 WaitableEvent event(true, false); |
| 312 std::vector<std::unique_ptr<TaskFactory>> factories; | 325 std::vector<std::unique_ptr<TaskFactory>> factories; |
| 313 for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) { | 326 for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) { |
| 314 factories.push_back( | 327 factories.push_back( |
| 315 WrapUnique(new TaskFactory(thread_pool_.get(), GetParam()))); | 328 WrapUnique(new TaskFactory(thread_pool_.get(), GetParam()))); |
| 316 factories.back()->PostTestTask(false, &event); | 329 factories.back()->PostTestTask(false, &event); |
| 317 factories.back()->WaitForAllTasksToRun(); | 330 factories.back()->WaitForAllTasksToRun(); |
| 318 } | 331 } |
| 319 | 332 |
| 320 // Release tasks waiting on |event|. | 333 // Release tasks waiting on |event|. |
| 321 event.Signal(); | 334 event.Signal(); |
| 322 | 335 |
| 323 // Wait until all worker threads are idle to be sure that no task accesses | 336 // Wait until all worker threads are idle to be sure that no task accesses |
| 324 // its TaskFactory after it is destroyed. | 337 // its TaskFactory after it is destroyed. |
| 325 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | 338 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); |
| 326 } | 339 } |
| 327 | 340 |
| 328 INSTANTIATE_TEST_CASE_P(Parallel, | 341 INSTANTIATE_TEST_CASE_P(Parallel, |
| 329 TaskSchedulerThreadPoolTest, | 342 TaskSchedulerThreadPoolTest, |
| 330 ::testing::Values(ExecutionMode::PARALLEL)); | 343 ::testing::Values(ExecutionMode::PARALLEL)); |
| 331 INSTANTIATE_TEST_CASE_P(Sequenced, | 344 INSTANTIATE_TEST_CASE_P(Sequenced, |
| 332 TaskSchedulerThreadPoolTest, | 345 TaskSchedulerThreadPoolTest, |
| 333 ::testing::Values(ExecutionMode::SEQUENCED)); | 346 ::testing::Values(ExecutionMode::SEQUENCED)); |
| 347 INSTANTIATE_TEST_CASE_P(SingleThreaded, |
| 348 TaskSchedulerThreadPoolTest, |
| 349 ::testing::Values(ExecutionMode::SINGLE_THREADED)); |
| 334 | 350 |
| 335 } // namespace | 351 } // namespace |
| 336 } // namespace internal | 352 } // namespace internal |
| 337 } // namespace base | 353 } // namespace base |
| OLD | NEW |