Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: base/task_scheduler/scheduler_worker_thread.cc

Issue 1876363004: TaskScheduler [11] Support ExecutionMode::SINGLE_THREADED. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@8_delayed
Patch Set: self review Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_thread.h" 5 #include "base/task_scheduler/scheduler_worker_thread.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/lazy_instance.h"
11 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/memory/ptr_util.h"
12 #include "base/task_scheduler/task_tracker.h" 14 #include "base/task_scheduler/task_tracker.h"
15 #include "base/task_scheduler/utils.h"
16 #include "base/threading/thread_local.h"
17 #include "base/time/time.h"
13 18
14 namespace base { 19 namespace base {
15 namespace internal { 20 namespace internal {
16 21
22 namespace {
23
24 // SchedulerWorkerThread that owns the current thread, if any.
25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
27
28 // A task runner that runs tasks on a single SchedulerWorkerThread.
29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
30 public:
31 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
32 // tasks so long as |executor| is alive.
33 // TODO(robliao): Find a concrete way to manage |executor|'s memory.
34 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
35 SchedulerTaskExecutor* executor,
36 TaskTracker* task_tracker,
37 DelayedTaskManager* delayed_task_manager)
38 : traits_(traits),
39 executor_(executor),
40 task_tracker_(task_tracker),
41 delayed_task_manager_(delayed_task_manager) {}
42
43 // SingleThreadTaskRunner:
44 bool PostDelayedTask(const tracked_objects::Location& from_here,
45 const Closure& closure,
46 TimeDelta delay) override {
47 // Post the task as part of |sequence|.
48 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_,
49 executor_, task_tracker_, delayed_task_manager_);
50 }
51
52 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
53 const Closure& closure,
54 base::TimeDelta delay) override {
55 // Tasks are never nested within the task scheduler.
56 return PostDelayedTask(from_here, closure, delay);
57 }
58
59 bool RunsTasksOnCurrentThread() const override {
60 return tls_current_worker_thread.Get().Get() == executor_;
61 }
62
63 private:
64 ~SchedulerSingleThreadTaskRunner() override = default;
65
66 // Sequence for all Tasks posted through this TaskRunner.
67 const scoped_refptr<Sequence> sequence_ = new Sequence;
68
69 const TaskTraits traits_;
70 SchedulerTaskExecutor* const executor_;
71 TaskTracker* const task_tracker_;
72 DelayedTaskManager* const delayed_task_manager_;
73
74 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
75 };
76
77 } // namespace
78
17 std::unique_ptr<SchedulerWorkerThread> 79 std::unique_ptr<SchedulerWorkerThread>
18 SchedulerWorkerThread::CreateSchedulerWorkerThread( 80 SchedulerWorkerThread::CreateSchedulerWorkerThread(
19 ThreadPriority thread_priority, 81 ThreadPriority thread_priority,
20 Delegate* delegate, 82 Delegate* delegate,
21 TaskTracker* task_tracker) { 83 TaskTracker* task_tracker,
84 DelayedTaskManager* delayed_task_manager,
85 const PriorityQueue* predecessor_priority_queue) {
22 std::unique_ptr<SchedulerWorkerThread> worker_thread( 86 std::unique_ptr<SchedulerWorkerThread> worker_thread(
23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); 87 new SchedulerWorkerThread(thread_priority, delegate, task_tracker,
88 delayed_task_manager,
89 predecessor_priority_queue));
24 90
25 if (worker_thread->thread_handle_.is_null()) 91 if (worker_thread->thread_handle_.is_null())
26 return nullptr; 92 return nullptr;
27 return worker_thread; 93 return worker_thread;
28 } 94 }
29 95
30 SchedulerWorkerThread::~SchedulerWorkerThread() { 96 SchedulerWorkerThread::~SchedulerWorkerThread() {
31 DCHECK(ShouldExitForTesting()); 97 DCHECK(ShouldExitForTesting());
32 } 98 }
33 99
100 scoped_refptr<SingleThreadTaskRunner>
101 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) {
102 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
103 traits, this, task_tracker_, delayed_task_manager_));
104 }
105
34 void SchedulerWorkerThread::WakeUp() { 106 void SchedulerWorkerThread::WakeUp() {
35 wake_up_event_.Signal(); 107 wake_up_event_.Signal();
36 } 108 }
37 109
38 void SchedulerWorkerThread::JoinForTesting() { 110 void SchedulerWorkerThread::JoinForTesting() {
39 { 111 {
40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); 112 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
41 should_exit_for_testing_ = true; 113 should_exit_for_testing_ = true;
42 } 114 }
43 WakeUp(); 115 WakeUp();
44 PlatformThread::Join(thread_handle_); 116 PlatformThread::Join(thread_handle_);
45 } 117 }
46 118
47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, 119 void SchedulerWorkerThread::PostTaskWithSequence(
48 Delegate* delegate, 120 std::unique_ptr<Task> task,
49 TaskTracker* task_tracker) 121 scoped_refptr<Sequence> sequence) {
122 DCHECK(task);
123 DCHECK(sequence);
124
125 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
126 std::move(task), std::move(sequence), &single_threaded_priority_queue_);
127
128 // If |sequence| wasn't empty before |task| was inserted into it, the worker
129 // thread has already been woken up to process it.
130 // TODO(fdoray): Remove the worker thread from the stack of idle threads of
131 // its parent thread pool when it is woken up to run single-threaded tasks.
132 if (sequence_was_empty)
133 WakeUp();
134 }
135
136 SchedulerWorkerThread::SchedulerWorkerThread(
137 ThreadPriority thread_priority,
138 Delegate* delegate,
139 TaskTracker* task_tracker,
140 DelayedTaskManager* delayed_task_manager,
141 const PriorityQueue* predecessor_priority_queue)
50 : wake_up_event_(false, false), 142 : wake_up_event_(false, false),
143 single_threaded_priority_queue_(predecessor_priority_queue),
51 delegate_(delegate), 144 delegate_(delegate),
52 task_tracker_(task_tracker) { 145 task_tracker_(task_tracker),
146 delayed_task_manager_(delayed_task_manager) {
53 DCHECK(delegate_); 147 DCHECK(delegate_);
54 DCHECK(task_tracker_); 148 DCHECK(task_tracker_);
149 DCHECK(delayed_task_manager_);
150 DCHECK(predecessor_priority_queue);
55 151
56 const size_t kDefaultStackSize = 0; 152 const size_t kDefaultStackSize = 0;
57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, 153 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
58 thread_priority); 154 thread_priority);
59 } 155 }
60 156
61 void SchedulerWorkerThread::ThreadMain() { 157 void SchedulerWorkerThread::ThreadMain() {
62 delegate_->OnMainEntry(); 158 delegate_->OnMainEntry();
159 tls_current_worker_thread.Get().Set(this);
63 160
64 // A SchedulerWorkerThread starts out sleeping. 161 // A SchedulerWorkerThread starts out sleeping.
65 wake_up_event_.Wait(); 162 wake_up_event_.Wait();
66 163
67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { 164 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) {
68 // Get the sequence containing the next task to execute. 165 // Get the sequence containing the next task to execute.
69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); 166 bool is_single_threaded_sequence = false;
167 scoped_refptr<Sequence> sequence = delegate_->GetWork(
168 this, &single_threaded_priority_queue_, &is_single_threaded_sequence);
70 169
71 if (!sequence) { 170 if (!sequence) {
72 wake_up_event_.Wait(); 171 wake_up_event_.Wait();
73 continue; 172 continue;
74 } 173 }
75 174
76 task_tracker_->RunTask(sequence->PeekTask()); 175 task_tracker_->RunTask(sequence->PeekTask());
77 176
78 const bool sequence_became_empty = sequence->PopTask(); 177 const bool sequence_became_empty = sequence->PopTask();
79 178
80 // If |sequence| isn't empty immediately after the pop, enqueue it to 179 // If |sequence| isn't empty immediately after the pop, enqueue it to
81 // maintain the invariant that a non-empty Sequence is always referenced by 180 // maintain the invariant that a non-empty Sequence is always referenced by
82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and 181 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and
83 // there are live references to it, it will be enqueued when a Task is added 182 // there are live references to it, it will be enqueued when a Task is added
84 // to it. Otherwise, it will be destroyed at the end of this scope. 183 // to it. Otherwise, it will be destroyed at the end of this scope.
85 if (!sequence_became_empty) 184 if (!sequence_became_empty) {
86 delegate_->EnqueueSequence(std::move(sequence)); 185 if (is_single_threaded_sequence) {
186 const auto sort_key = sequence->GetSortKey();
187 single_threaded_priority_queue_.BeginTransaction()->Push(
188 WrapUnique(new PriorityQueue::SequenceAndSortKey(
189 std::move(sequence), sort_key)));
190 } else {
191 delegate_->EnqueueSequence(std::move(sequence));
192 }
193 }
87 194
88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run 195 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run
89 // Tasks from Sequences returned by the GetWork() method of |delegate_| 196 // Tasks from Sequences returned by the GetWork() method of |delegate_|
90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break 197 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break
91 // this invariant and avoids a useless loop iteration before going to sleep 198 // this invariant and avoids a useless loop iteration before going to sleep
92 // if WakeUp() is called while this SchedulerWorkerThread is awake. 199 // if WakeUp() is called while this SchedulerWorkerThread is awake.
93 wake_up_event_.Reset(); 200 wake_up_event_.Reset();
94 } 201 }
95 } 202 }
96 203
97 bool SchedulerWorkerThread::ShouldExitForTesting() const { 204 bool SchedulerWorkerThread::ShouldExitForTesting() const {
98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); 205 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
99 return should_exit_for_testing_; 206 return should_exit_for_testing_;
100 } 207 }
101 208
102 } // namespace internal 209 } // namespace internal
103 } // namespace base 210 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698