OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <memory> | 9 #include <memory> |
10 #include <unordered_set> | 10 #include <unordered_set> |
11 #include <vector> | 11 #include <vector> |
12 | 12 |
13 #include "base/atomicops.h" | |
13 #include "base/bind.h" | 14 #include "base/bind.h" |
14 #include "base/bind_helpers.h" | 15 #include "base/bind_helpers.h" |
15 #include "base/callback.h" | 16 #include "base/callback.h" |
16 #include "base/macros.h" | 17 #include "base/macros.h" |
17 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
18 #include "base/memory/ref_counted.h" | 19 #include "base/memory/ref_counted.h" |
19 #include "base/synchronization/condition_variable.h" | 20 #include "base/synchronization/condition_variable.h" |
20 #include "base/synchronization/lock.h" | 21 #include "base/synchronization/lock.h" |
21 #include "base/synchronization/waitable_event.h" | 22 #include "base/synchronization/waitable_event.h" |
22 #include "base/task_runner.h" | 23 #include "base/task_runner.h" |
23 #include "base/task_scheduler/delayed_task_manager.h" | 24 #include "base/task_scheduler/delayed_task_manager.h" |
24 #include "base/task_scheduler/sequence.h" | 25 #include "base/task_scheduler/sequence.h" |
25 #include "base/task_scheduler/sequence_sort_key.h" | 26 #include "base/task_scheduler/sequence_sort_key.h" |
26 #include "base/task_scheduler/task_tracker.h" | 27 #include "base/task_scheduler/task_tracker.h" |
27 #include "base/task_scheduler/test_task_factory.h" | 28 #include "base/task_scheduler/test_task_factory.h" |
28 #include "base/task_scheduler/test_utils.h" | 29 #include "base/task_scheduler/test_utils.h" |
29 #include "base/threading/platform_thread.h" | 30 #include "base/threading/platform_thread.h" |
30 #include "base/threading/simple_thread.h" | 31 #include "base/threading/simple_thread.h" |
32 #include "base/threading/thread_local_storage.h" | |
31 #include "base/threading/thread_restrictions.h" | 33 #include "base/threading/thread_restrictions.h" |
34 #include "base/time/time.h" | |
32 #include "testing/gtest/include/gtest/gtest.h" | 35 #include "testing/gtest/include/gtest/gtest.h" |
33 | 36 |
34 namespace base { | 37 namespace base { |
35 namespace internal { | 38 namespace internal { |
36 namespace { | 39 namespace { |
37 | 40 |
38 const size_t kNumWorkersInWorkerPool = 4; | 41 const size_t kNumWorkersInWorkerPool = 4; |
39 const size_t kNumThreadsPostingTasks = 4; | 42 const size_t kNumThreadsPostingTasks = 4; |
40 const size_t kNumTasksPostedPerThread = 150; | 43 const size_t kNumTasksPostedPerThread = 150; |
44 const TimeDelta kSuggestedReclaimTime = TimeDelta::FromMilliseconds(10); | |
41 | 45 |
42 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; | 46 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; |
43 | 47 |
44 class TestDelayedTaskManager : public DelayedTaskManager { | 48 class TestDelayedTaskManager : public DelayedTaskManager { |
45 public: | 49 public: |
46 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} | 50 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} |
47 | 51 |
48 void SetCurrentTime(TimeTicks now) { now_ = now; } | 52 void SetCurrentTime(TimeTicks now) { now_ = now; } |
49 | 53 |
50 // DelayedTaskManager: | 54 // DelayedTaskManager: |
51 TimeTicks Now() const override { return now_; } | 55 TimeTicks Now() const override { return now_; } |
52 | 56 |
53 private: | 57 private: |
54 TimeTicks now_ = TimeTicks::Now(); | 58 TimeTicks now_ = TimeTicks::Now(); |
55 | 59 |
56 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); | 60 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); |
57 }; | 61 }; |
58 | 62 |
59 class TaskSchedulerWorkerPoolImplTest | 63 class TaskSchedulerWorkerPoolImplTest |
60 : public testing::TestWithParam<ExecutionMode> { | 64 : public testing::TestWithParam<ExecutionMode> { |
61 protected: | 65 protected: |
62 TaskSchedulerWorkerPoolImplTest() = default; | 66 TaskSchedulerWorkerPoolImplTest() = default; |
63 | 67 |
64 void SetUp() override { | 68 void SetUp() override { |
65 worker_pool_ = SchedulerWorkerPoolImpl::Create( | 69 InitializeWorkerPool(); |
66 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL, | 70 worker_pool_->DisallowWorkerDetachmentForTesting(); |
67 kNumWorkersInWorkerPool, IORestriction::ALLOWED, | |
68 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | |
69 Unretained(this)), | |
70 &task_tracker_, &delayed_task_manager_); | |
71 ASSERT_TRUE(worker_pool_); | |
72 } | 71 } |
73 | 72 |
74 void TearDown() override { | 73 void TearDown() override { |
75 worker_pool_->WaitForAllWorkersIdleForTesting(); | 74 worker_pool_->WaitForAllWorkersIdleForTesting(); |
76 worker_pool_->JoinForTesting(); | 75 worker_pool_->JoinForTesting(); |
77 } | 76 } |
78 | 77 |
78 void InitializeWorkerPool() { | |
79 worker_pool_ = SchedulerWorkerPoolImpl::Create( | |
80 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL, | |
81 kNumWorkersInWorkerPool, IORestriction::ALLOWED, | |
82 kSuggestedReclaimTime, | |
83 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | |
84 Unretained(this)), | |
85 &task_tracker_, &delayed_task_manager_); | |
86 ASSERT_TRUE(worker_pool_); | |
87 } | |
88 | |
79 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; | 89 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; |
80 | 90 |
81 TaskTracker task_tracker_; | 91 TaskTracker task_tracker_; |
82 TestDelayedTaskManager delayed_task_manager_; | 92 TestDelayedTaskManager delayed_task_manager_; |
83 | 93 |
84 private: | 94 private: |
85 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | 95 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { |
86 // In production code, this callback would be implemented by the | 96 // In production code, this callback would be implemented by the |
87 // TaskScheduler which would first determine which PriorityQueue the | 97 // TaskScheduler which would first determine which PriorityQueue the |
88 // sequence must be re-enqueued. | 98 // sequence must be re-enqueued. |
(...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
357 }; | 367 }; |
358 | 368 |
359 } // namespace | 369 } // namespace |
360 | 370 |
361 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { | 371 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { |
362 TaskTracker task_tracker; | 372 TaskTracker task_tracker; |
363 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); | 373 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); |
364 | 374 |
365 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 375 auto worker_pool = SchedulerWorkerPoolImpl::Create( |
366 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), | 376 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), |
367 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 377 TimeDelta::Max(), Bind(&NotReachedReEnqueueSequenceCallback), |
368 &delayed_task_manager); | 378 &task_tracker, &delayed_task_manager); |
369 ASSERT_TRUE(worker_pool); | 379 ASSERT_TRUE(worker_pool); |
380 worker_pool->DisallowWorkerDetachmentForTesting(); | |
370 | 381 |
371 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | 382 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, |
372 WaitableEvent::InitialState::NOT_SIGNALED); | 383 WaitableEvent::InitialState::NOT_SIGNALED); |
373 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) | 384 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) |
374 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); | 385 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); |
375 task_ran.Wait(); | 386 task_ran.Wait(); |
376 | 387 |
377 worker_pool->JoinForTesting(); | 388 worker_pool->JoinForTesting(); |
378 } | 389 } |
379 | 390 |
380 INSTANTIATE_TEST_CASE_P(IOAllowed, | 391 INSTANTIATE_TEST_CASE_P(IOAllowed, |
381 TaskSchedulerWorkerPoolImplIORestrictionTest, | 392 TaskSchedulerWorkerPoolImplIORestrictionTest, |
382 ::testing::Values(IORestriction::ALLOWED)); | 393 ::testing::Values(IORestriction::ALLOWED)); |
383 INSTANTIATE_TEST_CASE_P(IODisallowed, | 394 INSTANTIATE_TEST_CASE_P(IODisallowed, |
384 TaskSchedulerWorkerPoolImplIORestrictionTest, | 395 TaskSchedulerWorkerPoolImplIORestrictionTest, |
385 ::testing::Values(IORestriction::DISALLOWED)); | 396 ::testing::Values(IORestriction::DISALLOWED)); |
386 | 397 |
398 namespace { | |
399 | |
400 constexpr size_t kMagicTlsValue = 42; | |
401 | |
402 class TaskSchedulerWorkerPoolSingleThreadedTest | |
403 : public TaskSchedulerWorkerPoolImplTest { | |
404 public: | |
405 void SetTlsValue() { | |
406 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); | |
407 } | |
408 | |
409 void CheckTlsValue() { | |
410 EXPECT_EQ(kMagicTlsValue, reinterpret_cast<size_t>(slot_.Get())); | |
gab
2016/07/13 18:36:31
Instead of using TLS to do an "is this the right t
robliao
2016/07/13 20:19:47
Nice. I forgot about ThreadCheckerImpl. Done.
| |
411 } | |
412 | |
413 protected: | |
414 void SetUp() override { | |
415 InitializeWorkerPool(); | |
416 } | |
417 | |
418 TaskSchedulerWorkerPoolSingleThreadedTest() = default; | |
419 | |
420 private: | |
421 ThreadLocalStorage::Slot slot_; | |
422 | |
423 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest); | |
424 }; | |
425 | |
426 } // namespace | |
427 | |
428 // Verify that thread resources for a single thread remain. | |
429 TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) { | |
430 auto single_thread_task_runner = | |
431 worker_pool_->CreateTaskRunnerWithTraits( | |
432 TaskTraits(). | |
433 WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN), | |
434 ExecutionMode::SINGLE_THREADED); | |
435 single_thread_task_runner->PostTask( | |
436 FROM_HERE, | |
437 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::SetTlsValue, | |
438 Unretained(this))); | |
439 WaitableEvent task_waiter(WaitableEvent::ResetPolicy::MANUAL, | |
440 WaitableEvent::InitialState::NOT_SIGNALED); | |
441 single_thread_task_runner->PostTask( | |
442 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); | |
443 task_waiter.Wait(); | |
444 task_waiter.Reset(); | |
gab
2016/07/13 18:36:31
Make the event AUTOMATIC instead of doing Wait()+R
robliao
2016/07/13 20:19:47
I thought we were preferring Manual events for tes
gab
2016/07/13 21:07:55
I don't think there's a preference. I tend to use
robliao
2016/07/19 22:03:47
Cool. I prefer automatic here as well. Done.
| |
445 worker_pool_->WaitForAllWorkersIdleForTesting(); | |
446 | |
447 // Give the worker pool a chance to reclaim its threads. | |
gab
2016/07/13 18:36:32
It was never forced to create more than one thread
robliao
2016/07/13 20:19:47
The goal of this test is to verify that the worker
| |
448 PlatformThread::Sleep( | |
449 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); | |
450 | |
451 worker_pool_->DisallowWorkerDetachmentForTesting(); | |
452 | |
453 single_thread_task_runner->PostTask( | |
454 FROM_HERE, | |
455 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckTlsValue, | |
456 Unretained(this))); | |
457 single_thread_task_runner->PostTask( | |
458 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); | |
459 task_waiter.Wait(); | |
460 } | |
461 | |
462 namespace { | |
463 | |
464 class TaskSchedulerWorkerPoolCheckTlsReuse | |
465 : public TaskSchedulerWorkerPoolImplTest { | |
466 public: | |
467 void SetTlsValueAndWait() { | |
468 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); | |
469 waiter_.Wait(); | |
470 } | |
471 | |
472 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { | |
473 if (!slot_.Get()) | |
474 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); | |
475 | |
476 count_waiter->Signal(); | |
477 waiter_.Wait(); | |
478 } | |
479 | |
480 protected: | |
481 TaskSchedulerWorkerPoolCheckTlsReuse() : | |
482 waiter_(WaitableEvent::ResetPolicy::MANUAL, | |
483 WaitableEvent::InitialState::NOT_SIGNALED) {} | |
484 | |
485 void SetUp() override { | |
486 InitializeWorkerPool(); | |
487 } | |
488 | |
489 subtle::Atomic32 zero_tls_values_ = 0; | |
490 | |
491 WaitableEvent waiter_; | |
492 | |
493 private: | |
494 ThreadLocalStorage::Slot slot_; | |
495 | |
496 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); | |
497 }; | |
498 | |
499 } // namespace | |
500 | |
501 // Checks that at least one thread has detached by checking the TLS. | |
502 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) { | |
503 // Saturate the threads and mark each thread with a magic TLS value. | |
504 std::vector<std::unique_ptr<test::TestTaskFactory>> factories; | |
505 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { | |
506 factories.push_back(WrapUnique(new test::TestTaskFactory( | |
507 worker_pool_->CreateTaskRunnerWithTraits( | |
508 TaskTraits(), ExecutionMode::PARALLEL), | |
509 ExecutionMode::PARALLEL))); | |
510 ASSERT_TRUE(factories.back()->PostTask( | |
511 PostNestedTask::NO, | |
512 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait, | |
513 Unretained(this)))); | |
514 factories.back()->WaitForAllTasksToRun(); | |
515 } | |
516 | |
517 // Release tasks waiting on |waiter_|. | |
518 waiter_.Signal(); | |
519 worker_pool_->WaitForAllWorkersIdleForTesting(); | |
520 | |
521 // All threads should be done running by now, so reset for the next phase. | |
522 waiter_.Reset(); | |
523 | |
524 // Give the worker pool a chance to detach its threads. | |
525 PlatformThread::Sleep( | |
526 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); | |
527 | |
528 worker_pool_->DisallowWorkerDetachmentForTesting(); | |
529 | |
530 // Saturate and count the threads that do not have the magic TLS value. If the | |
531 // value is not there, that means we're at a new thread. | |
532 std::vector<std::unique_ptr<WaitableEvent>> count_waiters; | |
533 for (auto& factory : factories) { | |
534 count_waiters.push_back(WrapUnique(new WaitableEvent( | |
535 WaitableEvent::ResetPolicy::MANUAL, | |
536 WaitableEvent::InitialState::NOT_SIGNALED))); | |
537 ASSERT_TRUE(factory->PostTask( | |
538 PostNestedTask::NO, | |
539 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait, | |
gab
2016/07/13 18:36:31
Instead of counting lack of TLS values here I thin
robliao
2016/07/13 20:19:47
Wouldn't using the TLS be basically the same thing
gab
2016/07/13 21:07:55
Ah ok you're right, the TLS effectively acts as a
| |
540 Unretained(this), | |
541 count_waiters.back().get()))); | |
542 factory->WaitForAllTasksToRun(); | |
543 } | |
544 | |
545 // Wait for all counters to complete. | |
546 for (auto& count_waiter : count_waiters) | |
547 count_waiter->Wait(); | |
548 | |
549 EXPECT_GT(zero_tls_values_, 0); | |
550 | |
551 // Release tasks waiting on |waiter_|. | |
552 waiter_.Signal(); | |
553 } | |
554 | |
387 } // namespace internal | 555 } // namespace internal |
388 } // namespace base | 556 } // namespace base |
OLD | NEW |