Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_thread.h" | 5 #include "base/task_scheduler/scheduler_worker_thread.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/lazy_instance.h" | |
| 11 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/memory/ptr_util.h" | |
| 12 #include "base/task_scheduler/task_tracker.h" | 14 #include "base/task_scheduler/task_tracker.h" |
| 15 #include "base/task_scheduler/utils.h" | |
| 16 #include "base/threading/thread_local.h" | |
| 17 #include "base/time/time.h" | |
| 13 | 18 |
| 14 namespace base { | 19 namespace base { |
| 15 namespace internal { | 20 namespace internal { |
| 16 | 21 |
| 22 namespace { | |
| 23 | |
| 24 // SchedulerWorkerThread that owns the current thread, if any. | |
| 25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky | |
| 26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; | |
| 27 | |
| 28 // A task runner that runs tasks on a single SchedulerWorkerThread. | |
| 29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | |
| 30 public: | |
| 31 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
| 32 SchedulerTaskExecutor* executor, | |
|
gab
2016/04/18 18:04:02
TODO on memory management for this one as well for
fdoray
2016/04/18 19:04:50
Done.
| |
| 33 TaskTracker* task_tracker, | |
| 34 DelayedTaskManager* delayed_task_manager) | |
| 35 : traits_(traits), | |
| 36 executor_(executor), | |
| 37 task_tracker_(task_tracker), | |
| 38 delayed_task_manager_(delayed_task_manager) {} | |
| 39 | |
| 40 // SingleThreadTaskRunner: | |
| 41 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
| 42 const Closure& closure, | |
| 43 TimeDelta delay) override { | |
| 44 // Post the task as part of |sequence|. | |
| 45 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, | |
| 46 executor_, task_tracker_, delayed_task_manager_); | |
| 47 } | |
| 48 | |
| 49 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
| 50 const Closure& closure, | |
| 51 base::TimeDelta delay) override { | |
| 52 // Tasks are never nested within the task scheduler. | |
| 53 return PostDelayedTask(from_here, closure, delay); | |
| 54 } | |
| 55 | |
| 56 bool RunsTasksOnCurrentThread() const override { | |
| 57 return tls_current_worker_thread.Get().Get() == executor_; | |
| 58 } | |
| 59 | |
| 60 private: | |
| 61 ~SchedulerSingleThreadTaskRunner() override = default; | |
| 62 | |
| 63 // Sequence for all Tasks posted through this TaskRunner. | |
| 64 const scoped_refptr<Sequence> sequence_ = new Sequence; | |
| 65 | |
| 66 const TaskTraits traits_; | |
| 67 SchedulerTaskExecutor* const executor_; | |
| 68 TaskTracker* const task_tracker_; | |
| 69 DelayedTaskManager* const delayed_task_manager_; | |
| 70 | |
| 71 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
| 72 }; | |
| 73 | |
| 74 } // namespace | |
| 75 | |
| 17 std::unique_ptr<SchedulerWorkerThread> | 76 std::unique_ptr<SchedulerWorkerThread> |
| 18 SchedulerWorkerThread::CreateSchedulerWorkerThread( | 77 SchedulerWorkerThread::CreateSchedulerWorkerThread( |
| 19 ThreadPriority thread_priority, | 78 ThreadPriority thread_priority, |
| 20 Delegate* delegate, | 79 Delegate* delegate, |
| 21 TaskTracker* task_tracker) { | 80 TaskTracker* task_tracker, |
| 81 DelayedTaskManager* delayed_task_manager, | |
| 82 const PriorityQueue* predecessor_priority_queue) { | |
| 22 std::unique_ptr<SchedulerWorkerThread> worker_thread( | 83 std::unique_ptr<SchedulerWorkerThread> worker_thread( |
| 23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); | 84 new SchedulerWorkerThread(thread_priority, delegate, task_tracker, |
| 85 delayed_task_manager, | |
| 86 predecessor_priority_queue)); | |
| 24 | 87 |
| 25 if (worker_thread->thread_handle_.is_null()) | 88 if (worker_thread->thread_handle_.is_null()) |
| 26 return nullptr; | 89 return nullptr; |
| 27 return worker_thread; | 90 return worker_thread; |
| 28 } | 91 } |
| 29 | 92 |
| 30 SchedulerWorkerThread::~SchedulerWorkerThread() { | 93 SchedulerWorkerThread::~SchedulerWorkerThread() { |
| 31 DCHECK(ShouldExitForTesting()); | 94 DCHECK(ShouldExitForTesting()); |
| 32 } | 95 } |
| 33 | 96 |
| 97 scoped_refptr<SingleThreadTaskRunner> | |
| 98 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) { | |
| 99 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | |
| 100 traits, this, task_tracker_, delayed_task_manager_)); | |
| 101 } | |
| 102 | |
| 34 void SchedulerWorkerThread::WakeUp() { | 103 void SchedulerWorkerThread::WakeUp() { |
| 35 wake_up_event_.Signal(); | 104 wake_up_event_.Signal(); |
| 36 } | 105 } |
| 37 | 106 |
| 38 void SchedulerWorkerThread::JoinForTesting() { | 107 void SchedulerWorkerThread::JoinForTesting() { |
| 39 { | 108 { |
| 40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 109 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
| 41 should_exit_for_testing_ = true; | 110 should_exit_for_testing_ = true; |
| 42 } | 111 } |
| 43 WakeUp(); | 112 WakeUp(); |
| 44 PlatformThread::Join(thread_handle_); | 113 PlatformThread::Join(thread_handle_); |
| 45 } | 114 } |
| 46 | 115 |
| 47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, | 116 void SchedulerWorkerThread::PostTaskWithSequence( |
| 48 Delegate* delegate, | 117 std::unique_ptr<Task> task, |
| 49 TaskTracker* task_tracker) | 118 scoped_refptr<Sequence> sequence) { |
| 119 DCHECK(task); | |
| 120 DCHECK(sequence); | |
| 121 | |
| 122 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( | |
| 123 std::move(task), std::move(sequence), &single_threaded_priority_queue_); | |
| 124 | |
| 125 // If |sequence| wasn't empty before |task| was inserted into it, the worker | |
| 126 // thread has already been woken up to run it. | |
|
gab
2016/04/18 18:04:02
How about s/run/process/ (at first I thought "it"
fdoray
2016/04/18 19:04:50
Done.
| |
| 127 if (sequence_was_empty) | |
| 128 WakeUp(); | |
|
gab
2016/04/18 18:04:02
Add a TODO to check for removal from idle stack (a
fdoray
2016/04/18 19:04:50
Done.
| |
| 129 } | |
| 130 | |
| 131 SchedulerWorkerThread::SchedulerWorkerThread( | |
| 132 ThreadPriority thread_priority, | |
| 133 Delegate* delegate, | |
| 134 TaskTracker* task_tracker, | |
| 135 DelayedTaskManager* delayed_task_manager, | |
| 136 const PriorityQueue* predecessor_priority_queue) | |
| 50 : wake_up_event_(false, false), | 137 : wake_up_event_(false, false), |
| 138 single_threaded_priority_queue_(predecessor_priority_queue), | |
| 51 delegate_(delegate), | 139 delegate_(delegate), |
| 52 task_tracker_(task_tracker) { | 140 task_tracker_(task_tracker), |
| 141 delayed_task_manager_(delayed_task_manager) { | |
| 53 DCHECK(delegate_); | 142 DCHECK(delegate_); |
| 54 DCHECK(task_tracker_); | 143 DCHECK(task_tracker_); |
| 144 DCHECK(delayed_task_manager_); | |
| 145 DCHECK(predecessor_priority_queue); | |
| 55 | 146 |
| 56 const size_t kDefaultStackSize = 0; | 147 const size_t kDefaultStackSize = 0; |
| 57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | 148 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
| 58 thread_priority); | 149 thread_priority); |
| 59 } | 150 } |
| 60 | 151 |
| 61 void SchedulerWorkerThread::ThreadMain() { | 152 void SchedulerWorkerThread::ThreadMain() { |
| 62 delegate_->OnMainEntry(); | 153 delegate_->OnMainEntry(); |
| 154 tls_current_worker_thread.Get().Set(this); | |
| 63 | 155 |
| 64 // A SchedulerWorkerThread starts out sleeping. | 156 // A SchedulerWorkerThread starts out sleeping. |
| 65 wake_up_event_.Wait(); | 157 wake_up_event_.Wait(); |
| 66 | 158 |
| 67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { | 159 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
| 68 // Get the sequence containing the next task to execute. | 160 // Get the sequence containing the next task to execute. |
| 69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); | 161 bool is_single_threaded_sequence = false; |
| 162 scoped_refptr<Sequence> sequence = delegate_->GetWork( | |
| 163 this, &single_threaded_priority_queue_, &is_single_threaded_sequence); | |
| 70 | 164 |
| 71 if (!sequence) { | 165 if (!sequence) { |
| 72 wake_up_event_.Wait(); | 166 wake_up_event_.Wait(); |
| 73 continue; | 167 continue; |
| 74 } | 168 } |
| 75 | 169 |
| 76 task_tracker_->RunTask(sequence->PeekTask()); | 170 task_tracker_->RunTask(sequence->PeekTask()); |
| 77 | 171 |
| 78 const bool sequence_became_empty = sequence->PopTask(); | 172 const bool sequence_became_empty = sequence->PopTask(); |
| 79 | 173 |
| 80 // If |sequence| isn't empty immediately after the pop, enqueue it to | 174 // If |sequence| isn't empty immediately after the pop, enqueue it to |
| 81 // maintain the invariant that a non-empty Sequence is always referenced by | 175 // maintain the invariant that a non-empty Sequence is always referenced by |
| 82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and | 176 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and |
| 83 // there are live references to it, it will be enqueued when a Task is added | 177 // there are live references to it, it will be enqueued when a Task is added |
| 84 // to it. Otherwise, it will be destroyed at the end of this scope. | 178 // to it. Otherwise, it will be destroyed at the end of this scope. |
| 85 if (!sequence_became_empty) | 179 if (!sequence_became_empty) { |
| 86 delegate_->EnqueueSequence(std::move(sequence)); | 180 if (is_single_threaded_sequence) { |
| 181 const auto sort_key = sequence->GetSortKey(); | |
| 182 single_threaded_priority_queue_.BeginTransaction()->Push( | |
| 183 WrapUnique(new PriorityQueue::SequenceAndSortKey( | |
| 184 std::move(sequence), sort_key))); | |
| 185 } else { | |
| 186 delegate_->EnqueueSequence(std::move(sequence)); | |
| 187 } | |
| 188 } | |
| 87 | 189 |
| 88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run | 190 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run |
| 89 // Tasks from Sequences returned by the GetWork() method of |delegate_| | 191 // Tasks from Sequences returned by the GetWork() method of |delegate_| |
| 90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | 192 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break |
| 91 // this invariant and avoids a useless loop iteration before going to sleep | 193 // this invariant and avoids a useless loop iteration before going to sleep |
| 92 // if WakeUp() is called while this SchedulerWorkerThread is awake. | 194 // if WakeUp() is called while this SchedulerWorkerThread is awake. |
| 93 wake_up_event_.Reset(); | 195 wake_up_event_.Reset(); |
| 94 } | 196 } |
| 95 } | 197 } |
| 96 | 198 |
| 97 bool SchedulerWorkerThread::ShouldExitForTesting() const { | 199 bool SchedulerWorkerThread::ShouldExitForTesting() const { |
| 98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 200 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
| 99 return should_exit_for_testing_; | 201 return should_exit_for_testing_; |
| 100 } | 202 } |
| 101 | 203 |
| 102 } // namespace internal | 204 } // namespace internal |
| 103 } // namespace base | 205 } // namespace base |
| OLD | NEW |