OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
11 #include <memory> | 11 #include <memory> |
12 #include <set> | 12 #include <set> |
13 #include <utility> | 13 #include <utility> |
14 #include <vector> | 14 #include <vector> |
15 | 15 |
16 #include "base/atomic_sequence_num.h" | 16 #include "base/atomic_sequence_num.h" |
| 17 #include "base/atomicops.h" |
17 #include "base/callback.h" | 18 #include "base/callback.h" |
18 #include "base/compiler_specific.h" | 19 #include "base/compiler_specific.h" |
19 #include "base/critical_closure.h" | 20 #include "base/critical_closure.h" |
20 #include "base/lazy_instance.h" | 21 #include "base/lazy_instance.h" |
21 #include "base/logging.h" | 22 #include "base/logging.h" |
22 #include "base/macros.h" | 23 #include "base/macros.h" |
23 #include "base/memory/ptr_util.h" | 24 #include "base/memory/ptr_util.h" |
24 #include "base/stl_util.h" | 25 #include "base/stl_util.h" |
25 #include "base/strings/stringprintf.h" | 26 #include "base/strings/stringprintf.h" |
26 #include "base/synchronization/condition_variable.h" | 27 #include "base/synchronization/condition_variable.h" |
(...skipping 24 matching lines...) Expand all Loading... |
51 namespace base { | 52 namespace base { |
52 | 53 |
53 namespace { | 54 namespace { |
54 | 55 |
55 // An enum representing the state of all pools. Any given process should only | 56 // An enum representing the state of all pools. Any given process should only |
56 // ever transition from NONE_ACTIVE to the active states, transitions between | 57 // ever transition from NONE_ACTIVE to the active states, transitions between |
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition | 58 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called | 59 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created | 60 // and the WORKER_CREATED transition occurs when a Worker needs to be created |
60 // because the first task was posted and the state is still NONE_ACTIVE. | 61 // because the first task was posted and the state is still NONE_ACTIVE. |
| 62 // |g_all_pools_state| uses relaxed atomic operations to ensure no data race |
| 63 // between reads/writes, strict memory ordering isn't required per no other |
| 64 // state being inferred from its value. Explicit synchronization (e.g. locks or |
| 65 // events) would be overkill (it's fine for other threads to still see |
| 66 // NONE_ACTIVE after the first Worker was created -- this is not possible for |
| 67 // REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no |
| 68 // other threads are active). |
61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool | 69 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
62 // will be phased out completely otherwise). | 70 // will be phased out completely otherwise). |
63 enum class AllPoolsState { | 71 enum AllPoolsState : subtle::Atomic32 { |
64 NONE_ACTIVE, | 72 NONE_ACTIVE, |
65 WORKER_CREATED, | 73 WORKER_CREATED, |
66 REDIRECTED_TO_TASK_SCHEDULER, | 74 REDIRECTED_TO_TASK_SCHEDULER, |
67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE; | 75 }; |
| 76 subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
68 | 77 |
69 struct SequencedTask : public TrackingInfo { | 78 struct SequencedTask : public TrackingInfo { |
70 SequencedTask() | 79 SequencedTask() |
71 : sequence_token_id(0), | 80 : sequence_token_id(0), |
72 trace_id(0), | 81 trace_id(0), |
73 sequence_task_number(0), | 82 sequence_task_number(0), |
74 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} | 83 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
75 | 84 |
76 explicit SequencedTask(const tracked_objects::Location& from_here) | 85 explicit SequencedTask(const tracked_objects::Location& from_here) |
77 : base::TrackingInfo(from_here, TimeTicks()), | 86 : base::TrackingInfo(from_here, TimeTicks()), |
(...skipping 467 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
545 // Worker definitions --------------------------------------------------------- | 554 // Worker definitions --------------------------------------------------------- |
546 | 555 |
547 SequencedWorkerPool::Worker::Worker( | 556 SequencedWorkerPool::Worker::Worker( |
548 scoped_refptr<SequencedWorkerPool> worker_pool, | 557 scoped_refptr<SequencedWorkerPool> worker_pool, |
549 int thread_number, | 558 int thread_number, |
550 const std::string& prefix) | 559 const std::string& prefix) |
551 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), | 560 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), |
552 worker_pool_(std::move(worker_pool)), | 561 worker_pool_(std::move(worker_pool)), |
553 task_shutdown_behavior_(BLOCK_SHUTDOWN), | 562 task_shutdown_behavior_(BLOCK_SHUTDOWN), |
554 is_processing_task_(false) { | 563 is_processing_task_(false) { |
555 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 564 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 565 subtle::NoBarrier_Load(&g_all_pools_state)); |
556 Start(); | 566 Start(); |
557 } | 567 } |
558 | 568 |
559 SequencedWorkerPool::Worker::~Worker() { | 569 SequencedWorkerPool::Worker::~Worker() { |
560 } | 570 } |
561 | 571 |
562 void SequencedWorkerPool::Worker::Run() { | 572 void SequencedWorkerPool::Worker::Run() { |
563 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 573 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 574 subtle::NoBarrier_Load(&g_all_pools_state)); |
564 | 575 |
565 #if defined(OS_WIN) | 576 #if defined(OS_WIN) |
566 win::ScopedCOMInitializer com_initializer; | 577 win::ScopedCOMInitializer com_initializer; |
567 #endif | 578 #endif |
568 | 579 |
569 // Store a pointer to this worker in thread local storage for static function | 580 // Store a pointer to this worker in thread local storage for static function |
570 // access. | 581 // access. |
571 DCHECK(!lazy_tls_ptr_.Get().Get()); | 582 DCHECK(!lazy_tls_ptr_.Get().Get()); |
572 lazy_tls_ptr_.Get().Set(this); | 583 lazy_tls_ptr_.Get().Set(this); |
573 | 584 |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
699 "SequencedWorkerPool::Inner::PostTask", | 710 "SequencedWorkerPool::Inner::PostTask", |
700 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), | 711 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
701 TRACE_EVENT_FLAG_FLOW_OUT); | 712 TRACE_EVENT_FLAG_FLOW_OUT); |
702 | 713 |
703 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); | 714 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
704 | 715 |
705 // Now that we have the lock, apply the named token rules. | 716 // Now that we have the lock, apply the named token rules. |
706 if (optional_token_name) | 717 if (optional_token_name) |
707 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); | 718 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
708 | 719 |
709 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 720 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 721 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
710 PostTaskToTaskScheduler(sequenced); | 722 PostTaskToTaskScheduler(sequenced); |
711 } else { | 723 } else { |
712 pending_tasks_.insert(sequenced); | 724 pending_tasks_.insert(sequenced); |
713 | 725 |
714 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) | 726 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
715 blocking_shutdown_pending_task_count_++; | 727 blocking_shutdown_pending_task_count_++; |
716 | 728 |
717 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 729 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
718 } | 730 } |
719 } | 731 } |
720 | 732 |
721 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 733 if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| 734 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
722 // Actually start the additional thread or signal an existing one outside | 735 // Actually start the additional thread or signal an existing one outside |
723 // the lock. | 736 // the lock. |
724 if (create_thread_id) | 737 if (create_thread_id) |
725 FinishStartingAdditionalThread(create_thread_id); | 738 FinishStartingAdditionalThread(create_thread_id); |
726 else | 739 else |
727 SignalHasWork(); | 740 SignalHasWork(); |
728 } | 741 } |
729 | 742 |
730 #if DCHECK_IS_ON() | 743 #if DCHECK_IS_ON() |
731 { | 744 { |
732 AutoLock lock_for_dcheck(lock_); | 745 AutoLock lock_for_dcheck(lock_); |
733 // Some variables are exposed in both modes for convenience but only really | 746 // Some variables are exposed in both modes for convenience but only really |
734 // intended for one of them at runtime, confirm exclusive usage here. | 747 // intended for one of them at runtime, confirm exclusive usage here. |
735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 748 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 749 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
736 DCHECK(pending_tasks_.empty()); | 750 DCHECK(pending_tasks_.empty()); |
737 DCHECK_EQ(0, create_thread_id); | 751 DCHECK_EQ(0, create_thread_id); |
738 } else { | 752 } else { |
739 DCHECK(sequenced_task_runner_map_.empty()); | 753 DCHECK(sequenced_task_runner_map_.empty()); |
740 } | 754 } |
741 } | 755 } |
742 #endif // DCHECK_IS_ON() | 756 #endif // DCHECK_IS_ON() |
743 | 757 |
744 return true; | 758 return true; |
745 } | 759 } |
746 | 760 |
747 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( | 761 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
748 const SequencedTask& sequenced) { | 762 const SequencedTask& sequenced) { |
749 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 763 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 764 subtle::NoBarrier_Load(&g_all_pools_state)); |
750 | 765 |
751 lock_.AssertAcquired(); | 766 lock_.AssertAcquired(); |
752 | 767 |
753 // Confirm that the TaskScheduler's shutdown behaviors use the same | 768 // Confirm that the TaskScheduler's shutdown behaviors use the same |
754 // underlying values as SequencedWorkerPool. | 769 // underlying values as SequencedWorkerPool. |
755 static_assert( | 770 static_assert( |
756 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == | 771 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
757 static_cast<int>(CONTINUE_ON_SHUTDOWN), | 772 static_cast<int>(CONTINUE_ON_SHUTDOWN), |
758 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | 773 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
759 "CONTINUE_ON_SHUTDOWN."); | 774 "CONTINUE_ON_SHUTDOWN."); |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
801 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() | 816 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() |
802 // directly. | 817 // directly. |
803 DCHECK_GT(max_threads_, 1U); | 818 DCHECK_GT(max_threads_, 1U); |
804 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, | 819 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
805 sequenced.task); | 820 sequenced.task); |
806 } | 821 } |
807 } | 822 } |
808 | 823 |
809 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 824 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
810 AutoLock lock(lock_); | 825 AutoLock lock(lock_); |
811 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 826 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 827 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
812 if (!runs_tasks_on_verifier_) { | 828 if (!runs_tasks_on_verifier_) { |
813 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( | 829 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
814 TaskTraits().WithFileIO().WithPriority(task_priority_), | 830 TaskTraits().WithFileIO().WithPriority(task_priority_), |
815 ExecutionMode::PARALLEL); | 831 ExecutionMode::PARALLEL); |
816 } | 832 } |
817 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); | 833 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
818 } else { | 834 } else { |
819 return ContainsKey(threads_, PlatformThread::CurrentId()); | 835 return ContainsKey(threads_, PlatformThread::CurrentId()); |
820 } | 836 } |
821 } | 837 } |
822 | 838 |
823 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 839 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
824 SequenceToken sequence_token) const { | 840 SequenceToken sequence_token) const { |
825 AutoLock lock(lock_); | 841 AutoLock lock(lock_); |
826 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 842 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 843 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
827 // TODO(gab): This currently only verifies that the current thread is a | 844 // TODO(gab): This currently only verifies that the current thread is a |
828 // thread on which a task bound to |sequence_token| *could* run, but it | 845 // thread on which a task bound to |sequence_token| *could* run, but it |
829 // doesn't verify that the current is *currently running* a task bound to | 846 // doesn't verify that the current is *currently running* a task bound to |
830 // |sequence_token|. | 847 // |sequence_token|. |
831 const auto sequenced_task_runner_it = | 848 const auto sequenced_task_runner_it = |
832 sequenced_task_runner_map_.find(sequence_token.id_); | 849 sequenced_task_runner_map_.find(sequence_token.id_); |
833 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && | 850 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
834 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); | 851 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
835 } else { | 852 } else { |
836 ThreadMap::const_iterator found = | 853 ThreadMap::const_iterator found = |
(...skipping 30 matching lines...) Expand all Loading... |
867 int max_new_blocking_tasks_after_shutdown) { | 884 int max_new_blocking_tasks_after_shutdown) { |
868 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); | 885 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
869 { | 886 { |
870 AutoLock lock(lock_); | 887 AutoLock lock(lock_); |
871 // Cleanup and Shutdown should not be called concurrently. | 888 // Cleanup and Shutdown should not be called concurrently. |
872 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 889 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
873 if (shutdown_called_) | 890 if (shutdown_called_) |
874 return; | 891 return; |
875 shutdown_called_ = true; | 892 shutdown_called_ = true; |
876 | 893 |
877 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) | 894 if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| 895 AllPoolsState::WORKER_CREATED) |
878 return; | 896 return; |
879 | 897 |
880 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 898 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
881 | 899 |
882 // Tickle the threads. This will wake up a waiting one so it will know that | 900 // Tickle the threads. This will wake up a waiting one so it will know that |
883 // it can exit, which in turn will wake up any other waiting ones. | 901 // it can exit, which in turn will wake up any other waiting ones. |
884 SignalHasWork(); | 902 SignalHasWork(); |
885 | 903 |
886 // There are no pending or running tasks blocking shutdown, we're done. | 904 // There are no pending or running tasks blocking shutdown, we're done. |
887 if (CanShutdown()) | 905 if (CanShutdown()) |
(...skipping 21 matching lines...) Expand all Loading... |
909 TimeTicks::Now() - shutdown_wait_begin); | 927 TimeTicks::Now() - shutdown_wait_begin); |
910 #endif | 928 #endif |
911 } | 929 } |
912 | 930 |
913 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { | 931 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
914 AutoLock lock(lock_); | 932 AutoLock lock(lock_); |
915 return shutdown_called_; | 933 return shutdown_called_; |
916 } | 934 } |
917 | 935 |
918 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 936 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
919 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 937 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 938 subtle::NoBarrier_Load(&g_all_pools_state)); |
920 { | 939 { |
921 AutoLock lock(lock_); | 940 AutoLock lock(lock_); |
922 DCHECK(thread_being_created_); | 941 DCHECK(thread_being_created_); |
923 thread_being_created_ = false; | 942 thread_being_created_ = false; |
924 auto result = threads_.insert( | 943 auto result = threads_.insert( |
925 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); | 944 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); |
926 DCHECK(result.second); | 945 DCHECK(result.second); |
927 | 946 |
928 while (true) { | 947 while (true) { |
929 #if defined(OS_MACOSX) | 948 #if defined(OS_MACOSX) |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1044 | 1063 |
1045 // We noticed we should exit. Wake up the next worker so it knows it should | 1064 // We noticed we should exit. Wake up the next worker so it knows it should |
1046 // exit as well (because the Shutdown() code only signals once). | 1065 // exit as well (because the Shutdown() code only signals once). |
1047 SignalHasWork(); | 1066 SignalHasWork(); |
1048 | 1067 |
1049 // Possibly unblock shutdown. | 1068 // Possibly unblock shutdown. |
1050 can_shutdown_cv_.Signal(); | 1069 can_shutdown_cv_.Signal(); |
1051 } | 1070 } |
1052 | 1071 |
1053 void SequencedWorkerPool::Inner::HandleCleanup() { | 1072 void SequencedWorkerPool::Inner::HandleCleanup() { |
1054 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1073 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1074 subtle::NoBarrier_Load(&g_all_pools_state)); |
1055 | 1075 |
1056 lock_.AssertAcquired(); | 1076 lock_.AssertAcquired(); |
1057 if (cleanup_state_ == CLEANUP_DONE) | 1077 if (cleanup_state_ == CLEANUP_DONE) |
1058 return; | 1078 return; |
1059 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1079 if (cleanup_state_ == CLEANUP_REQUESTED) { |
1060 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1080 // We win, we get to do the cleanup as soon as the others wise up and idle. |
1061 cleanup_state_ = CLEANUP_STARTING; | 1081 cleanup_state_ = CLEANUP_STARTING; |
1062 while (thread_being_created_ || | 1082 while (thread_being_created_ || |
1063 cleanup_idlers_ != threads_.size() - 1) { | 1083 cleanup_idlers_ != threads_.size() - 1) { |
1064 has_work_cv_.Signal(); | 1084 has_work_cv_.Signal(); |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1111 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1131 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
1112 lock_.AssertAcquired(); | 1132 lock_.AssertAcquired(); |
1113 // We assume that we never create enough tasks to wrap around. | 1133 // We assume that we never create enough tasks to wrap around. |
1114 return next_sequence_task_number_++; | 1134 return next_sequence_task_number_++; |
1115 } | 1135 } |
1116 | 1136 |
1117 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1137 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
1118 SequencedTask* task, | 1138 SequencedTask* task, |
1119 TimeDelta* wait_time, | 1139 TimeDelta* wait_time, |
1120 std::vector<Closure>* delete_these_outside_lock) { | 1140 std::vector<Closure>* delete_these_outside_lock) { |
1121 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1141 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1142 subtle::NoBarrier_Load(&g_all_pools_state)); |
1122 | 1143 |
1123 lock_.AssertAcquired(); | 1144 lock_.AssertAcquired(); |
1124 | 1145 |
1125 // Find the next task with a sequence token that's not currently in use. | 1146 // Find the next task with a sequence token that's not currently in use. |
1126 // If the token is in use, that means another thread is running something | 1147 // If the token is in use, that means another thread is running something |
1127 // in that sequence, and we can't run it without going out-of-order. | 1148 // in that sequence, and we can't run it without going out-of-order. |
1128 // | 1149 // |
1129 // This algorithm is simple and fair, but inefficient in some cases. For | 1150 // This algorithm is simple and fair, but inefficient in some cases. For |
1130 // example, say somebody schedules 1000 slow tasks with the same sequence | 1151 // example, say somebody schedules 1000 slow tasks with the same sequence |
1131 // number. We'll have to go through all those tasks each time we feel like | 1152 // number. We'll have to go through all those tasks each time we feel like |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1199 } | 1220 } |
1200 | 1221 |
1201 status = GET_WORK_FOUND; | 1222 status = GET_WORK_FOUND; |
1202 break; | 1223 break; |
1203 } | 1224 } |
1204 | 1225 |
1205 return status; | 1226 return status; |
1206 } | 1227 } |
1207 | 1228 |
1208 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { | 1229 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
1209 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1230 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1231 subtle::NoBarrier_Load(&g_all_pools_state)); |
1210 | 1232 |
1211 lock_.AssertAcquired(); | 1233 lock_.AssertAcquired(); |
1212 | 1234 |
1213 // Mark the task's sequence number as in use. | 1235 // Mark the task's sequence number as in use. |
1214 if (task.sequence_token_id) | 1236 if (task.sequence_token_id) |
1215 current_sequences_.insert(task.sequence_token_id); | 1237 current_sequences_.insert(task.sequence_token_id); |
1216 | 1238 |
1217 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN | 1239 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN |
1218 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread | 1240 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread |
1219 // completes. | 1241 // completes. |
(...skipping 12 matching lines...) Expand all Loading... |
1232 // if there is one waiting to pick up the next task. | 1254 // if there is one waiting to pick up the next task. |
1233 // | 1255 // |
1234 // Note that we really need to do this *before* running the task, not | 1256 // Note that we really need to do this *before* running the task, not |
1235 // after. Otherwise, if more than one task is posted, the creation of the | 1257 // after. Otherwise, if more than one task is posted, the creation of the |
1236 // second thread (since we only create one at a time) will be blocked by | 1258 // second thread (since we only create one at a time) will be blocked by |
1237 // the execution of the first task, which could be arbitrarily long. | 1259 // the execution of the first task, which could be arbitrarily long. |
1238 return PrepareToStartAdditionalThreadIfHelpful(); | 1260 return PrepareToStartAdditionalThreadIfHelpful(); |
1239 } | 1261 } |
1240 | 1262 |
1241 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 1263 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
1242 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1264 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1265 subtle::NoBarrier_Load(&g_all_pools_state)); |
1243 | 1266 |
1244 lock_.AssertAcquired(); | 1267 lock_.AssertAcquired(); |
1245 | 1268 |
1246 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { | 1269 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
1247 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 1270 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
1248 blocking_shutdown_thread_count_--; | 1271 blocking_shutdown_thread_count_--; |
1249 } | 1272 } |
1250 | 1273 |
1251 if (task.sequence_token_id) | 1274 if (task.sequence_token_id) |
1252 current_sequences_.erase(task.sequence_token_id); | 1275 current_sequences_.erase(task.sequence_token_id); |
1253 } | 1276 } |
1254 | 1277 |
1255 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 1278 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
1256 int sequence_token_id) const { | 1279 int sequence_token_id) const { |
1257 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1280 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1281 subtle::NoBarrier_Load(&g_all_pools_state)); |
1258 | 1282 |
1259 lock_.AssertAcquired(); | 1283 lock_.AssertAcquired(); |
1260 return !sequence_token_id || | 1284 return !sequence_token_id || |
1261 current_sequences_.find(sequence_token_id) == | 1285 current_sequences_.find(sequence_token_id) == |
1262 current_sequences_.end(); | 1286 current_sequences_.end(); |
1263 } | 1287 } |
1264 | 1288 |
1265 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { | 1289 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
1266 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1290 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1291 subtle::NoBarrier_Load(&g_all_pools_state)); |
1267 | 1292 |
1268 lock_.AssertAcquired(); | 1293 lock_.AssertAcquired(); |
1269 // How thread creation works: | 1294 // How thread creation works: |
1270 // | 1295 // |
1271 // We'de like to avoid creating threads with the lock held. However, we | 1296 // We'de like to avoid creating threads with the lock held. However, we |
1272 // need to be sure that we have an accurate accounting of the threads for | 1297 // need to be sure that we have an accurate accounting of the threads for |
1273 // proper Joining and deltion on shutdown. | 1298 // proper Joining and deltion on shutdown. |
1274 // | 1299 // |
1275 // We need to figure out if we need another thread with the lock held, which | 1300 // We need to figure out if we need another thread with the lock held, which |
1276 // is what this function does. It then marks us as in the process of creating | 1301 // is what this function does. It then marks us as in the process of creating |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1308 thread_being_created_ = true; | 1333 thread_being_created_ = true; |
1309 return static_cast<int>(threads_.size() + 1); | 1334 return static_cast<int>(threads_.size() + 1); |
1310 } | 1335 } |
1311 } | 1336 } |
1312 } | 1337 } |
1313 return 0; | 1338 return 0; |
1314 } | 1339 } |
1315 | 1340 |
1316 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 1341 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
1317 int thread_number) { | 1342 int thread_number) { |
1318 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1343 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1344 subtle::NoBarrier_Load(&g_all_pools_state)); |
1319 | 1345 |
1320 // Called outside of the lock. | 1346 // Called outside of the lock. |
1321 DCHECK_GT(thread_number, 0); | 1347 DCHECK_GT(thread_number, 0); |
1322 | 1348 |
1323 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { | 1349 if (subtle::NoBarrier_Load(&g_all_pools_state) != |
1324 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); | 1350 AllPoolsState::WORKER_CREATED) { |
1325 g_all_pools_state = AllPoolsState::WORKER_CREATED; | 1351 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
| 1352 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1353 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED); |
1326 } | 1354 } |
1327 | 1355 |
1328 // The worker is assigned to the list when the thread actually starts, which | 1356 // The worker is assigned to the list when the thread actually starts, which |
1329 // will manage the memory of the pointer. | 1357 // will manage the memory of the pointer. |
1330 new Worker(worker_pool_, thread_number, thread_name_prefix_); | 1358 new Worker(worker_pool_, thread_number, thread_name_prefix_); |
1331 } | 1359 } |
1332 | 1360 |
1333 void SequencedWorkerPool::Inner::SignalHasWork() { | 1361 void SequencedWorkerPool::Inner::SignalHasWork() { |
1334 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1362 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1363 subtle::NoBarrier_Load(&g_all_pools_state)); |
1335 | 1364 |
1336 has_work_cv_.Signal(); | 1365 has_work_cv_.Signal(); |
1337 if (testing_observer_) { | 1366 if (testing_observer_) { |
1338 testing_observer_->OnHasWork(); | 1367 testing_observer_->OnHasWork(); |
1339 } | 1368 } |
1340 } | 1369 } |
1341 | 1370 |
1342 bool SequencedWorkerPool::Inner::CanShutdown() const { | 1371 bool SequencedWorkerPool::Inner::CanShutdown() const { |
1343 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1372 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1373 subtle::NoBarrier_Load(&g_all_pools_state)); |
1344 lock_.AssertAcquired(); | 1374 lock_.AssertAcquired(); |
1345 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 1375 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
1346 return !thread_being_created_ && | 1376 return !thread_being_created_ && |
1347 blocking_shutdown_thread_count_ == 0 && | 1377 blocking_shutdown_thread_count_ == 0 && |
1348 blocking_shutdown_pending_task_count_ == 0; | 1378 blocking_shutdown_pending_task_count_ == 0; |
1349 } | 1379 } |
1350 | 1380 |
1351 base::StaticAtomicSequenceNumber | 1381 base::StaticAtomicSequenceNumber |
1352 SequencedWorkerPool::Inner::g_last_sequence_number_; | 1382 SequencedWorkerPool::Inner::g_last_sequence_number_; |
1353 | 1383 |
(...skipping 24 matching lines...) Expand all Loading... |
1378 } | 1408 } |
1379 | 1409 |
1380 // static | 1410 // static |
1381 void SequencedWorkerPool:: | 1411 void SequencedWorkerPool:: |
1382 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { | 1412 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
1383 DCHECK(TaskScheduler::GetInstance()); | 1413 DCHECK(TaskScheduler::GetInstance()); |
1384 // Hitting this DCHECK indicates that a task was posted to a | 1414 // Hitting this DCHECK indicates that a task was posted to a |
1385 // SequencedWorkerPool before the TaskScheduler was initialized and | 1415 // SequencedWorkerPool before the TaskScheduler was initialized and |
1386 // redirected, posting task to SequencedWorkerPools needs to at least be | 1416 // redirected, posting task to SequencedWorkerPools needs to at least be |
1387 // delayed until after that point. | 1417 // delayed until after that point. |
1388 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); | 1418 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
1389 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; | 1419 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1420 subtle::NoBarrier_Store(&g_all_pools_state, |
| 1421 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
1390 } | 1422 } |
1391 | 1423 |
1392 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1424 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1393 const std::string& thread_name_prefix, | 1425 const std::string& thread_name_prefix, |
1394 base::TaskPriority task_priority) | 1426 base::TaskPriority task_priority) |
1395 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1427 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1396 inner_(new Inner(this, | 1428 inner_(new Inner(this, |
1397 max_threads, | 1429 max_threads, |
1398 thread_name_prefix, | 1430 thread_name_prefix, |
1399 task_priority, | 1431 task_priority, |
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1538 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1570 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1539 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1571 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
1540 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1572 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1541 } | 1573 } |
1542 | 1574 |
1543 bool SequencedWorkerPool::IsShutdownInProgress() { | 1575 bool SequencedWorkerPool::IsShutdownInProgress() { |
1544 return inner_->IsShutdownInProgress(); | 1576 return inner_->IsShutdownInProgress(); |
1545 } | 1577 } |
1546 | 1578 |
1547 } // namespace base | 1579 } // namespace base |
OLD | NEW |