Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_thread.h" | 5 #include "base/task_scheduler/scheduler_worker_thread.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/lazy_instance.h" | |
| 11 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/memory/ptr_util.h" | |
| 12 #include "base/task_scheduler/task_tracker.h" | 14 #include "base/task_scheduler/task_tracker.h" |
| 15 #include "base/task_scheduler/utils.h" | |
| 16 #include "base/threading/thread_local.h" | |
| 17 #include "base/time/time.h" | |
| 13 | 18 |
| 14 namespace base { | 19 namespace base { |
| 15 namespace internal { | 20 namespace internal { |
| 16 | 21 |
| 22 namespace { | |
| 23 | |
| 24 // SchedulerWorkerThread that owns the current thread, if any. | |
| 25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky | |
| 26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; | |
| 27 | |
| 28 // A task runner that runs tasks on a single SchedulerWorkerThread. | |
| 29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | |
| 30 public: | |
| 31 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | |
| 32 // tasks so long as |executor| is alive. | |
| 33 // TODO(robliao): Find a concrete way to manage |executor|'s memory. | |
| 34 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
| 35 SchedulerTaskExecutor* executor, | |
| 36 TaskTracker* task_tracker, | |
| 37 DelayedTaskManager* delayed_task_manager) | |
| 38 : traits_(traits), | |
| 39 executor_(executor), | |
| 40 task_tracker_(task_tracker), | |
| 41 delayed_task_manager_(delayed_task_manager) {} | |
| 42 | |
| 43 // SingleThreadTaskRunner: | |
| 44 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
| 45 const Closure& closure, | |
| 46 TimeDelta delay) override { | |
| 47 // Post the task as part of |sequence_|. | |
| 48 return PostTaskToExecutor( | |
| 49 WrapUnique( | |
| 50 new Task(from_here, closure, traits_, | |
| 51 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), | |
|
danakj
2016/04/20 23:16:16
nit: maybe this conversion from delta to ticks cou
fdoray
2016/04/21 19:31:28
We decided to construct the Task outside of PostTa
| |
| 52 sequence_, executor_, task_tracker_, delayed_task_manager_); | |
| 53 } | |
| 54 | |
| 55 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
| 56 const Closure& closure, | |
| 57 base::TimeDelta delay) override { | |
| 58 // Tasks are never nested within the task scheduler. | |
| 59 return PostDelayedTask(from_here, closure, delay); | |
| 60 } | |
| 61 | |
| 62 bool RunsTasksOnCurrentThread() const override { | |
| 63 return tls_current_worker_thread.Get().Get() == executor_; | |
| 64 } | |
| 65 | |
| 66 private: | |
| 67 ~SchedulerSingleThreadTaskRunner() override = default; | |
| 68 | |
| 69 // Sequence for all Tasks posted through this TaskRunner. | |
| 70 const scoped_refptr<Sequence> sequence_ = new Sequence; | |
| 71 | |
| 72 const TaskTraits traits_; | |
| 73 SchedulerTaskExecutor* const executor_; | |
| 74 TaskTracker* const task_tracker_; | |
| 75 DelayedTaskManager* const delayed_task_manager_; | |
| 76 | |
| 77 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
| 78 }; | |
| 79 | |
| 80 } // namespace | |
| 81 | |
| 17 std::unique_ptr<SchedulerWorkerThread> | 82 std::unique_ptr<SchedulerWorkerThread> |
| 18 SchedulerWorkerThread::CreateSchedulerWorkerThread( | 83 SchedulerWorkerThread::CreateSchedulerWorkerThread( |
| 19 ThreadPriority thread_priority, | 84 ThreadPriority thread_priority, |
| 20 Delegate* delegate, | 85 Delegate* delegate, |
| 21 TaskTracker* task_tracker) { | 86 TaskTracker* task_tracker, |
| 87 DelayedTaskManager* delayed_task_manager, | |
| 88 const PriorityQueue* predecessor_priority_queue) { | |
| 22 std::unique_ptr<SchedulerWorkerThread> worker_thread( | 89 std::unique_ptr<SchedulerWorkerThread> worker_thread( |
| 23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); | 90 new SchedulerWorkerThread(thread_priority, delegate, task_tracker, |
| 91 delayed_task_manager, | |
| 92 predecessor_priority_queue)); | |
| 24 | 93 |
| 25 if (worker_thread->thread_handle_.is_null()) | 94 if (worker_thread->thread_handle_.is_null()) |
| 26 return nullptr; | 95 return nullptr; |
| 27 return worker_thread; | 96 return worker_thread; |
| 28 } | 97 } |
| 29 | 98 |
| 30 SchedulerWorkerThread::~SchedulerWorkerThread() { | 99 SchedulerWorkerThread::~SchedulerWorkerThread() { |
| 31 DCHECK(ShouldExitForTesting()); | 100 DCHECK(ShouldExitForTesting()); |
| 32 } | 101 } |
| 33 | 102 |
| 103 scoped_refptr<SingleThreadTaskRunner> | |
| 104 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) { | |
| 105 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | |
| 106 traits, this, task_tracker_, delayed_task_manager_)); | |
| 107 } | |
| 108 | |
| 34 void SchedulerWorkerThread::WakeUp() { | 109 void SchedulerWorkerThread::WakeUp() { |
| 35 wake_up_event_.Signal(); | 110 wake_up_event_.Signal(); |
| 36 } | 111 } |
| 37 | 112 |
| 38 void SchedulerWorkerThread::JoinForTesting() { | 113 void SchedulerWorkerThread::JoinForTesting() { |
| 39 { | 114 { |
| 40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 115 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
| 41 should_exit_for_testing_ = true; | 116 should_exit_for_testing_ = true; |
| 42 } | 117 } |
| 43 WakeUp(); | 118 WakeUp(); |
| 44 PlatformThread::Join(thread_handle_); | 119 PlatformThread::Join(thread_handle_); |
| 45 } | 120 } |
| 46 | 121 |
| 47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, | 122 void SchedulerWorkerThread::PostTaskWithSequence( |
| 48 Delegate* delegate, | 123 std::unique_ptr<Task> task, |
| 49 TaskTracker* task_tracker) | 124 scoped_refptr<Sequence> sequence) { |
| 125 DCHECK(task); | |
| 126 DCHECK(sequence); | |
| 127 | |
| 128 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( | |
| 129 std::move(task), std::move(sequence), &single_threaded_priority_queue_); | |
| 130 | |
| 131 // If |sequence| wasn't empty before |task| was inserted into it, the worker | |
| 132 // thread has already been woken up to process it. | |
| 133 // TODO(fdoray): Remove the worker thread from the stack of idle threads of | |
| 134 // its parent thread pool when it is woken up to run single-threaded tasks. | |
| 135 if (sequence_was_empty) | |
| 136 WakeUp(); | |
| 137 } | |
| 138 | |
| 139 SchedulerWorkerThread::SchedulerWorkerThread( | |
| 140 ThreadPriority thread_priority, | |
| 141 Delegate* delegate, | |
| 142 TaskTracker* task_tracker, | |
| 143 DelayedTaskManager* delayed_task_manager, | |
| 144 const PriorityQueue* predecessor_priority_queue) | |
| 50 : wake_up_event_(false, false), | 145 : wake_up_event_(false, false), |
| 146 single_threaded_priority_queue_(predecessor_priority_queue), | |
| 51 delegate_(delegate), | 147 delegate_(delegate), |
| 52 task_tracker_(task_tracker) { | 148 task_tracker_(task_tracker), |
| 149 delayed_task_manager_(delayed_task_manager) { | |
| 53 DCHECK(delegate_); | 150 DCHECK(delegate_); |
| 54 DCHECK(task_tracker_); | 151 DCHECK(task_tracker_); |
| 152 DCHECK(delayed_task_manager_); | |
| 153 DCHECK(predecessor_priority_queue); | |
| 55 | 154 |
| 56 const size_t kDefaultStackSize = 0; | 155 const size_t kDefaultStackSize = 0; |
| 57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | 156 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
| 58 thread_priority); | 157 thread_priority); |
| 59 } | 158 } |
| 60 | 159 |
| 61 void SchedulerWorkerThread::ThreadMain() { | 160 void SchedulerWorkerThread::ThreadMain() { |
| 62 delegate_->OnMainEntry(); | 161 delegate_->OnMainEntry(); |
| 162 tls_current_worker_thread.Get().Set(this); | |
| 63 | 163 |
| 64 // A SchedulerWorkerThread starts out sleeping. | 164 // A SchedulerWorkerThread starts out sleeping. |
| 65 wake_up_event_.Wait(); | 165 wake_up_event_.Wait(); |
| 66 | 166 |
| 67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { | 167 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
| 68 // Get the sequence containing the next task to execute. | 168 // Get the sequence containing the next task to execute. |
| 69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); | 169 bool is_single_threaded_sequence = false; |
| 170 scoped_refptr<Sequence> sequence = delegate_->GetWork( | |
| 171 this, &single_threaded_priority_queue_, &is_single_threaded_sequence); | |
|
danakj
2016/04/20 23:16:16
Or maybe you can explain some of the design choice
fdoray
2016/04/21 19:31:28
1. SchedulerThreadPool could create the TaskRunner
| |
| 70 | 172 |
| 71 if (!sequence) { | 173 if (!sequence) { |
| 72 wake_up_event_.Wait(); | 174 wake_up_event_.Wait(); |
| 73 continue; | 175 continue; |
| 74 } | 176 } |
| 75 | 177 |
| 76 task_tracker_->RunTask(sequence->PeekTask()); | 178 task_tracker_->RunTask(sequence->PeekTask()); |
| 77 | 179 |
| 78 const bool sequence_became_empty = sequence->PopTask(); | 180 const bool sequence_became_empty = sequence->PopTask(); |
| 79 | 181 |
| 80 // If |sequence| isn't empty immediately after the pop, enqueue it to | 182 // If |sequence| isn't empty immediately after the pop, enqueue it to |
| 81 // maintain the invariant that a non-empty Sequence is always referenced by | 183 // maintain the invariant that a non-empty Sequence is always referenced by |
| 82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and | 184 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and |
| 83 // there are live references to it, it will be enqueued when a Task is added | 185 // there are live references to it, it will be enqueued when a Task is added |
| 84 // to it. Otherwise, it will be destroyed at the end of this scope. | 186 // to it. Otherwise, it will be destroyed at the end of this scope. |
| 85 if (!sequence_became_empty) | 187 if (!sequence_became_empty) { |
| 86 delegate_->EnqueueSequence(std::move(sequence)); | 188 if (is_single_threaded_sequence) { |
| 189 const auto sort_key = sequence->GetSortKey(); | |
| 190 single_threaded_priority_queue_.BeginTransaction()->Push( | |
| 191 WrapUnique(new PriorityQueue::SequenceAndSortKey( | |
| 192 std::move(sequence), sort_key))); | |
| 193 } else { | |
| 194 delegate_->EnqueueSequence(std::move(sequence)); | |
| 195 } | |
| 196 } | |
| 87 | 197 |
| 88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run | 198 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run |
| 89 // Tasks from Sequences returned by the GetWork() method of |delegate_| | 199 // Tasks from Sequences returned by the GetWork() method of |delegate_| |
| 90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | 200 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break |
| 91 // this invariant and avoids a useless loop iteration before going to sleep | 201 // this invariant and avoids a useless loop iteration before going to sleep |
| 92 // if WakeUp() is called while this SchedulerWorkerThread is awake. | 202 // if WakeUp() is called while this SchedulerWorkerThread is awake. |
| 93 wake_up_event_.Reset(); | 203 wake_up_event_.Reset(); |
| 94 } | 204 } |
| 95 } | 205 } |
| 96 | 206 |
| 97 bool SchedulerWorkerThread::ShouldExitForTesting() const { | 207 bool SchedulerWorkerThread::ShouldExitForTesting() const { |
| 98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 208 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
| 99 return should_exit_for_testing_; | 209 return should_exit_for_testing_; |
| 100 } | 210 } |
| 101 | 211 |
| 102 } // namespace internal | 212 } // namespace internal |
| 103 } // namespace base | 213 } // namespace base |
| OLD | NEW |