Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <memory> | 9 #include <memory> |
| 10 #include <unordered_set> | 10 #include <unordered_set> |
| 11 #include <vector> | 11 #include <vector> |
| 12 | 12 |
| 13 #include "base/atomicops.h" | |
| 13 #include "base/bind.h" | 14 #include "base/bind.h" |
| 14 #include "base/bind_helpers.h" | 15 #include "base/bind_helpers.h" |
| 15 #include "base/callback.h" | 16 #include "base/callback.h" |
| 16 #include "base/macros.h" | 17 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
| 18 #include "base/memory/ref_counted.h" | 19 #include "base/memory/ref_counted.h" |
| 19 #include "base/synchronization/condition_variable.h" | 20 #include "base/synchronization/condition_variable.h" |
| 20 #include "base/synchronization/lock.h" | 21 #include "base/synchronization/lock.h" |
| 21 #include "base/synchronization/waitable_event.h" | 22 #include "base/synchronization/waitable_event.h" |
| 22 #include "base/task_runner.h" | 23 #include "base/task_runner.h" |
| 23 #include "base/task_scheduler/delayed_task_manager.h" | 24 #include "base/task_scheduler/delayed_task_manager.h" |
| 24 #include "base/task_scheduler/sequence.h" | 25 #include "base/task_scheduler/sequence.h" |
| 25 #include "base/task_scheduler/sequence_sort_key.h" | 26 #include "base/task_scheduler/sequence_sort_key.h" |
| 26 #include "base/task_scheduler/task_tracker.h" | 27 #include "base/task_scheduler/task_tracker.h" |
| 27 #include "base/task_scheduler/test_task_factory.h" | 28 #include "base/task_scheduler/test_task_factory.h" |
| 28 #include "base/task_scheduler/test_utils.h" | 29 #include "base/task_scheduler/test_utils.h" |
| 29 #include "base/threading/platform_thread.h" | 30 #include "base/threading/platform_thread.h" |
| 30 #include "base/threading/simple_thread.h" | 31 #include "base/threading/simple_thread.h" |
| 32 #include "base/threading/thread_local_storage.h" | |
| 31 #include "base/threading/thread_restrictions.h" | 33 #include "base/threading/thread_restrictions.h" |
| 34 #include "base/time/time.h" | |
| 32 #include "testing/gtest/include/gtest/gtest.h" | 35 #include "testing/gtest/include/gtest/gtest.h" |
| 33 | 36 |
| 34 namespace base { | 37 namespace base { |
| 35 namespace internal { | 38 namespace internal { |
| 36 namespace { | 39 namespace { |
| 37 | 40 |
| 38 const size_t kNumWorkersInWorkerPool = 4; | 41 const size_t kNumWorkersInWorkerPool = 4; |
| 39 const size_t kNumThreadsPostingTasks = 4; | 42 const size_t kNumThreadsPostingTasks = 4; |
| 40 const size_t kNumTasksPostedPerThread = 150; | 43 const size_t kNumTasksPostedPerThread = 150; |
| 44 const TimeDelta kSuggestedReclaimTime = TimeDelta::FromMilliseconds(10); | |
| 41 | 45 |
| 42 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; | 46 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; |
| 43 | 47 |
| 44 class TestDelayedTaskManager : public DelayedTaskManager { | 48 class TestDelayedTaskManager : public DelayedTaskManager { |
| 45 public: | 49 public: |
| 46 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} | 50 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} |
| 47 | 51 |
| 48 void SetCurrentTime(TimeTicks now) { now_ = now; } | 52 void SetCurrentTime(TimeTicks now) { now_ = now; } |
| 49 | 53 |
| 50 // DelayedTaskManager: | 54 // DelayedTaskManager: |
| 51 TimeTicks Now() const override { return now_; } | 55 TimeTicks Now() const override { return now_; } |
| 52 | 56 |
| 53 private: | 57 private: |
| 54 TimeTicks now_ = TimeTicks::Now(); | 58 TimeTicks now_ = TimeTicks::Now(); |
| 55 | 59 |
| 56 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); | 60 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); |
| 57 }; | 61 }; |
| 58 | 62 |
| 59 class TaskSchedulerWorkerPoolImplTest | 63 class TaskSchedulerWorkerPoolImplTest |
| 60 : public testing::TestWithParam<ExecutionMode> { | 64 : public testing::TestWithParam<ExecutionMode> { |
| 61 protected: | 65 protected: |
| 62 TaskSchedulerWorkerPoolImplTest() = default; | 66 TaskSchedulerWorkerPoolImplTest() = default; |
| 63 | 67 |
| 64 void SetUp() override { | 68 void SetUp() override { |
| 65 worker_pool_ = SchedulerWorkerPoolImpl::Create( | 69 InitializeWorkerPool(); |
| 66 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL, | 70 worker_pool_->DisallowWorkerDetachmentForTesting(); |
| 67 kNumWorkersInWorkerPool, IORestriction::ALLOWED, | |
| 68 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | |
| 69 Unretained(this)), | |
| 70 &task_tracker_, &delayed_task_manager_); | |
| 71 ASSERT_TRUE(worker_pool_); | |
| 72 } | 71 } |
| 73 | 72 |
| 74 void TearDown() override { | 73 void TearDown() override { |
| 75 worker_pool_->WaitForAllWorkersIdleForTesting(); | 74 worker_pool_->WaitForAllWorkersIdleForTesting(); |
| 76 worker_pool_->JoinForTesting(); | 75 worker_pool_->JoinForTesting(); |
| 77 } | 76 } |
| 78 | 77 |
| 78 void InitializeWorkerPool() { | |
| 79 worker_pool_ = SchedulerWorkerPoolImpl::Create( | |
| 80 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL, | |
| 81 kNumWorkersInWorkerPool, IORestriction::ALLOWED, | |
| 82 kSuggestedReclaimTime, | |
| 83 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | |
| 84 Unretained(this)), | |
| 85 &task_tracker_, &delayed_task_manager_); | |
| 86 ASSERT_TRUE(worker_pool_); | |
| 87 } | |
| 88 | |
| 79 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; | 89 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; |
| 80 | 90 |
| 81 TaskTracker task_tracker_; | 91 TaskTracker task_tracker_; |
| 82 TestDelayedTaskManager delayed_task_manager_; | 92 TestDelayedTaskManager delayed_task_manager_; |
| 83 | 93 |
| 84 private: | 94 private: |
| 85 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | 95 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { |
| 86 // In production code, this callback would be implemented by the | 96 // In production code, this callback would be implemented by the |
| 87 // TaskScheduler which would first determine which PriorityQueue the | 97 // TaskScheduler which would first determine which PriorityQueue the |
| 88 // sequence must be re-enqueued. | 98 // sequence must be re-enqueued. |
| (...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 357 }; | 367 }; |
| 358 | 368 |
| 359 } // namespace | 369 } // namespace |
| 360 | 370 |
| 361 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { | 371 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { |
| 362 TaskTracker task_tracker; | 372 TaskTracker task_tracker; |
| 363 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); | 373 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); |
| 364 | 374 |
| 365 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 375 auto worker_pool = SchedulerWorkerPoolImpl::Create( |
| 366 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), | 376 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), |
| 367 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 377 TimeDelta::Max(), Bind(&NotReachedReEnqueueSequenceCallback), |
| 368 &delayed_task_manager); | 378 &task_tracker, &delayed_task_manager); |
| 369 ASSERT_TRUE(worker_pool); | 379 ASSERT_TRUE(worker_pool); |
| 380 worker_pool->DisallowWorkerDetachmentForTesting(); | |
| 370 | 381 |
| 371 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | 382 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, |
| 372 WaitableEvent::InitialState::NOT_SIGNALED); | 383 WaitableEvent::InitialState::NOT_SIGNALED); |
| 373 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) | 384 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) |
| 374 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); | 385 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); |
| 375 task_ran.Wait(); | 386 task_ran.Wait(); |
| 376 | 387 |
| 377 worker_pool->JoinForTesting(); | 388 worker_pool->JoinForTesting(); |
| 378 } | 389 } |
| 379 | 390 |
| 380 INSTANTIATE_TEST_CASE_P(IOAllowed, | 391 INSTANTIATE_TEST_CASE_P(IOAllowed, |
| 381 TaskSchedulerWorkerPoolImplIORestrictionTest, | 392 TaskSchedulerWorkerPoolImplIORestrictionTest, |
| 382 ::testing::Values(IORestriction::ALLOWED)); | 393 ::testing::Values(IORestriction::ALLOWED)); |
| 383 INSTANTIATE_TEST_CASE_P(IODisallowed, | 394 INSTANTIATE_TEST_CASE_P(IODisallowed, |
| 384 TaskSchedulerWorkerPoolImplIORestrictionTest, | 395 TaskSchedulerWorkerPoolImplIORestrictionTest, |
| 385 ::testing::Values(IORestriction::DISALLOWED)); | 396 ::testing::Values(IORestriction::DISALLOWED)); |
| 386 | 397 |
| 398 namespace { | |
| 399 | |
| 400 constexpr size_t kMagicTlsValue = 42; | |
| 401 | |
| 402 class TaskSchedulerWorkerPoolSingleThreadedTest | |
| 403 : public TaskSchedulerWorkerPoolImplTest { | |
| 404 public: | |
| 405 void SetTlsValue() { | |
| 406 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); | |
| 407 } | |
| 408 | |
| 409 void CheckTlsValue() { | |
| 410 EXPECT_EQ(kMagicTlsValue, reinterpret_cast<size_t>(slot_.Get())); | |
|
gab
2016/07/13 18:36:31
Instead of using TLS to do an "is this the right t
robliao
2016/07/13 20:19:47
Nice. I forgot about ThreadCheckerImpl. Done.
| |
| 411 } | |
| 412 | |
| 413 protected: | |
| 414 void SetUp() override { | |
| 415 InitializeWorkerPool(); | |
| 416 } | |
| 417 | |
| 418 TaskSchedulerWorkerPoolSingleThreadedTest() = default; | |
| 419 | |
| 420 private: | |
| 421 ThreadLocalStorage::Slot slot_; | |
| 422 | |
| 423 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest); | |
| 424 }; | |
| 425 | |
| 426 } // namespace | |
| 427 | |
| 428 // Verify that thread resources for a single thread remain. | |
| 429 TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) { | |
| 430 auto single_thread_task_runner = | |
| 431 worker_pool_->CreateTaskRunnerWithTraits( | |
| 432 TaskTraits(). | |
| 433 WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN), | |
| 434 ExecutionMode::SINGLE_THREADED); | |
| 435 single_thread_task_runner->PostTask( | |
| 436 FROM_HERE, | |
| 437 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::SetTlsValue, | |
| 438 Unretained(this))); | |
| 439 WaitableEvent task_waiter(WaitableEvent::ResetPolicy::MANUAL, | |
| 440 WaitableEvent::InitialState::NOT_SIGNALED); | |
| 441 single_thread_task_runner->PostTask( | |
| 442 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); | |
| 443 task_waiter.Wait(); | |
| 444 task_waiter.Reset(); | |
|
gab
2016/07/13 18:36:31
Make the event AUTOMATIC instead of doing Wait()+R
robliao
2016/07/13 20:19:47
I thought we were preferring Manual events for tes
gab
2016/07/13 21:07:55
I don't think there's a preference. I tend to use
robliao
2016/07/19 22:03:47
Cool. I prefer automatic here as well. Done.
| |
| 445 worker_pool_->WaitForAllWorkersIdleForTesting(); | |
| 446 | |
| 447 // Give the worker pool a chance to reclaim its threads. | |
|
gab
2016/07/13 18:36:32
It was never forced to create more than one thread
robliao
2016/07/13 20:19:47
The goal of this test is to verify that the worker
| |
| 448 PlatformThread::Sleep( | |
| 449 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); | |
| 450 | |
| 451 worker_pool_->DisallowWorkerDetachmentForTesting(); | |
| 452 | |
| 453 single_thread_task_runner->PostTask( | |
| 454 FROM_HERE, | |
| 455 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckTlsValue, | |
| 456 Unretained(this))); | |
| 457 single_thread_task_runner->PostTask( | |
| 458 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); | |
| 459 task_waiter.Wait(); | |
| 460 } | |
| 461 | |
| 462 namespace { | |
| 463 | |
| 464 class TaskSchedulerWorkerPoolCheckTlsReuse | |
| 465 : public TaskSchedulerWorkerPoolImplTest { | |
| 466 public: | |
| 467 void SetTlsValueAndWait() { | |
| 468 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); | |
| 469 waiter_.Wait(); | |
| 470 } | |
| 471 | |
| 472 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { | |
| 473 if (!slot_.Get()) | |
| 474 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); | |
| 475 | |
| 476 count_waiter->Signal(); | |
| 477 waiter_.Wait(); | |
| 478 } | |
| 479 | |
| 480 protected: | |
| 481 TaskSchedulerWorkerPoolCheckTlsReuse() : | |
| 482 waiter_(WaitableEvent::ResetPolicy::MANUAL, | |
| 483 WaitableEvent::InitialState::NOT_SIGNALED) {} | |
| 484 | |
| 485 void SetUp() override { | |
| 486 InitializeWorkerPool(); | |
| 487 } | |
| 488 | |
| 489 subtle::Atomic32 zero_tls_values_ = 0; | |
| 490 | |
| 491 WaitableEvent waiter_; | |
| 492 | |
| 493 private: | |
| 494 ThreadLocalStorage::Slot slot_; | |
| 495 | |
| 496 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); | |
| 497 }; | |
| 498 | |
| 499 } // namespace | |
| 500 | |
| 501 // Checks that at least one thread has detached by checking the TLS. | |
| 502 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) { | |
| 503 // Saturate the threads and mark each thread with a magic TLS value. | |
| 504 std::vector<std::unique_ptr<test::TestTaskFactory>> factories; | |
| 505 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { | |
| 506 factories.push_back(WrapUnique(new test::TestTaskFactory( | |
| 507 worker_pool_->CreateTaskRunnerWithTraits( | |
| 508 TaskTraits(), ExecutionMode::PARALLEL), | |
| 509 ExecutionMode::PARALLEL))); | |
| 510 ASSERT_TRUE(factories.back()->PostTask( | |
| 511 PostNestedTask::NO, | |
| 512 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait, | |
| 513 Unretained(this)))); | |
| 514 factories.back()->WaitForAllTasksToRun(); | |
| 515 } | |
| 516 | |
| 517 // Release tasks waiting on |waiter_|. | |
| 518 waiter_.Signal(); | |
| 519 worker_pool_->WaitForAllWorkersIdleForTesting(); | |
| 520 | |
| 521 // All threads should be done running by now, so reset for the next phase. | |
| 522 waiter_.Reset(); | |
| 523 | |
| 524 // Give the worker pool a chance to detach its threads. | |
| 525 PlatformThread::Sleep( | |
| 526 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); | |
| 527 | |
| 528 worker_pool_->DisallowWorkerDetachmentForTesting(); | |
| 529 | |
| 530 // Saturate and count the threads that do not have the magic TLS value. If the | |
| 531 // value is not there, that means we're at a new thread. | |
| 532 std::vector<std::unique_ptr<WaitableEvent>> count_waiters; | |
| 533 for (auto& factory : factories) { | |
| 534 count_waiters.push_back(WrapUnique(new WaitableEvent( | |
| 535 WaitableEvent::ResetPolicy::MANUAL, | |
| 536 WaitableEvent::InitialState::NOT_SIGNALED))); | |
| 537 ASSERT_TRUE(factory->PostTask( | |
| 538 PostNestedTask::NO, | |
| 539 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait, | |
|
gab
2016/07/13 18:36:31
Instead of counting lack of TLS values here I thin
robliao
2016/07/13 20:19:47
Wouldn't using the TLS be basically the same thing
gab
2016/07/13 21:07:55
Ah ok you're right, the TLS effectively acts as a
| |
| 540 Unretained(this), | |
| 541 count_waiters.back().get()))); | |
| 542 factory->WaitForAllTasksToRun(); | |
| 543 } | |
| 544 | |
| 545 // Wait for all counters to complete. | |
| 546 for (auto& count_waiter : count_waiters) | |
| 547 count_waiter->Wait(); | |
| 548 | |
| 549 EXPECT_GT(zero_tls_values_, 0); | |
| 550 | |
| 551 // Release tasks waiting on |waiter_|. | |
| 552 waiter_.Signal(); | |
| 553 } | |
| 554 | |
| 387 } // namespace internal | 555 } // namespace internal |
| 388 } // namespace base | 556 } // namespace base |
| OLD | NEW |