Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(83)

Side by Side Diff: base/task_scheduler/scheduler_worker_thread.cc

Issue 1876363004: TaskScheduler [11] Support ExecutionMode::SINGLE_THREADED. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@8_delayed
Patch Set: self review (remove unused headers) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_thread.h" 5 #include "base/task_scheduler/scheduler_worker_thread.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/lazy_instance.h"
11 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/memory/ptr_util.h"
12 #include "base/task_scheduler/task_tracker.h" 14 #include "base/task_scheduler/task_tracker.h"
15 #include "base/task_scheduler/utils.h"
16 #include "base/threading/thread_local.h"
17 #include "base/time/time.h"
13 18
14 namespace base { 19 namespace base {
15 namespace internal { 20 namespace internal {
16 21
22 namespace {
23
24 // SchedulerWorkerThread that owns the current thread, if any.
25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
27
28 // A task runner that runs tasks on a single SchedulerWorkerThread.
29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
30 public:
31 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
32 SchedulerTaskExecutor* executor,
gab 2016/04/18 18:04:02 TODO on memory management for this one as well for
fdoray 2016/04/18 19:04:50 Done.
33 TaskTracker* task_tracker,
34 DelayedTaskManager* delayed_task_manager)
35 : traits_(traits),
36 executor_(executor),
37 task_tracker_(task_tracker),
38 delayed_task_manager_(delayed_task_manager) {}
39
40 // SingleThreadTaskRunner:
41 bool PostDelayedTask(const tracked_objects::Location& from_here,
42 const Closure& closure,
43 TimeDelta delay) override {
44 // Post the task as part of |sequence|.
45 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_,
46 executor_, task_tracker_, delayed_task_manager_);
47 }
48
49 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
50 const Closure& closure,
51 base::TimeDelta delay) override {
52 // Tasks are never nested within the task scheduler.
53 return PostDelayedTask(from_here, closure, delay);
54 }
55
56 bool RunsTasksOnCurrentThread() const override {
57 return tls_current_worker_thread.Get().Get() == executor_;
58 }
59
60 private:
61 ~SchedulerSingleThreadTaskRunner() override = default;
62
63 // Sequence for all Tasks posted through this TaskRunner.
64 const scoped_refptr<Sequence> sequence_ = new Sequence;
65
66 const TaskTraits traits_;
67 SchedulerTaskExecutor* const executor_;
68 TaskTracker* const task_tracker_;
69 DelayedTaskManager* const delayed_task_manager_;
70
71 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
72 };
73
74 } // namespace
75
17 std::unique_ptr<SchedulerWorkerThread> 76 std::unique_ptr<SchedulerWorkerThread>
18 SchedulerWorkerThread::CreateSchedulerWorkerThread( 77 SchedulerWorkerThread::CreateSchedulerWorkerThread(
19 ThreadPriority thread_priority, 78 ThreadPriority thread_priority,
20 Delegate* delegate, 79 Delegate* delegate,
21 TaskTracker* task_tracker) { 80 TaskTracker* task_tracker,
81 DelayedTaskManager* delayed_task_manager,
82 const PriorityQueue* predecessor_priority_queue) {
22 std::unique_ptr<SchedulerWorkerThread> worker_thread( 83 std::unique_ptr<SchedulerWorkerThread> worker_thread(
23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); 84 new SchedulerWorkerThread(thread_priority, delegate, task_tracker,
85 delayed_task_manager,
86 predecessor_priority_queue));
24 87
25 if (worker_thread->thread_handle_.is_null()) 88 if (worker_thread->thread_handle_.is_null())
26 return nullptr; 89 return nullptr;
27 return worker_thread; 90 return worker_thread;
28 } 91 }
29 92
30 SchedulerWorkerThread::~SchedulerWorkerThread() { 93 SchedulerWorkerThread::~SchedulerWorkerThread() {
31 DCHECK(ShouldExitForTesting()); 94 DCHECK(ShouldExitForTesting());
32 } 95 }
33 96
97 scoped_refptr<SingleThreadTaskRunner>
98 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) {
99 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
100 traits, this, task_tracker_, delayed_task_manager_));
101 }
102
34 void SchedulerWorkerThread::WakeUp() { 103 void SchedulerWorkerThread::WakeUp() {
35 wake_up_event_.Signal(); 104 wake_up_event_.Signal();
36 } 105 }
37 106
38 void SchedulerWorkerThread::JoinForTesting() { 107 void SchedulerWorkerThread::JoinForTesting() {
39 { 108 {
40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); 109 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
41 should_exit_for_testing_ = true; 110 should_exit_for_testing_ = true;
42 } 111 }
43 WakeUp(); 112 WakeUp();
44 PlatformThread::Join(thread_handle_); 113 PlatformThread::Join(thread_handle_);
45 } 114 }
46 115
47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, 116 void SchedulerWorkerThread::PostTaskWithSequence(
48 Delegate* delegate, 117 std::unique_ptr<Task> task,
49 TaskTracker* task_tracker) 118 scoped_refptr<Sequence> sequence) {
119 DCHECK(task);
120 DCHECK(sequence);
121
122 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
123 std::move(task), std::move(sequence), &single_threaded_priority_queue_);
124
125 // If |sequence| wasn't empty before |task| was inserted into it, the worker
126 // thread has already been woken up to run it.
gab 2016/04/18 18:04:02 How about s/run/process/ (at first I thought "it"
fdoray 2016/04/18 19:04:50 Done.
127 if (sequence_was_empty)
128 WakeUp();
gab 2016/04/18 18:04:02 Add a TODO to check for removal from idle stack (a
fdoray 2016/04/18 19:04:50 Done.
129 }
130
131 SchedulerWorkerThread::SchedulerWorkerThread(
132 ThreadPriority thread_priority,
133 Delegate* delegate,
134 TaskTracker* task_tracker,
135 DelayedTaskManager* delayed_task_manager,
136 const PriorityQueue* predecessor_priority_queue)
50 : wake_up_event_(false, false), 137 : wake_up_event_(false, false),
138 single_threaded_priority_queue_(predecessor_priority_queue),
51 delegate_(delegate), 139 delegate_(delegate),
52 task_tracker_(task_tracker) { 140 task_tracker_(task_tracker),
141 delayed_task_manager_(delayed_task_manager) {
53 DCHECK(delegate_); 142 DCHECK(delegate_);
54 DCHECK(task_tracker_); 143 DCHECK(task_tracker_);
144 DCHECK(delayed_task_manager_);
145 DCHECK(predecessor_priority_queue);
55 146
56 const size_t kDefaultStackSize = 0; 147 const size_t kDefaultStackSize = 0;
57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, 148 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
58 thread_priority); 149 thread_priority);
59 } 150 }
60 151
61 void SchedulerWorkerThread::ThreadMain() { 152 void SchedulerWorkerThread::ThreadMain() {
62 delegate_->OnMainEntry(); 153 delegate_->OnMainEntry();
154 tls_current_worker_thread.Get().Set(this);
63 155
64 // A SchedulerWorkerThread starts out sleeping. 156 // A SchedulerWorkerThread starts out sleeping.
65 wake_up_event_.Wait(); 157 wake_up_event_.Wait();
66 158
67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { 159 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) {
68 // Get the sequence containing the next task to execute. 160 // Get the sequence containing the next task to execute.
69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); 161 bool is_single_threaded_sequence = false;
162 scoped_refptr<Sequence> sequence = delegate_->GetWork(
163 this, &single_threaded_priority_queue_, &is_single_threaded_sequence);
70 164
71 if (!sequence) { 165 if (!sequence) {
72 wake_up_event_.Wait(); 166 wake_up_event_.Wait();
73 continue; 167 continue;
74 } 168 }
75 169
76 task_tracker_->RunTask(sequence->PeekTask()); 170 task_tracker_->RunTask(sequence->PeekTask());
77 171
78 const bool sequence_became_empty = sequence->PopTask(); 172 const bool sequence_became_empty = sequence->PopTask();
79 173
80 // If |sequence| isn't empty immediately after the pop, enqueue it to 174 // If |sequence| isn't empty immediately after the pop, enqueue it to
81 // maintain the invariant that a non-empty Sequence is always referenced by 175 // maintain the invariant that a non-empty Sequence is always referenced by
82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and 176 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and
83 // there are live references to it, it will be enqueued when a Task is added 177 // there are live references to it, it will be enqueued when a Task is added
84 // to it. Otherwise, it will be destroyed at the end of this scope. 178 // to it. Otherwise, it will be destroyed at the end of this scope.
85 if (!sequence_became_empty) 179 if (!sequence_became_empty) {
86 delegate_->EnqueueSequence(std::move(sequence)); 180 if (is_single_threaded_sequence) {
181 const auto sort_key = sequence->GetSortKey();
182 single_threaded_priority_queue_.BeginTransaction()->Push(
183 WrapUnique(new PriorityQueue::SequenceAndSortKey(
184 std::move(sequence), sort_key)));
185 } else {
186 delegate_->EnqueueSequence(std::move(sequence));
187 }
188 }
87 189
88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run 190 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run
89 // Tasks from Sequences returned by the GetWork() method of |delegate_| 191 // Tasks from Sequences returned by the GetWork() method of |delegate_|
90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break 192 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break
91 // this invariant and avoids a useless loop iteration before going to sleep 193 // this invariant and avoids a useless loop iteration before going to sleep
92 // if WakeUp() is called while this SchedulerWorkerThread is awake. 194 // if WakeUp() is called while this SchedulerWorkerThread is awake.
93 wake_up_event_.Reset(); 195 wake_up_event_.Reset();
94 } 196 }
95 } 197 }
96 198
97 bool SchedulerWorkerThread::ShouldExitForTesting() const { 199 bool SchedulerWorkerThread::ShouldExitForTesting() const {
98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); 200 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
99 return should_exit_for_testing_; 201 return should_exit_for_testing_;
100 } 202 }
101 203
102 } // namespace internal 204 } // namespace internal
103 } // namespace base 205 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698