Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <memory> | 9 #include <memory> |
| 10 #include <unordered_set> | 10 #include <unordered_set> |
| 11 #include <vector> | 11 #include <vector> |
| 12 | 12 |
| 13 #include "base/atomicops.h" | |
| 13 #include "base/bind.h" | 14 #include "base/bind.h" |
| 14 #include "base/bind_helpers.h" | 15 #include "base/bind_helpers.h" |
| 15 #include "base/callback.h" | 16 #include "base/callback.h" |
| 16 #include "base/macros.h" | 17 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
| 18 #include "base/memory/ref_counted.h" | 19 #include "base/memory/ref_counted.h" |
| 19 #include "base/synchronization/condition_variable.h" | 20 #include "base/synchronization/condition_variable.h" |
| 20 #include "base/synchronization/lock.h" | 21 #include "base/synchronization/lock.h" |
| 21 #include "base/synchronization/waitable_event.h" | 22 #include "base/synchronization/waitable_event.h" |
| 22 #include "base/task_runner.h" | 23 #include "base/task_runner.h" |
| 23 #include "base/task_scheduler/delayed_task_manager.h" | 24 #include "base/task_scheduler/delayed_task_manager.h" |
| 24 #include "base/task_scheduler/scheduler_worker_pool_params.h" | 25 #include "base/task_scheduler/scheduler_worker_pool_params.h" |
| 25 #include "base/task_scheduler/sequence.h" | 26 #include "base/task_scheduler/sequence.h" |
| 26 #include "base/task_scheduler/sequence_sort_key.h" | 27 #include "base/task_scheduler/sequence_sort_key.h" |
| 27 #include "base/task_scheduler/task_tracker.h" | 28 #include "base/task_scheduler/task_tracker.h" |
| 28 #include "base/task_scheduler/test_task_factory.h" | 29 #include "base/task_scheduler/test_task_factory.h" |
| 29 #include "base/task_scheduler/test_utils.h" | 30 #include "base/task_scheduler/test_utils.h" |
| 30 #include "base/threading/platform_thread.h" | 31 #include "base/threading/platform_thread.h" |
| 31 #include "base/threading/simple_thread.h" | 32 #include "base/threading/simple_thread.h" |
| 33 #include "base/threading/thread_checker_impl.h" | |
| 34 #include "base/threading/thread_local_storage.h" | |
| 32 #include "base/threading/thread_restrictions.h" | 35 #include "base/threading/thread_restrictions.h" |
| 36 #include "base/time/time.h" | |
| 33 #include "testing/gtest/include/gtest/gtest.h" | 37 #include "testing/gtest/include/gtest/gtest.h" |
| 34 | 38 |
| 35 namespace base { | 39 namespace base { |
| 36 namespace internal { | 40 namespace internal { |
| 37 namespace { | 41 namespace { |
| 38 | 42 |
| 39 const size_t kNumWorkersInWorkerPool = 4; | 43 const size_t kNumWorkersInWorkerPool = 4; |
| 40 const size_t kNumThreadsPostingTasks = 4; | 44 const size_t kNumThreadsPostingTasks = 4; |
| 41 const size_t kNumTasksPostedPerThread = 150; | 45 const size_t kNumTasksPostedPerThread = 150; |
| 46 const TimeDelta kSuggestedReclaimTime = TimeDelta::FromMilliseconds(10); | |
|
gab
2016/07/20 01:45:21
"kSuggestedReclaimTimeForDetachTests" or something
gab
2016/07/20 01:45:21
constexpr
robliao
2016/07/20 19:44:01
Done.
| |
| 42 | 47 |
| 43 using IORestriction = SchedulerWorkerPoolParams::IORestriction; | 48 using IORestriction = SchedulerWorkerPoolParams::IORestriction; |
| 44 | 49 |
| 45 class TestDelayedTaskManager : public DelayedTaskManager { | 50 class TestDelayedTaskManager : public DelayedTaskManager { |
| 46 public: | 51 public: |
| 47 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} | 52 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} |
| 48 | 53 |
| 49 void SetCurrentTime(TimeTicks now) { now_ = now; } | 54 void SetCurrentTime(TimeTicks now) { now_ = now; } |
| 50 | 55 |
| 51 // DelayedTaskManager: | 56 // DelayedTaskManager: |
| 52 TimeTicks Now() const override { return now_; } | 57 TimeTicks Now() const override { return now_; } |
| 53 | 58 |
| 54 private: | 59 private: |
| 55 TimeTicks now_ = TimeTicks::Now(); | 60 TimeTicks now_ = TimeTicks::Now(); |
| 56 | 61 |
| 57 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); | 62 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); |
| 58 }; | 63 }; |
| 59 | 64 |
| 60 class TaskSchedulerWorkerPoolImplTest | 65 class TaskSchedulerWorkerPoolImplTest |
| 61 : public testing::TestWithParam<ExecutionMode> { | 66 : public testing::TestWithParam<ExecutionMode> { |
| 62 protected: | 67 protected: |
| 63 TaskSchedulerWorkerPoolImplTest() = default; | 68 TaskSchedulerWorkerPoolImplTest() = default; |
| 64 | 69 |
| 65 void SetUp() override { | 70 void SetUp() override { |
| 66 worker_pool_ = SchedulerWorkerPoolImpl::Create( | 71 InitializeWorkerPool(TimeDelta::Max()); |
| 67 SchedulerWorkerPoolParams("TestWorkerPoolWithFileIO", | 72 worker_pool_->DisallowWorkerDetachmentForTesting(); |
|
fdoray
2016/07/20 14:15:43
This is not required given that suggested reclaim
robliao
2016/07/20 19:44:01
Yep. Done.
| |
| 68 ThreadPriority::NORMAL, | |
| 69 IORestriction::ALLOWED, | |
| 70 kNumWorkersInWorkerPool), | |
| 71 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | |
| 72 Unretained(this)), | |
| 73 &task_tracker_, &delayed_task_manager_); | |
| 74 ASSERT_TRUE(worker_pool_); | |
| 75 } | 73 } |
| 76 | 74 |
| 77 void TearDown() override { | 75 void TearDown() override { |
| 78 worker_pool_->WaitForAllWorkersIdleForTesting(); | 76 worker_pool_->WaitForAllWorkersIdleForTesting(); |
| 79 worker_pool_->JoinForTesting(); | 77 worker_pool_->JoinForTesting(); |
| 80 } | 78 } |
| 81 | 79 |
| 80 void InitializeWorkerPool(const TimeDelta& suggested_reclaim_time) { | |
| 81 worker_pool_ = SchedulerWorkerPoolImpl::Create( | |
| 82 SchedulerWorkerPoolParams("TestWorkerPoolWithFileIO", | |
| 83 ThreadPriority::NORMAL, | |
| 84 IORestriction::ALLOWED, | |
| 85 kNumWorkersInWorkerPool, | |
| 86 suggested_reclaim_time), | |
| 87 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | |
| 88 Unretained(this)), | |
| 89 &task_tracker_, &delayed_task_manager_); | |
| 90 ASSERT_TRUE(worker_pool_); | |
| 91 } | |
| 92 | |
| 82 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; | 93 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; |
| 83 | 94 |
| 84 TaskTracker task_tracker_; | 95 TaskTracker task_tracker_; |
| 85 TestDelayedTaskManager delayed_task_manager_; | 96 TestDelayedTaskManager delayed_task_manager_; |
| 86 | 97 |
| 87 private: | 98 private: |
| 88 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | 99 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { |
| 89 // In production code, this callback would be implemented by the | 100 // In production code, this callback would be implemented by the |
| 90 // TaskScheduler which would first determine which PriorityQueue the | 101 // TaskScheduler which would first determine which PriorityQueue the |
| 91 // sequence must be re-enqueued. | 102 // sequence must be re-enqueued. |
| (...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 360 }; | 371 }; |
| 361 | 372 |
| 362 } // namespace | 373 } // namespace |
| 363 | 374 |
| 364 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { | 375 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { |
| 365 TaskTracker task_tracker; | 376 TaskTracker task_tracker; |
| 366 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); | 377 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); |
| 367 | 378 |
| 368 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 379 auto worker_pool = SchedulerWorkerPoolImpl::Create( |
| 369 SchedulerWorkerPoolParams("TestWorkerPoolWithParam", | 380 SchedulerWorkerPoolParams("TestWorkerPoolWithParam", |
| 370 ThreadPriority::NORMAL, GetParam(), 1U), | 381 ThreadPriority::NORMAL, GetParam(), 1U, |
| 382 TimeDelta::Max()), | |
| 371 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 383 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, |
| 372 &delayed_task_manager); | 384 &delayed_task_manager); |
| 373 ASSERT_TRUE(worker_pool); | 385 ASSERT_TRUE(worker_pool); |
| 386 worker_pool->DisallowWorkerDetachmentForTesting(); | |
|
fdoray
2016/07/20 14:15:43
This is not required given that suggested reclaim
robliao
2016/07/20 19:44:01
Yep. Done.
| |
| 374 | 387 |
| 375 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | 388 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, |
| 376 WaitableEvent::InitialState::NOT_SIGNALED); | 389 WaitableEvent::InitialState::NOT_SIGNALED); |
| 377 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) | 390 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) |
| 378 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); | 391 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); |
| 379 task_ran.Wait(); | 392 task_ran.Wait(); |
| 380 | 393 |
| 381 worker_pool->JoinForTesting(); | 394 worker_pool->JoinForTesting(); |
| 382 } | 395 } |
| 383 | 396 |
| 384 INSTANTIATE_TEST_CASE_P(IOAllowed, | 397 INSTANTIATE_TEST_CASE_P(IOAllowed, |
| 385 TaskSchedulerWorkerPoolImplIORestrictionTest, | 398 TaskSchedulerWorkerPoolImplIORestrictionTest, |
| 386 ::testing::Values(IORestriction::ALLOWED)); | 399 ::testing::Values(IORestriction::ALLOWED)); |
| 387 INSTANTIATE_TEST_CASE_P(IODisallowed, | 400 INSTANTIATE_TEST_CASE_P(IODisallowed, |
| 388 TaskSchedulerWorkerPoolImplIORestrictionTest, | 401 TaskSchedulerWorkerPoolImplIORestrictionTest, |
| 389 ::testing::Values(IORestriction::DISALLOWED)); | 402 ::testing::Values(IORestriction::DISALLOWED)); |
| 390 | 403 |
| 404 namespace { | |
| 405 | |
| 406 class TaskSchedulerWorkerPoolSingleThreadedTest | |
| 407 : public TaskSchedulerWorkerPoolImplTest { | |
| 408 public: | |
| 409 void InitializeThreadChecker() { | |
| 410 thread_checker_.reset(new ThreadCheckerImpl()); | |
| 411 } | |
| 412 | |
| 413 void CheckValidThread() { | |
| 414 EXPECT_TRUE(thread_checker_->CalledOnValidThread()); | |
| 415 } | |
| 416 | |
| 417 protected: | |
| 418 void SetUp() override { | |
| 419 InitializeWorkerPool(kSuggestedReclaimTime); | |
| 420 } | |
| 421 | |
| 422 TaskSchedulerWorkerPoolSingleThreadedTest() = default; | |
| 423 | |
| 424 private: | |
| 425 std::unique_ptr<ThreadCheckerImpl> thread_checker_; | |
| 426 | |
| 427 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest); | |
| 428 }; | |
| 429 | |
| 430 } // namespace | |
| 431 | |
| 432 // Verify that thread resources for a single thread remain. | |
| 433 TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) { | |
| 434 auto single_thread_task_runner = | |
| 435 worker_pool_->CreateTaskRunnerWithTraits( | |
| 436 TaskTraits(). | |
| 437 WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN), | |
| 438 ExecutionMode::SINGLE_THREADED); | |
| 439 single_thread_task_runner->PostTask( | |
| 440 FROM_HERE, | |
| 441 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::InitializeThreadChecker, | |
| 442 Unretained(this))); | |
| 443 WaitableEvent task_waiter(WaitableEvent::ResetPolicy::AUTOMATIC, | |
| 444 WaitableEvent::InitialState::NOT_SIGNALED); | |
| 445 single_thread_task_runner->PostTask( | |
| 446 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); | |
| 447 task_waiter.Wait(); | |
| 448 worker_pool_->WaitForAllWorkersIdleForTesting(); | |
| 449 | |
| 450 // Give the worker pool a chance to reclaim its threads. | |
| 451 PlatformThread::Sleep( | |
| 452 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); | |
| 453 | |
| 454 worker_pool_->DisallowWorkerDetachmentForTesting(); | |
| 455 | |
| 456 single_thread_task_runner->PostTask( | |
| 457 FROM_HERE, | |
| 458 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckValidThread, | |
| 459 Unretained(this))); | |
| 460 single_thread_task_runner->PostTask( | |
| 461 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); | |
| 462 task_waiter.Wait(); | |
| 463 } | |
| 464 | |
| 465 namespace { | |
| 466 | |
| 467 constexpr size_t kMagicTlsValue = 42; | |
| 468 | |
| 469 class TaskSchedulerWorkerPoolCheckTlsReuse | |
| 470 : public TaskSchedulerWorkerPoolImplTest { | |
| 471 public: | |
| 472 void SetTlsValueAndWait() { | |
| 473 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); | |
| 474 waiter_.Wait(); | |
| 475 } | |
| 476 | |
| 477 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { | |
|
gab
2016/07/20 01:45:21
s/ZeroTlsValue/NoMagicTlsValue/ (here and below fo
robliao
2016/07/20 19:44:01
We're counting zero TLS values here so the name sh
| |
| 478 if (!slot_.Get()) | |
| 479 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); | |
| 480 | |
| 481 count_waiter->Signal(); | |
| 482 waiter_.Wait(); | |
| 483 } | |
| 484 | |
| 485 protected: | |
| 486 TaskSchedulerWorkerPoolCheckTlsReuse() : | |
| 487 waiter_(WaitableEvent::ResetPolicy::MANUAL, | |
| 488 WaitableEvent::InitialState::NOT_SIGNALED) {} | |
| 489 | |
| 490 void SetUp() override { | |
| 491 InitializeWorkerPool(kSuggestedReclaimTime); | |
| 492 } | |
| 493 | |
| 494 subtle::Atomic32 zero_tls_values_ = 0; | |
| 495 | |
| 496 WaitableEvent waiter_; | |
| 497 | |
| 498 private: | |
| 499 ThreadLocalStorage::Slot slot_; | |
| 500 | |
| 501 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); | |
| 502 }; | |
| 503 | |
| 504 } // namespace | |
| 505 | |
| 506 // Checks that at least one thread has detached by checking the TLS. | |
| 507 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) { | |
| 508 // Saturate the threads and mark each thread with a magic TLS value. | |
| 509 std::vector<std::unique_ptr<test::TestTaskFactory>> factories; | |
| 510 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { | |
| 511 factories.push_back(WrapUnique(new test::TestTaskFactory( | |
| 512 worker_pool_->CreateTaskRunnerWithTraits( | |
| 513 TaskTraits(), ExecutionMode::PARALLEL), | |
| 514 ExecutionMode::PARALLEL))); | |
| 515 ASSERT_TRUE(factories.back()->PostTask( | |
| 516 PostNestedTask::NO, | |
| 517 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait, | |
| 518 Unretained(this)))); | |
| 519 factories.back()->WaitForAllTasksToRun(); | |
| 520 } | |
| 521 | |
| 522 // Release tasks waiting on |waiter_|. | |
| 523 waiter_.Signal(); | |
| 524 worker_pool_->WaitForAllWorkersIdleForTesting(); | |
| 525 | |
| 526 // All threads should be done running by now, so reset for the next phase. | |
| 527 waiter_.Reset(); | |
| 528 | |
| 529 // Give the worker pool a chance to detach its threads. | |
| 530 PlatformThread::Sleep( | |
| 531 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); | |
| 532 | |
| 533 worker_pool_->DisallowWorkerDetachmentForTesting(); | |
| 534 | |
| 535 // Saturate and count the threads that do not have the magic TLS value. If the | |
| 536 // value is not there, that means we're at a new thread. | |
| 537 std::vector<std::unique_ptr<WaitableEvent>> count_waiters; | |
| 538 for (auto& factory : factories) { | |
| 539 count_waiters.push_back(WrapUnique(new WaitableEvent( | |
| 540 WaitableEvent::ResetPolicy::MANUAL, | |
| 541 WaitableEvent::InitialState::NOT_SIGNALED))); | |
| 542 ASSERT_TRUE(factory->PostTask( | |
| 543 PostNestedTask::NO, | |
| 544 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait, | |
| 545 Unretained(this), | |
| 546 count_waiters.back().get()))); | |
| 547 factory->WaitForAllTasksToRun(); | |
| 548 } | |
| 549 | |
| 550 // Wait for all counters to complete. | |
| 551 for (auto& count_waiter : count_waiters) | |
| 552 count_waiter->Wait(); | |
| 553 | |
| 554 EXPECT_GT(zero_tls_values_, 0); | |
|
gab
2016/07/20 01:45:21
Should this use NoBarrier_Load()? On Windows this
fdoray
2016/07/20 14:15:43
atomicops.h:19 It is incorrect to make direct ass
robliao
2016/07/20 19:44:01
The atomicops unittest direct references their var
| |
| 555 | |
| 556 // Release tasks waiting on |waiter_|. | |
| 557 waiter_.Signal(); | |
| 558 } | |
| 559 | |
| 391 } // namespace internal | 560 } // namespace internal |
| 392 } // namespace base | 561 } // namespace base |
| OLD | NEW |