OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/compiler_specific.h" | 12 #include "base/compiler_specific.h" |
13 #include "base/macros.h" | 13 #include "base/macros.h" |
14 #include "base/memory/ref_counted.h" | 14 #include "base/memory/ref_counted.h" |
15 #include "base/memory/scoped_ptr.h" | 15 #include "base/memory/scoped_ptr.h" |
16 #include "base/message_loop/message_loop.h" | 16 #include "base/message_loop/message_loop.h" |
| 17 #include "base/sequence_checker_impl.h" |
17 #include "base/stl_util.h" | 18 #include "base/stl_util.h" |
18 #include "base/synchronization/condition_variable.h" | 19 #include "base/synchronization/condition_variable.h" |
19 #include "base/synchronization/lock.h" | 20 #include "base/synchronization/lock.h" |
| 21 #include "base/synchronization/waitable_event.h" |
20 #include "base/test/sequenced_task_runner_test_template.h" | 22 #include "base/test/sequenced_task_runner_test_template.h" |
21 #include "base/test/sequenced_worker_pool_owner.h" | 23 #include "base/test/sequenced_worker_pool_owner.h" |
22 #include "base/test/task_runner_test_template.h" | 24 #include "base/test/task_runner_test_template.h" |
23 #include "base/test/test_timeouts.h" | 25 #include "base/test/test_timeouts.h" |
24 #include "base/threading/platform_thread.h" | 26 #include "base/threading/platform_thread.h" |
25 #include "base/time/time.h" | 27 #include "base/time/time.h" |
26 #include "base/tracked_objects.h" | 28 #include "base/tracked_objects.h" |
27 #include "testing/gtest/include/gtest/gtest.h" | 29 #include "testing/gtest/include/gtest/gtest.h" |
28 | 30 |
29 namespace base { | 31 namespace base { |
(...skipping 857 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
887 // Should be fine to call on an idle instance with all threads created, and | 889 // Should be fine to call on an idle instance with all threads created, and |
888 // spamming the method shouldn't deadlock or confuse the class. | 890 // spamming the method shouldn't deadlock or confuse the class. |
889 pool()->FlushForTesting(); | 891 pool()->FlushForTesting(); |
890 pool()->FlushForTesting(); | 892 pool()->FlushForTesting(); |
891 | 893 |
892 // Should be fine to call after shutdown too. | 894 // Should be fine to call after shutdown too. |
893 pool()->Shutdown(); | 895 pool()->Shutdown(); |
894 pool()->FlushForTesting(); | 896 pool()->FlushForTesting(); |
895 } | 897 } |
896 | 898 |
897 namespace { | 899 // Helper method for VerifyCurrentSequencedTaskRunner() and |
898 | 900 // VerifyCurrentSequencedTaskRunnerForUnsequencedTask(). |
899 void CheckWorkerPoolAndSequenceToken( | 901 void VerifySequencedTaskRunnerRunsOnCurrentThread( |
900 const scoped_refptr<SequencedWorkerPool>& expected_pool, | 902 SequencedTaskRunner* task_runner, |
901 SequencedWorkerPool::SequenceToken expected_token) { | 903 bool should_run_on_current_thread, |
902 SequencedWorkerPool::SequenceToken token = | 904 const Closure& callback) { |
903 SequencedWorkerPool::GetSequenceTokenForCurrentThread(); | 905 EXPECT_EQ(should_run_on_current_thread, |
904 EXPECT_EQ(expected_token.ToString(), token.ToString()); | 906 task_runner->RunsTasksOnCurrentThread()); |
905 | 907 callback.Run(); |
906 scoped_refptr<SequencedWorkerPool> pool = | |
907 SequencedWorkerPool::GetWorkerPoolForCurrentThread(); | |
908 EXPECT_EQ(expected_pool, pool); | |
909 } | 908 } |
910 | 909 |
911 } // namespace | 910 void VerifyCurrentSequencedTaskRunner( |
| 911 SequencedTaskRunner* expected_task_runner, |
| 912 bool expected_equal, |
| 913 const Closure& callback) { |
| 914 scoped_refptr<SequencedTaskRunner> task_runner = |
| 915 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); |
912 | 916 |
913 TEST_F(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) { | 917 EXPECT_TRUE(task_runner->RunsTasksOnCurrentThread()); |
| 918 |
| 919 // SequencedTaskRunner does not allow directly checking for equality, but we |
| 920 // can post a task to one task runner and verify that the other task runner |
| 921 // is on the same sequence. |
| 922 task_runner->PostTask( |
| 923 FROM_HERE, |
| 924 Bind(&VerifySequencedTaskRunnerRunsOnCurrentThread, |
| 925 base::Unretained(expected_task_runner), expected_equal, callback)); |
| 926 } |
| 927 |
| 928 void VerifyCurrentSequencedTaskRunnerForUnsequencedTask( |
| 929 SequencedWorkerPool* pool, |
| 930 const Closure& callback) { |
| 931 EXPECT_FALSE( |
| 932 SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid()); |
| 933 |
| 934 scoped_refptr<SequencedTaskRunner> task_runner = |
| 935 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); |
| 936 |
| 937 EXPECT_TRUE(task_runner->RunsTasksOnCurrentThread()); |
| 938 |
| 939 scoped_refptr<SequencedTaskRunner> expected_task_runner = |
| 940 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); |
| 941 |
| 942 // The pool should now be running a sequence. This also verifies that no other |
| 943 // thread will start running tasks with this sequence token. |
| 944 const SequencedWorkerPool::SequenceToken sequence_token = |
| 945 SequencedWorkerPool::GetSequenceTokenForCurrentThread(); |
| 946 ASSERT_TRUE(sequence_token.IsValid()); |
| 947 EXPECT_TRUE(pool->IsRunningSequence(sequence_token)); |
| 948 |
| 949 // The two sequenced task runners should be the same. See |
| 950 // VerifyCurrentSequencedTaskRunner() above for why the check is implemented |
| 951 // this way. |
| 952 const bool expected_equal = true; |
| 953 task_runner->PostTask( |
| 954 FROM_HERE, |
| 955 Bind(&VerifySequencedTaskRunnerRunsOnCurrentThread, |
| 956 std::move(expected_task_runner), expected_equal, callback)); |
| 957 } |
| 958 |
| 959 TEST_F(SequencedWorkerPoolTest, GetSequencedTaskRunnerForCurrentThread) { |
914 EnsureAllWorkersCreated(); | 960 EnsureAllWorkersCreated(); |
915 | 961 |
916 // The current thread should have neither a worker pool nor a sequence token. | 962 // The current thread should not have a sequenced task runner from a |
917 SequencedWorkerPool::SequenceToken local_token = | 963 // worker pool. |
918 SequencedWorkerPool::GetSequenceTokenForCurrentThread(); | 964 scoped_refptr<SequencedTaskRunner> local_task_runner = |
919 scoped_refptr<SequencedWorkerPool> local_pool = | 965 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); |
920 SequencedWorkerPool::GetWorkerPoolForCurrentThread(); | 966 EXPECT_FALSE(local_task_runner); |
921 EXPECT_FALSE(local_token.IsValid()) << local_token.ToString(); | |
922 EXPECT_FALSE(local_pool); | |
923 | 967 |
924 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); | 968 WaitableEvent event(false, false); |
925 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); | 969 Closure signal = Bind(&WaitableEvent::Signal, Unretained(&event)); |
926 pool()->PostSequencedWorkerTask( | 970 scoped_refptr<SequencedTaskRunner> task_runner_1 = |
927 token1, FROM_HERE, | 971 pool()->GetSequencedTaskRunner(SequencedWorkerPool::GetSequenceToken()); |
928 base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token1)); | 972 scoped_refptr<SequencedTaskRunner> task_runner_2 = |
929 pool()->PostSequencedWorkerTask( | 973 pool()->GetSequencedTaskRunner(SequencedWorkerPool::GetSequenceToken()); |
930 token2, FROM_HERE, | 974 task_runner_1->PostTask( |
931 base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token2)); | 975 FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunner, |
| 976 base::Unretained(task_runner_1.get()), true, signal)); |
| 977 event.Wait(); |
| 978 task_runner_2->PostTask( |
| 979 FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunner, |
| 980 base::Unretained(task_runner_2.get()), true, signal)); |
| 981 event.Wait(); |
932 | 982 |
933 pool()->PostWorkerTask(FROM_HERE, | 983 task_runner_1->PostTask( |
934 base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), | 984 FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunner, |
935 SequencedWorkerPool::SequenceToken())); | 985 base::Unretained(task_runner_2.get()), false, signal)); |
| 986 event.Wait(); |
936 | 987 |
937 pool()->FlushForTesting(); | 988 pool()->PostWorkerTask( |
| 989 FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunnerForUnsequencedTask, |
| 990 pool(), signal)); |
| 991 event.Wait(); |
| 992 } |
| 993 |
| 994 class ChecksSequenceOnDestruction |
| 995 : public RefCountedThreadSafe<ChecksSequenceOnDestruction> { |
| 996 public: |
| 997 void DoNothing() {} |
| 998 |
| 999 private: |
| 1000 friend class RefCountedThreadSafe<ChecksSequenceOnDestruction>; |
| 1001 |
| 1002 ~ChecksSequenceOnDestruction() { |
| 1003 EXPECT_TRUE(sequence_checker_.CalledOnValidSequencedThread()); |
| 1004 } |
| 1005 |
| 1006 SequenceCheckerImpl sequence_checker_; |
| 1007 }; |
| 1008 |
| 1009 void VerifySequenceOnDestruction(const Closure& callback) { |
| 1010 scoped_refptr<SequencedTaskRunner> task_runner = |
| 1011 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); |
| 1012 scoped_refptr<ChecksSequenceOnDestruction> check_sequence( |
| 1013 new ChecksSequenceOnDestruction); |
| 1014 |
| 1015 // Post a task to an empty method. This will keep the only reference to the |
| 1016 // object, so it will be destroyed right after running the task. |
| 1017 task_runner->PostTask(FROM_HERE, Bind(&ChecksSequenceOnDestruction::DoNothing, |
| 1018 std::move(check_sequence))); |
| 1019 |
| 1020 // Post the callback afterwards, so we can be sure the first task completed. |
| 1021 task_runner->PostTask(FROM_HERE, callback); |
| 1022 } |
| 1023 |
| 1024 TEST_F(SequencedWorkerPoolTest, CheckSequenceOnDestruction) { |
| 1025 EnsureAllWorkersCreated(); |
| 1026 |
| 1027 WaitableEvent event(false, false); |
| 1028 Closure signal = Bind(&WaitableEvent::Signal, Unretained(&event)); |
| 1029 pool()->PostWorkerTask(FROM_HERE, Bind(&VerifySequenceOnDestruction, signal)); |
| 1030 event.Wait(); |
938 } | 1031 } |
939 | 1032 |
940 TEST_F(SequencedWorkerPoolTest, ShutsDownCleanWithContinueOnShutdown) { | 1033 TEST_F(SequencedWorkerPoolTest, ShutsDownCleanWithContinueOnShutdown) { |
941 scoped_refptr<SequencedTaskRunner> task_runner = | 1034 scoped_refptr<SequencedTaskRunner> task_runner = |
942 pool()->GetSequencedTaskRunnerWithShutdownBehavior( | 1035 pool()->GetSequencedTaskRunnerWithShutdownBehavior( |
943 pool()->GetSequenceToken(), | 1036 pool()->GetSequenceToken(), |
944 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); | 1037 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
945 | 1038 |
946 // Upon test exit, should shut down without hanging. | 1039 // Upon test exit, should shut down without hanging. |
947 pool()->Shutdown(); | 1040 pool()->Shutdown(); |
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1066 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, | 1159 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, |
1067 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); | 1160 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
1068 INSTANTIATE_TYPED_TEST_CASE_P( | 1161 INSTANTIATE_TYPED_TEST_CASE_P( |
1069 SequencedWorkerPoolSequencedTaskRunner, | 1162 SequencedWorkerPoolSequencedTaskRunner, |
1070 SequencedTaskRunnerDelayedTest, | 1163 SequencedTaskRunnerDelayedTest, |
1071 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); | 1164 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
1072 | 1165 |
1073 } // namespace | 1166 } // namespace |
1074 | 1167 |
1075 } // namespace base | 1168 } // namespace base |
OLD | NEW |