OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_thread.h" | 5 #include "base/task_scheduler/scheduler_worker_thread.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/lazy_instance.h" | |
11 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/memory/ptr_util.h" | |
12 #include "base/task_scheduler/task_tracker.h" | 14 #include "base/task_scheduler/task_tracker.h" |
15 #include "base/task_scheduler/utils.h" | |
16 #include "base/threading/thread_local.h" | |
17 #include "base/time/time.h" | |
13 | 18 |
14 namespace base { | 19 namespace base { |
15 namespace internal { | 20 namespace internal { |
16 | 21 |
22 namespace { | |
23 | |
24 // SchedulerWorkerThread that owns the current thread, if any. | |
25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky | |
26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; | |
27 | |
28 // A task runner that runs tasks on a single SchedulerWorkerThread. | |
29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | |
30 public: | |
31 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | |
32 // tasks so long as |executor| is alive. | |
33 // TODO(robliao): Find a concrete way to manage |executor|'s memory. | |
34 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
35 SchedulerTaskExecutor* executor, | |
36 TaskTracker* task_tracker, | |
37 DelayedTaskManager* delayed_task_manager) | |
38 : traits_(traits), | |
39 executor_(executor), | |
40 task_tracker_(task_tracker), | |
41 delayed_task_manager_(delayed_task_manager) {} | |
42 | |
43 // SingleThreadTaskRunner: | |
44 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
45 const Closure& closure, | |
46 TimeDelta delay) override { | |
47 // Post the task as part of |sequence_|. | |
48 return PostTaskToExecutor( | |
49 WrapUnique( | |
50 new Task(from_here, closure, traits_, | |
51 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), | |
danakj
2016/04/20 23:16:16
nit: maybe this conversion from delta to ticks cou
fdoray
2016/04/21 19:31:28
We decided to construct the Task outside of PostTa
| |
52 sequence_, executor_, task_tracker_, delayed_task_manager_); | |
53 } | |
54 | |
55 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
56 const Closure& closure, | |
57 base::TimeDelta delay) override { | |
58 // Tasks are never nested within the task scheduler. | |
59 return PostDelayedTask(from_here, closure, delay); | |
60 } | |
61 | |
62 bool RunsTasksOnCurrentThread() const override { | |
63 return tls_current_worker_thread.Get().Get() == executor_; | |
64 } | |
65 | |
66 private: | |
67 ~SchedulerSingleThreadTaskRunner() override = default; | |
68 | |
69 // Sequence for all Tasks posted through this TaskRunner. | |
70 const scoped_refptr<Sequence> sequence_ = new Sequence; | |
71 | |
72 const TaskTraits traits_; | |
73 SchedulerTaskExecutor* const executor_; | |
74 TaskTracker* const task_tracker_; | |
75 DelayedTaskManager* const delayed_task_manager_; | |
76 | |
77 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
78 }; | |
79 | |
80 } // namespace | |
81 | |
17 std::unique_ptr<SchedulerWorkerThread> | 82 std::unique_ptr<SchedulerWorkerThread> |
18 SchedulerWorkerThread::CreateSchedulerWorkerThread( | 83 SchedulerWorkerThread::CreateSchedulerWorkerThread( |
19 ThreadPriority thread_priority, | 84 ThreadPriority thread_priority, |
20 Delegate* delegate, | 85 Delegate* delegate, |
21 TaskTracker* task_tracker) { | 86 TaskTracker* task_tracker, |
87 DelayedTaskManager* delayed_task_manager, | |
88 const PriorityQueue* predecessor_priority_queue) { | |
22 std::unique_ptr<SchedulerWorkerThread> worker_thread( | 89 std::unique_ptr<SchedulerWorkerThread> worker_thread( |
23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); | 90 new SchedulerWorkerThread(thread_priority, delegate, task_tracker, |
91 delayed_task_manager, | |
92 predecessor_priority_queue)); | |
24 | 93 |
25 if (worker_thread->thread_handle_.is_null()) | 94 if (worker_thread->thread_handle_.is_null()) |
26 return nullptr; | 95 return nullptr; |
27 return worker_thread; | 96 return worker_thread; |
28 } | 97 } |
29 | 98 |
30 SchedulerWorkerThread::~SchedulerWorkerThread() { | 99 SchedulerWorkerThread::~SchedulerWorkerThread() { |
31 DCHECK(ShouldExitForTesting()); | 100 DCHECK(ShouldExitForTesting()); |
32 } | 101 } |
33 | 102 |
103 scoped_refptr<SingleThreadTaskRunner> | |
104 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) { | |
105 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | |
106 traits, this, task_tracker_, delayed_task_manager_)); | |
107 } | |
108 | |
34 void SchedulerWorkerThread::WakeUp() { | 109 void SchedulerWorkerThread::WakeUp() { |
35 wake_up_event_.Signal(); | 110 wake_up_event_.Signal(); |
36 } | 111 } |
37 | 112 |
38 void SchedulerWorkerThread::JoinForTesting() { | 113 void SchedulerWorkerThread::JoinForTesting() { |
39 { | 114 { |
40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 115 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
41 should_exit_for_testing_ = true; | 116 should_exit_for_testing_ = true; |
42 } | 117 } |
43 WakeUp(); | 118 WakeUp(); |
44 PlatformThread::Join(thread_handle_); | 119 PlatformThread::Join(thread_handle_); |
45 } | 120 } |
46 | 121 |
47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, | 122 void SchedulerWorkerThread::PostTaskWithSequence( |
48 Delegate* delegate, | 123 std::unique_ptr<Task> task, |
49 TaskTracker* task_tracker) | 124 scoped_refptr<Sequence> sequence) { |
125 DCHECK(task); | |
126 DCHECK(sequence); | |
127 | |
128 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( | |
129 std::move(task), std::move(sequence), &single_threaded_priority_queue_); | |
130 | |
131 // If |sequence| wasn't empty before |task| was inserted into it, the worker | |
132 // thread has already been woken up to process it. | |
133 // TODO(fdoray): Remove the worker thread from the stack of idle threads of | |
134 // its parent thread pool when it is woken up to run single-threaded tasks. | |
135 if (sequence_was_empty) | |
136 WakeUp(); | |
137 } | |
138 | |
139 SchedulerWorkerThread::SchedulerWorkerThread( | |
140 ThreadPriority thread_priority, | |
141 Delegate* delegate, | |
142 TaskTracker* task_tracker, | |
143 DelayedTaskManager* delayed_task_manager, | |
144 const PriorityQueue* predecessor_priority_queue) | |
50 : wake_up_event_(false, false), | 145 : wake_up_event_(false, false), |
146 single_threaded_priority_queue_(predecessor_priority_queue), | |
51 delegate_(delegate), | 147 delegate_(delegate), |
52 task_tracker_(task_tracker) { | 148 task_tracker_(task_tracker), |
149 delayed_task_manager_(delayed_task_manager) { | |
53 DCHECK(delegate_); | 150 DCHECK(delegate_); |
54 DCHECK(task_tracker_); | 151 DCHECK(task_tracker_); |
152 DCHECK(delayed_task_manager_); | |
153 DCHECK(predecessor_priority_queue); | |
55 | 154 |
56 const size_t kDefaultStackSize = 0; | 155 const size_t kDefaultStackSize = 0; |
57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | 156 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
58 thread_priority); | 157 thread_priority); |
59 } | 158 } |
60 | 159 |
61 void SchedulerWorkerThread::ThreadMain() { | 160 void SchedulerWorkerThread::ThreadMain() { |
62 delegate_->OnMainEntry(); | 161 delegate_->OnMainEntry(); |
162 tls_current_worker_thread.Get().Set(this); | |
63 | 163 |
64 // A SchedulerWorkerThread starts out sleeping. | 164 // A SchedulerWorkerThread starts out sleeping. |
65 wake_up_event_.Wait(); | 165 wake_up_event_.Wait(); |
66 | 166 |
67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { | 167 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
68 // Get the sequence containing the next task to execute. | 168 // Get the sequence containing the next task to execute. |
69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); | 169 bool is_single_threaded_sequence = false; |
170 scoped_refptr<Sequence> sequence = delegate_->GetWork( | |
171 this, &single_threaded_priority_queue_, &is_single_threaded_sequence); | |
danakj
2016/04/20 23:16:16
Or maybe you can explain some of the design choice
fdoray
2016/04/21 19:31:28
1. SchedulerThreadPool could create the TaskRunner
| |
70 | 172 |
71 if (!sequence) { | 173 if (!sequence) { |
72 wake_up_event_.Wait(); | 174 wake_up_event_.Wait(); |
73 continue; | 175 continue; |
74 } | 176 } |
75 | 177 |
76 task_tracker_->RunTask(sequence->PeekTask()); | 178 task_tracker_->RunTask(sequence->PeekTask()); |
77 | 179 |
78 const bool sequence_became_empty = sequence->PopTask(); | 180 const bool sequence_became_empty = sequence->PopTask(); |
79 | 181 |
80 // If |sequence| isn't empty immediately after the pop, enqueue it to | 182 // If |sequence| isn't empty immediately after the pop, enqueue it to |
81 // maintain the invariant that a non-empty Sequence is always referenced by | 183 // maintain the invariant that a non-empty Sequence is always referenced by |
82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and | 184 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and |
83 // there are live references to it, it will be enqueued when a Task is added | 185 // there are live references to it, it will be enqueued when a Task is added |
84 // to it. Otherwise, it will be destroyed at the end of this scope. | 186 // to it. Otherwise, it will be destroyed at the end of this scope. |
85 if (!sequence_became_empty) | 187 if (!sequence_became_empty) { |
86 delegate_->EnqueueSequence(std::move(sequence)); | 188 if (is_single_threaded_sequence) { |
189 const auto sort_key = sequence->GetSortKey(); | |
190 single_threaded_priority_queue_.BeginTransaction()->Push( | |
191 WrapUnique(new PriorityQueue::SequenceAndSortKey( | |
192 std::move(sequence), sort_key))); | |
193 } else { | |
194 delegate_->EnqueueSequence(std::move(sequence)); | |
195 } | |
196 } | |
87 | 197 |
88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run | 198 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run |
89 // Tasks from Sequences returned by the GetWork() method of |delegate_| | 199 // Tasks from Sequences returned by the GetWork() method of |delegate_| |
90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | 200 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break |
91 // this invariant and avoids a useless loop iteration before going to sleep | 201 // this invariant and avoids a useless loop iteration before going to sleep |
92 // if WakeUp() is called while this SchedulerWorkerThread is awake. | 202 // if WakeUp() is called while this SchedulerWorkerThread is awake. |
93 wake_up_event_.Reset(); | 203 wake_up_event_.Reset(); |
94 } | 204 } |
95 } | 205 } |
96 | 206 |
97 bool SchedulerWorkerThread::ShouldExitForTesting() const { | 207 bool SchedulerWorkerThread::ShouldExitForTesting() const { |
98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 208 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
99 return should_exit_for_testing_; | 209 return should_exit_for_testing_; |
100 } | 210 } |
101 | 211 |
102 } // namespace internal | 212 } // namespace internal |
103 } // namespace base | 213 } // namespace base |
OLD | NEW |