Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(314)

Side by Side Diff: base/task_scheduler/scheduler_worker_thread.cc

Issue 1876363004: TaskScheduler [11] Support ExecutionMode::SINGLE_THREADED. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@8_delayed
Patch Set: rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_thread.h" 5 #include "base/task_scheduler/scheduler_worker_thread.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/lazy_instance.h"
11 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/memory/ptr_util.h"
12 #include "base/task_scheduler/task_tracker.h" 14 #include "base/task_scheduler/task_tracker.h"
15 #include "base/task_scheduler/utils.h"
16 #include "base/threading/thread_local.h"
17 #include "base/time/time.h"
13 18
14 namespace base { 19 namespace base {
15 namespace internal { 20 namespace internal {
16 21
22 namespace {
23
24 // SchedulerWorkerThread that owns the current thread, if any.
25 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
26 tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
27
28 // A task runner that runs tasks on a single SchedulerWorkerThread.
29 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
30 public:
31 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
32 // tasks so long as |executor| is alive.
33 // TODO(robliao): Find a concrete way to manage |executor|'s memory.
34 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
35 SchedulerTaskExecutor* executor,
36 TaskTracker* task_tracker,
37 DelayedTaskManager* delayed_task_manager)
38 : traits_(traits),
39 executor_(executor),
40 task_tracker_(task_tracker),
41 delayed_task_manager_(delayed_task_manager) {}
42
43 // SingleThreadTaskRunner:
44 bool PostDelayedTask(const tracked_objects::Location& from_here,
45 const Closure& closure,
46 TimeDelta delay) override {
47 // Post the task as part of |sequence_|.
48 return PostTaskToExecutor(
49 WrapUnique(
50 new Task(from_here, closure, traits_,
51 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
danakj 2016/04/20 23:16:16 nit: maybe this conversion from delta to ticks cou
fdoray 2016/04/21 19:31:28 We decided to construct the Task outside of PostTa
52 sequence_, executor_, task_tracker_, delayed_task_manager_);
53 }
54
55 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
56 const Closure& closure,
57 base::TimeDelta delay) override {
58 // Tasks are never nested within the task scheduler.
59 return PostDelayedTask(from_here, closure, delay);
60 }
61
62 bool RunsTasksOnCurrentThread() const override {
63 return tls_current_worker_thread.Get().Get() == executor_;
64 }
65
66 private:
67 ~SchedulerSingleThreadTaskRunner() override = default;
68
69 // Sequence for all Tasks posted through this TaskRunner.
70 const scoped_refptr<Sequence> sequence_ = new Sequence;
71
72 const TaskTraits traits_;
73 SchedulerTaskExecutor* const executor_;
74 TaskTracker* const task_tracker_;
75 DelayedTaskManager* const delayed_task_manager_;
76
77 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
78 };
79
80 } // namespace
81
17 std::unique_ptr<SchedulerWorkerThread> 82 std::unique_ptr<SchedulerWorkerThread>
18 SchedulerWorkerThread::CreateSchedulerWorkerThread( 83 SchedulerWorkerThread::CreateSchedulerWorkerThread(
19 ThreadPriority thread_priority, 84 ThreadPriority thread_priority,
20 Delegate* delegate, 85 Delegate* delegate,
21 TaskTracker* task_tracker) { 86 TaskTracker* task_tracker,
87 DelayedTaskManager* delayed_task_manager,
88 const PriorityQueue* predecessor_priority_queue) {
22 std::unique_ptr<SchedulerWorkerThread> worker_thread( 89 std::unique_ptr<SchedulerWorkerThread> worker_thread(
23 new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); 90 new SchedulerWorkerThread(thread_priority, delegate, task_tracker,
91 delayed_task_manager,
92 predecessor_priority_queue));
24 93
25 if (worker_thread->thread_handle_.is_null()) 94 if (worker_thread->thread_handle_.is_null())
26 return nullptr; 95 return nullptr;
27 return worker_thread; 96 return worker_thread;
28 } 97 }
29 98
30 SchedulerWorkerThread::~SchedulerWorkerThread() { 99 SchedulerWorkerThread::~SchedulerWorkerThread() {
31 DCHECK(ShouldExitForTesting()); 100 DCHECK(ShouldExitForTesting());
32 } 101 }
33 102
103 scoped_refptr<SingleThreadTaskRunner>
104 SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) {
105 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
106 traits, this, task_tracker_, delayed_task_manager_));
107 }
108
34 void SchedulerWorkerThread::WakeUp() { 109 void SchedulerWorkerThread::WakeUp() {
35 wake_up_event_.Signal(); 110 wake_up_event_.Signal();
36 } 111 }
37 112
38 void SchedulerWorkerThread::JoinForTesting() { 113 void SchedulerWorkerThread::JoinForTesting() {
39 { 114 {
40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); 115 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
41 should_exit_for_testing_ = true; 116 should_exit_for_testing_ = true;
42 } 117 }
43 WakeUp(); 118 WakeUp();
44 PlatformThread::Join(thread_handle_); 119 PlatformThread::Join(thread_handle_);
45 } 120 }
46 121
47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, 122 void SchedulerWorkerThread::PostTaskWithSequence(
48 Delegate* delegate, 123 std::unique_ptr<Task> task,
49 TaskTracker* task_tracker) 124 scoped_refptr<Sequence> sequence) {
125 DCHECK(task);
126 DCHECK(sequence);
127
128 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
129 std::move(task), std::move(sequence), &single_threaded_priority_queue_);
130
131 // If |sequence| wasn't empty before |task| was inserted into it, the worker
132 // thread has already been woken up to process it.
133 // TODO(fdoray): Remove the worker thread from the stack of idle threads of
134 // its parent thread pool when it is woken up to run single-threaded tasks.
135 if (sequence_was_empty)
136 WakeUp();
137 }
138
139 SchedulerWorkerThread::SchedulerWorkerThread(
140 ThreadPriority thread_priority,
141 Delegate* delegate,
142 TaskTracker* task_tracker,
143 DelayedTaskManager* delayed_task_manager,
144 const PriorityQueue* predecessor_priority_queue)
50 : wake_up_event_(false, false), 145 : wake_up_event_(false, false),
146 single_threaded_priority_queue_(predecessor_priority_queue),
51 delegate_(delegate), 147 delegate_(delegate),
52 task_tracker_(task_tracker) { 148 task_tracker_(task_tracker),
149 delayed_task_manager_(delayed_task_manager) {
53 DCHECK(delegate_); 150 DCHECK(delegate_);
54 DCHECK(task_tracker_); 151 DCHECK(task_tracker_);
152 DCHECK(delayed_task_manager_);
153 DCHECK(predecessor_priority_queue);
55 154
56 const size_t kDefaultStackSize = 0; 155 const size_t kDefaultStackSize = 0;
57 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, 156 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
58 thread_priority); 157 thread_priority);
59 } 158 }
60 159
61 void SchedulerWorkerThread::ThreadMain() { 160 void SchedulerWorkerThread::ThreadMain() {
62 delegate_->OnMainEntry(); 161 delegate_->OnMainEntry();
162 tls_current_worker_thread.Get().Set(this);
63 163
64 // A SchedulerWorkerThread starts out sleeping. 164 // A SchedulerWorkerThread starts out sleeping.
65 wake_up_event_.Wait(); 165 wake_up_event_.Wait();
66 166
67 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { 167 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) {
68 // Get the sequence containing the next task to execute. 168 // Get the sequence containing the next task to execute.
69 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); 169 bool is_single_threaded_sequence = false;
170 scoped_refptr<Sequence> sequence = delegate_->GetWork(
171 this, &single_threaded_priority_queue_, &is_single_threaded_sequence);
danakj 2016/04/20 23:16:16 Or maybe you can explain some of the design choice
fdoray 2016/04/21 19:31:28 1. SchedulerThreadPool could create the TaskRunner
70 172
71 if (!sequence) { 173 if (!sequence) {
72 wake_up_event_.Wait(); 174 wake_up_event_.Wait();
73 continue; 175 continue;
74 } 176 }
75 177
76 task_tracker_->RunTask(sequence->PeekTask()); 178 task_tracker_->RunTask(sequence->PeekTask());
77 179
78 const bool sequence_became_empty = sequence->PopTask(); 180 const bool sequence_became_empty = sequence->PopTask();
79 181
80 // If |sequence| isn't empty immediately after the pop, enqueue it to 182 // If |sequence| isn't empty immediately after the pop, enqueue it to
81 // maintain the invariant that a non-empty Sequence is always referenced by 183 // maintain the invariant that a non-empty Sequence is always referenced by
82 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and 184 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and
83 // there are live references to it, it will be enqueued when a Task is added 185 // there are live references to it, it will be enqueued when a Task is added
84 // to it. Otherwise, it will be destroyed at the end of this scope. 186 // to it. Otherwise, it will be destroyed at the end of this scope.
85 if (!sequence_became_empty) 187 if (!sequence_became_empty) {
86 delegate_->EnqueueSequence(std::move(sequence)); 188 if (is_single_threaded_sequence) {
189 const auto sort_key = sequence->GetSortKey();
190 single_threaded_priority_queue_.BeginTransaction()->Push(
191 WrapUnique(new PriorityQueue::SequenceAndSortKey(
192 std::move(sequence), sort_key)));
193 } else {
194 delegate_->EnqueueSequence(std::move(sequence));
195 }
196 }
87 197
88 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run 198 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run
89 // Tasks from Sequences returned by the GetWork() method of |delegate_| 199 // Tasks from Sequences returned by the GetWork() method of |delegate_|
90 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break 200 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break
91 // this invariant and avoids a useless loop iteration before going to sleep 201 // this invariant and avoids a useless loop iteration before going to sleep
92 // if WakeUp() is called while this SchedulerWorkerThread is awake. 202 // if WakeUp() is called while this SchedulerWorkerThread is awake.
93 wake_up_event_.Reset(); 203 wake_up_event_.Reset();
94 } 204 }
95 } 205 }
96 206
97 bool SchedulerWorkerThread::ShouldExitForTesting() const { 207 bool SchedulerWorkerThread::ShouldExitForTesting() const {
98 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); 208 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
99 return should_exit_for_testing_; 209 return should_exit_for_testing_;
100 } 210 }
101 211
102 } // namespace internal 212 } // namespace internal
103 } // namespace base 213 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698