OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_thread.h" | 5 #include "base/task_scheduler/scheduler_worker_thread.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/lazy_instance.h" | |
11 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/memory/ptr_util.h" | |
12 #include "base/task_scheduler/task_tracker.h" | 14 #include "base/task_scheduler/task_tracker.h" |
15 #include "base/task_scheduler/utils.h" | |
16 #include "base/threading/thread_local.h" | |
17 #include "base/time/time.h" | |
13 | 18 |
14 namespace base { | 19 namespace base { |
15 namespace internal { | 20 namespace internal { |
16 | 21 |
22 namespace { | |
23 | |
24 // SchedulerWorkerThread that owns the current thread, if any. | |
25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky | |
26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; | |
27 | |
28 // A task runner that runs tasks on a single SchedulerWorkerThread. | |
29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | |
30 public: | |
31 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
32 SchedulerTaskExecutor* executor, | |
gab
2016/04/18 18:04:02
TODO on memory management for this one as well for
fdoray
2016/04/18 19:04:50
Done.
| |
33 TaskTracker* task_tracker, | |
34 DelayedTaskManager* delayed_task_manager) | |
35 : traits_(traits), | |
36 executor_(executor), | |
37 task_tracker_(task_tracker), | |
38 delayed_task_manager_(delayed_task_manager) {} | |
39 | |
40 // SingleThreadTaskRunner: | |
41 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
42 const Closure& closure, | |
43 TimeDelta delay) override { | |
44 // Post the task as part of |sequence|. | |
45 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, | |
46 executor_, task_tracker_, delayed_task_manager_); | |
47 } | |
48 | |
49 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
50 const Closure& closure, | |
51 base::TimeDelta delay) override { | |
52 // Tasks are never nested within the task scheduler. | |
53 return PostDelayedTask(from_here, closure, delay); | |
54 } | |
55 | |
56 bool RunsTasksOnCurrentThread() const override { | |
57 return tls_current_worker_thread.Get().Get() == executor_; | |
58 } | |
59 | |
60 private: | |
61 ~SchedulerSingleThreadTaskRunner() override = default; | |
62 | |
63 // Sequence for all Tasks posted through this TaskRunner. | |
64 const scoped_refptr<Sequence> sequence_ = new Sequence; | |
65 | |
66 const TaskTraits traits_; | |
67 SchedulerTaskExecutor* const executor_; | |
68 TaskTracker* const task_tracker_; | |
69 DelayedTaskManager* const delayed_task_manager_; | |
70 | |
71 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
72 }; | |
73 | |
74 } // namespace | |
75 | |
17 std::unique_ptr<SchedulerWorkerThread> | 76 std::unique_ptr<SchedulerWorkerThread> |
18 SchedulerWorkerThread::CreateSchedulerWorkerThread( | 77 SchedulerWorkerThread::CreateSchedulerWorkerThread( |
19 ThreadPriority thread_priority, | 78 ThreadPriority thread_priority, |
20 Delegate* delegate, | 79 Delegate* delegate, |
21 TaskTracker* task_tracker) { | 80 TaskTracker* task_tracker, |
81 DelayedTaskManager* delayed_task_manager, | |
82 const PriorityQueue* predecessor_priority_queue) { | |
22 std::unique_ptr<SchedulerWorkerThread> worker_thread( | 83 std::unique_ptr<SchedulerWorkerThread> worker_thread( |
23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); | 84 new SchedulerWorkerThread(thread_priority, delegate, task_tracker, |
85 delayed_task_manager, | |
86 predecessor_priority_queue)); | |
24 | 87 |
25 if (worker_thread->thread_handle_.is_null()) | 88 if (worker_thread->thread_handle_.is_null()) |
26 return nullptr; | 89 return nullptr; |
27 return worker_thread; | 90 return worker_thread; |
28 } | 91 } |
29 | 92 |
30 SchedulerWorkerThread::~SchedulerWorkerThread() { | 93 SchedulerWorkerThread::~SchedulerWorkerThread() { |
31 DCHECK(ShouldExitForTesting()); | 94 DCHECK(ShouldExitForTesting()); |
32 } | 95 } |
33 | 96 |
97 scoped_refptr<SingleThreadTaskRunner> | |
98 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) { | |
99 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | |
100 traits, this, task_tracker_, delayed_task_manager_)); | |
101 } | |
102 | |
34 void SchedulerWorkerThread::WakeUp() { | 103 void SchedulerWorkerThread::WakeUp() { |
35 wake_up_event_.Signal(); | 104 wake_up_event_.Signal(); |
36 } | 105 } |
37 | 106 |
38 void SchedulerWorkerThread::JoinForTesting() { | 107 void SchedulerWorkerThread::JoinForTesting() { |
39 { | 108 { |
40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 109 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
41 should_exit_for_testing_ = true; | 110 should_exit_for_testing_ = true; |
42 } | 111 } |
43 WakeUp(); | 112 WakeUp(); |
44 PlatformThread::Join(thread_handle_); | 113 PlatformThread::Join(thread_handle_); |
45 } | 114 } |
46 | 115 |
47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, | 116 void SchedulerWorkerThread::PostTaskWithSequence( |
48 Delegate* delegate, | 117 std::unique_ptr<Task> task, |
49 TaskTracker* task_tracker) | 118 scoped_refptr<Sequence> sequence) { |
119 DCHECK(task); | |
120 DCHECK(sequence); | |
121 | |
122 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( | |
123 std::move(task), std::move(sequence), &single_threaded_priority_queue_); | |
124 | |
125 // If |sequence| wasn't empty before |task| was inserted into it, the worker | |
126 // thread has already been woken up to run it. | |
gab
2016/04/18 18:04:02
How about s/run/process/ (at first I thought "it"
fdoray
2016/04/18 19:04:50
Done.
| |
127 if (sequence_was_empty) | |
128 WakeUp(); | |
gab
2016/04/18 18:04:02
Add a TODO to check for removal from idle stack (a
fdoray
2016/04/18 19:04:50
Done.
| |
129 } | |
130 | |
131 SchedulerWorkerThread::SchedulerWorkerThread( | |
132 ThreadPriority thread_priority, | |
133 Delegate* delegate, | |
134 TaskTracker* task_tracker, | |
135 DelayedTaskManager* delayed_task_manager, | |
136 const PriorityQueue* predecessor_priority_queue) | |
50 : wake_up_event_(false, false), | 137 : wake_up_event_(false, false), |
138 single_threaded_priority_queue_(predecessor_priority_queue), | |
51 delegate_(delegate), | 139 delegate_(delegate), |
52 task_tracker_(task_tracker) { | 140 task_tracker_(task_tracker), |
141 delayed_task_manager_(delayed_task_manager) { | |
53 DCHECK(delegate_); | 142 DCHECK(delegate_); |
54 DCHECK(task_tracker_); | 143 DCHECK(task_tracker_); |
144 DCHECK(delayed_task_manager_); | |
145 DCHECK(predecessor_priority_queue); | |
55 | 146 |
56 const size_t kDefaultStackSize = 0; | 147 const size_t kDefaultStackSize = 0; |
57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | 148 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
58 thread_priority); | 149 thread_priority); |
59 } | 150 } |
60 | 151 |
61 void SchedulerWorkerThread::ThreadMain() { | 152 void SchedulerWorkerThread::ThreadMain() { |
62 delegate_->OnMainEntry(); | 153 delegate_->OnMainEntry(); |
154 tls_current_worker_thread.Get().Set(this); | |
63 | 155 |
64 // A SchedulerWorkerThread starts out sleeping. | 156 // A SchedulerWorkerThread starts out sleeping. |
65 wake_up_event_.Wait(); | 157 wake_up_event_.Wait(); |
66 | 158 |
67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { | 159 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
68 // Get the sequence containing the next task to execute. | 160 // Get the sequence containing the next task to execute. |
69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); | 161 bool is_single_threaded_sequence = false; |
162 scoped_refptr<Sequence> sequence = delegate_->GetWork( | |
163 this, &single_threaded_priority_queue_, &is_single_threaded_sequence); | |
70 | 164 |
71 if (!sequence) { | 165 if (!sequence) { |
72 wake_up_event_.Wait(); | 166 wake_up_event_.Wait(); |
73 continue; | 167 continue; |
74 } | 168 } |
75 | 169 |
76 task_tracker_->RunTask(sequence->PeekTask()); | 170 task_tracker_->RunTask(sequence->PeekTask()); |
77 | 171 |
78 const bool sequence_became_empty = sequence->PopTask(); | 172 const bool sequence_became_empty = sequence->PopTask(); |
79 | 173 |
80 // If |sequence| isn't empty immediately after the pop, enqueue it to | 174 // If |sequence| isn't empty immediately after the pop, enqueue it to |
81 // maintain the invariant that a non-empty Sequence is always referenced by | 175 // maintain the invariant that a non-empty Sequence is always referenced by |
82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and | 176 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and |
83 // there are live references to it, it will be enqueued when a Task is added | 177 // there are live references to it, it will be enqueued when a Task is added |
84 // to it. Otherwise, it will be destroyed at the end of this scope. | 178 // to it. Otherwise, it will be destroyed at the end of this scope. |
85 if (!sequence_became_empty) | 179 if (!sequence_became_empty) { |
86 delegate_->EnqueueSequence(std::move(sequence)); | 180 if (is_single_threaded_sequence) { |
181 const auto sort_key = sequence->GetSortKey(); | |
182 single_threaded_priority_queue_.BeginTransaction()->Push( | |
183 WrapUnique(new PriorityQueue::SequenceAndSortKey( | |
184 std::move(sequence), sort_key))); | |
185 } else { | |
186 delegate_->EnqueueSequence(std::move(sequence)); | |
187 } | |
188 } | |
87 | 189 |
88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run | 190 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run |
89 // Tasks from Sequences returned by the GetWork() method of |delegate_| | 191 // Tasks from Sequences returned by the GetWork() method of |delegate_| |
90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | 192 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break |
91 // this invariant and avoids a useless loop iteration before going to sleep | 193 // this invariant and avoids a useless loop iteration before going to sleep |
92 // if WakeUp() is called while this SchedulerWorkerThread is awake. | 194 // if WakeUp() is called while this SchedulerWorkerThread is awake. |
93 wake_up_event_.Reset(); | 195 wake_up_event_.Reset(); |
94 } | 196 } |
95 } | 197 } |
96 | 198 |
97 bool SchedulerWorkerThread::ShouldExitForTesting() const { | 199 bool SchedulerWorkerThread::ShouldExitForTesting() const { |
98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 200 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
99 return should_exit_for_testing_; | 201 return should_exit_for_testing_; |
100 } | 202 } |
101 | 203 |
102 } // namespace internal | 204 } // namespace internal |
103 } // namespace base | 205 } // namespace base |
OLD | NEW |