OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_thread.h" | 5 #include "base/task_scheduler/scheduler_worker_thread.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
| 11 #include "base/lazy_instance.h" |
11 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/memory/ptr_util.h" |
12 #include "base/task_scheduler/task_tracker.h" | 14 #include "base/task_scheduler/task_tracker.h" |
| 15 #include "base/task_scheduler/utils.h" |
| 16 #include "base/threading/thread_local.h" |
| 17 #include "base/time/time.h" |
13 | 18 |
14 namespace base { | 19 namespace base { |
15 namespace internal { | 20 namespace internal { |
16 | 21 |
| 22 namespace { |
| 23 |
| 24 // SchedulerWorkerThread that owns the current thread, if any. |
| 25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky |
| 26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; |
| 27 |
| 28 // A task runner that runs tasks on a single SchedulerWorkerThread. |
| 29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| 30 public: |
| 31 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| 32 // tasks so long as |executor| is alive. |
| 33 // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
| 34 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| 35 SchedulerTaskExecutor* executor, |
| 36 TaskTracker* task_tracker, |
| 37 DelayedTaskManager* delayed_task_manager) |
| 38 : traits_(traits), |
| 39 executor_(executor), |
| 40 task_tracker_(task_tracker), |
| 41 delayed_task_manager_(delayed_task_manager) {} |
| 42 |
| 43 // SingleThreadTaskRunner: |
| 44 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 45 const Closure& closure, |
| 46 TimeDelta delay) override { |
| 47 // Post the task as part of |sequence|. |
| 48 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, |
| 49 executor_, task_tracker_, delayed_task_manager_); |
| 50 } |
| 51 |
| 52 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 53 const Closure& closure, |
| 54 base::TimeDelta delay) override { |
| 55 // Tasks are never nested within the task scheduler. |
| 56 return PostDelayedTask(from_here, closure, delay); |
| 57 } |
| 58 |
| 59 bool RunsTasksOnCurrentThread() const override { |
| 60 return tls_current_worker_thread.Get().Get() == executor_; |
| 61 } |
| 62 |
| 63 private: |
| 64 ~SchedulerSingleThreadTaskRunner() override = default; |
| 65 |
| 66 // Sequence for all Tasks posted through this TaskRunner. |
| 67 const scoped_refptr<Sequence> sequence_ = new Sequence; |
| 68 |
| 69 const TaskTraits traits_; |
| 70 SchedulerTaskExecutor* const executor_; |
| 71 TaskTracker* const task_tracker_; |
| 72 DelayedTaskManager* const delayed_task_manager_; |
| 73 |
| 74 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| 75 }; |
| 76 |
| 77 } // namespace |
| 78 |
17 std::unique_ptr<SchedulerWorkerThread> | 79 std::unique_ptr<SchedulerWorkerThread> |
18 SchedulerWorkerThread::CreateSchedulerWorkerThread( | 80 SchedulerWorkerThread::CreateSchedulerWorkerThread( |
19 ThreadPriority thread_priority, | 81 ThreadPriority thread_priority, |
20 Delegate* delegate, | 82 Delegate* delegate, |
21 TaskTracker* task_tracker) { | 83 TaskTracker* task_tracker, |
| 84 DelayedTaskManager* delayed_task_manager, |
| 85 const PriorityQueue* predecessor_priority_queue) { |
22 std::unique_ptr<SchedulerWorkerThread> worker_thread( | 86 std::unique_ptr<SchedulerWorkerThread> worker_thread( |
23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); | 87 new SchedulerWorkerThread(thread_priority, delegate, task_tracker, |
| 88 delayed_task_manager, |
| 89 predecessor_priority_queue)); |
24 | 90 |
25 if (worker_thread->thread_handle_.is_null()) | 91 if (worker_thread->thread_handle_.is_null()) |
26 return nullptr; | 92 return nullptr; |
27 return worker_thread; | 93 return worker_thread; |
28 } | 94 } |
29 | 95 |
30 SchedulerWorkerThread::~SchedulerWorkerThread() { | 96 SchedulerWorkerThread::~SchedulerWorkerThread() { |
31 DCHECK(ShouldExitForTesting()); | 97 DCHECK(ShouldExitForTesting()); |
32 } | 98 } |
33 | 99 |
| 100 scoped_refptr<SingleThreadTaskRunner> |
| 101 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) { |
| 102 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
| 103 traits, this, task_tracker_, delayed_task_manager_)); |
| 104 } |
| 105 |
34 void SchedulerWorkerThread::WakeUp() { | 106 void SchedulerWorkerThread::WakeUp() { |
35 wake_up_event_.Signal(); | 107 wake_up_event_.Signal(); |
36 } | 108 } |
37 | 109 |
38 void SchedulerWorkerThread::JoinForTesting() { | 110 void SchedulerWorkerThread::JoinForTesting() { |
39 { | 111 { |
40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 112 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
41 should_exit_for_testing_ = true; | 113 should_exit_for_testing_ = true; |
42 } | 114 } |
43 WakeUp(); | 115 WakeUp(); |
44 PlatformThread::Join(thread_handle_); | 116 PlatformThread::Join(thread_handle_); |
45 } | 117 } |
46 | 118 |
47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, | 119 void SchedulerWorkerThread::PostTaskWithSequence( |
48 Delegate* delegate, | 120 std::unique_ptr<Task> task, |
49 TaskTracker* task_tracker) | 121 scoped_refptr<Sequence> sequence) { |
| 122 DCHECK(task); |
| 123 DCHECK(sequence); |
| 124 |
| 125 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( |
| 126 std::move(task), std::move(sequence), &single_threaded_priority_queue_); |
| 127 |
| 128 // If |sequence| wasn't empty before |task| was inserted into it, the worker |
| 129 // thread has already been woken up to process it. |
| 130 // TODO(fdoray): Remove the worker thread from the stack of idle threads of |
| 131 // its parent thread pool when it is woken up to run single-threaded tasks. |
| 132 if (sequence_was_empty) |
| 133 WakeUp(); |
| 134 } |
| 135 |
| 136 SchedulerWorkerThread::SchedulerWorkerThread( |
| 137 ThreadPriority thread_priority, |
| 138 Delegate* delegate, |
| 139 TaskTracker* task_tracker, |
| 140 DelayedTaskManager* delayed_task_manager, |
| 141 const PriorityQueue* predecessor_priority_queue) |
50 : wake_up_event_(false, false), | 142 : wake_up_event_(false, false), |
| 143 single_threaded_priority_queue_(predecessor_priority_queue), |
51 delegate_(delegate), | 144 delegate_(delegate), |
52 task_tracker_(task_tracker) { | 145 task_tracker_(task_tracker), |
| 146 delayed_task_manager_(delayed_task_manager) { |
53 DCHECK(delegate_); | 147 DCHECK(delegate_); |
54 DCHECK(task_tracker_); | 148 DCHECK(task_tracker_); |
| 149 DCHECK(delayed_task_manager_); |
| 150 DCHECK(predecessor_priority_queue); |
55 | 151 |
56 const size_t kDefaultStackSize = 0; | 152 const size_t kDefaultStackSize = 0; |
57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | 153 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
58 thread_priority); | 154 thread_priority); |
59 } | 155 } |
60 | 156 |
61 void SchedulerWorkerThread::ThreadMain() { | 157 void SchedulerWorkerThread::ThreadMain() { |
62 delegate_->OnMainEntry(); | 158 delegate_->OnMainEntry(); |
| 159 tls_current_worker_thread.Get().Set(this); |
63 | 160 |
64 // A SchedulerWorkerThread starts out sleeping. | 161 // A SchedulerWorkerThread starts out sleeping. |
65 wake_up_event_.Wait(); | 162 wake_up_event_.Wait(); |
66 | 163 |
67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { | 164 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
68 // Get the sequence containing the next task to execute. | 165 // Get the sequence containing the next task to execute. |
69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); | 166 bool is_single_threaded_sequence = false; |
| 167 scoped_refptr<Sequence> sequence = delegate_->GetWork( |
| 168 this, &single_threaded_priority_queue_, &is_single_threaded_sequence); |
70 | 169 |
71 if (!sequence) { | 170 if (!sequence) { |
72 wake_up_event_.Wait(); | 171 wake_up_event_.Wait(); |
73 continue; | 172 continue; |
74 } | 173 } |
75 | 174 |
76 task_tracker_->RunTask(sequence->PeekTask()); | 175 task_tracker_->RunTask(sequence->PeekTask()); |
77 | 176 |
78 const bool sequence_became_empty = sequence->PopTask(); | 177 const bool sequence_became_empty = sequence->PopTask(); |
79 | 178 |
80 // If |sequence| isn't empty immediately after the pop, enqueue it to | 179 // If |sequence| isn't empty immediately after the pop, enqueue it to |
81 // maintain the invariant that a non-empty Sequence is always referenced by | 180 // maintain the invariant that a non-empty Sequence is always referenced by |
82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and | 181 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and |
83 // there are live references to it, it will be enqueued when a Task is added | 182 // there are live references to it, it will be enqueued when a Task is added |
84 // to it. Otherwise, it will be destroyed at the end of this scope. | 183 // to it. Otherwise, it will be destroyed at the end of this scope. |
85 if (!sequence_became_empty) | 184 if (!sequence_became_empty) { |
86 delegate_->EnqueueSequence(std::move(sequence)); | 185 if (is_single_threaded_sequence) { |
| 186 const auto sort_key = sequence->GetSortKey(); |
| 187 single_threaded_priority_queue_.BeginTransaction()->Push( |
| 188 WrapUnique(new PriorityQueue::SequenceAndSortKey( |
| 189 std::move(sequence), sort_key))); |
| 190 } else { |
| 191 delegate_->EnqueueSequence(std::move(sequence)); |
| 192 } |
| 193 } |
87 | 194 |
88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run | 195 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run |
89 // Tasks from Sequences returned by the GetWork() method of |delegate_| | 196 // Tasks from Sequences returned by the GetWork() method of |delegate_| |
90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | 197 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break |
91 // this invariant and avoids a useless loop iteration before going to sleep | 198 // this invariant and avoids a useless loop iteration before going to sleep |
92 // if WakeUp() is called while this SchedulerWorkerThread is awake. | 199 // if WakeUp() is called while this SchedulerWorkerThread is awake. |
93 wake_up_event_.Reset(); | 200 wake_up_event_.Reset(); |
94 } | 201 } |
95 } | 202 } |
96 | 203 |
97 bool SchedulerWorkerThread::ShouldExitForTesting() const { | 204 bool SchedulerWorkerThread::ShouldExitForTesting() const { |
98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 205 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
99 return should_exit_for_testing_; | 206 return should_exit_for_testing_; |
100 } | 207 } |
101 | 208 |
102 } // namespace internal | 209 } // namespace internal |
103 } // namespace base | 210 } // namespace base |
OLD | NEW |