Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(189)

Side by Side Diff: base/threading/thread.cc

Issue 1086663002: [Approach 2] Accumulate tasks in a proxy task runner until thread starts. Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/threading/thread.h ('k') | base/threading/thread_id_name_manager_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/thread.h" 5 #include "base/threading/thread.h"
6 6
7 #include <queue>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/lazy_instance.h" 10 #include "base/lazy_instance.h"
9 #include "base/profiler/scoped_tracker.h" 11 #include "base/profiler/scoped_tracker.h"
12 #include "base/synchronization/lock.h"
10 #include "base/synchronization/waitable_event.h" 13 #include "base/synchronization/waitable_event.h"
11 #include "base/third_party/dynamic_annotations/dynamic_annotations.h" 14 #include "base/third_party/dynamic_annotations/dynamic_annotations.h"
12 #include "base/threading/thread_id_name_manager.h" 15 #include "base/threading/thread_id_name_manager.h"
13 #include "base/threading/thread_local.h" 16 #include "base/threading/thread_local.h"
14 #include "base/threading/thread_restrictions.h" 17 #include "base/threading/thread_restrictions.h"
15 18
16 #if defined(OS_WIN) 19 #if defined(OS_WIN)
17 #include "base/win/scoped_com_initializer.h" 20 #include "base/win/scoped_com_initializer.h"
18 #endif 21 #endif
19 22
20 namespace base { 23 namespace base {
21 24
22 namespace { 25 namespace {
23 26
24 // We use this thread-local variable to record whether or not a thread exited 27 // We use this thread-local variable to record whether or not a thread exited
25 // because its Stop method was called. This allows us to catch cases where 28 // because its Stop method was called. This allows us to catch cases where
26 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when 29 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when
27 // using a Thread to setup and run a MessageLoop. 30 // using a Thread to setup and run a MessageLoop.
28 base::LazyInstance<base::ThreadLocalBoolean> lazy_tls_bool = 31 base::LazyInstance<base::ThreadLocalBoolean> lazy_tls_bool =
29 LAZY_INSTANCE_INITIALIZER; 32 LAZY_INSTANCE_INITIALIZER;
30 33
31 } // namespace 34 } // namespace
32 35
33 // This is used to trigger the message loop to exit. 36 // This is used to trigger the message loop to exit.
34 void ThreadQuitHelper() { 37 void ThreadQuitHelper() {
35 MessageLoop::current()->QuitWhenIdle(); 38 MessageLoop::current()->QuitWhenIdle();
36 Thread::SetThreadWasQuitProperly(true); 39 Thread::SetThreadWasQuitProperly(true);
37 } 40 }
38 41
39 // Used to pass data to ThreadMain. This structure is allocated on the stack 42 // Accumulates incoming tasks until SetInternalTaskRunner() is called.
40 // from within StartWithOptions. 43 // Once the internal task runner is set it forwards the posted tasks to
41 struct Thread::StartupData { 44 // the internal one.
42 // We get away with a const reference here because of how we are allocated. 45 class Thread::ProxyTaskRunner : public SingleThreadTaskRunner {
43 const Thread::Options& options; 46 public:
47 ProxyTaskRunner() {}
44 48
45 // Used to synchronize thread startup. 49 void SetInternalTaskRunner(
46 WaitableEvent event; 50 const scoped_refptr<SingleThreadTaskRunner>& task_runner) {
51 AutoLock locker(lock_);
52 DCHECK(!internal_task_runner_);
53 // Naively repost all queued tasks to the internal one. Note that
54 // a task posted with X delay will run with X + Y delay if this method
55 // is called after Y.
56 internal_task_runner_ = task_runner;
57 while (!task_queue_.empty()) {
58 const Task& task = task_queue_.front();
59 if (task.nestable) {
60 internal_task_runner_->PostDelayedTask(
61 task.from_here, task.task, task.delay);
62 } else {
63 internal_task_runner_->PostNonNestableDelayedTask(
64 task.from_here, task.task, task.delay);
65 }
66 task_queue_.pop();
67 }
68 }
47 69
48 explicit StartupData(const Options& opt) 70 protected:
49 : options(opt), 71 ~ProxyTaskRunner() override {}
50 event(false, false) {} 72
73 // ProxyTaskRunner override:
74 bool PostDelayedTask(const tracked_objects::Location& from_here,
75 const Closure& task,
76 base::TimeDelta delay) override {
77 AutoLock locker(lock_);
78 DCHECK(!task.is_null()) << from_here.ToString();
79 if (internal_task_runner_)
80 return internal_task_runner_->PostDelayedTask(from_here, task, delay);
81 task_queue_.push(Task(from_here, task, delay, true));
82 return true;
83 }
84
85 bool RunsTasksOnCurrentThread() const override {
86 return internal_task_runner_
87 ? internal_task_runner_->RunsTasksOnCurrentThread()
88 : false;
89 }
90
91 // SequencedTaskRunner override:
92 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
93 const Closure& task,
94 base::TimeDelta delay) override {
95 AutoLock locker(lock_);
96 DCHECK(!task.is_null()) << from_here.ToString();
97 if (internal_task_runner_) {
98 return internal_task_runner_->PostNonNestableDelayedTask(
99 from_here, task, delay);
100 }
101 task_queue_.push(Task(from_here, task, delay, false));
102 return true;
103 }
104
105 private:
106 struct Task {
107 Task(const tracked_objects::Location& from_here,
108 const Closure& task,
109 base::TimeDelta delay,
110 bool nestable)
111 : from_here(from_here),
112 task(task),
113 delay(delay),
114 nestable(nestable) {}
115 Task() : nestable(false) {}
116 ~Task() {}
117
118 tracked_objects::Location from_here;
119 Closure task;
120 base::TimeDelta delay;
121 bool nestable;
122 };
123
124 base::Lock lock_;
125 std::queue<Task> task_queue_;
126 scoped_refptr<SingleThreadTaskRunner> internal_task_runner_;
127
128 DISALLOW_COPY_AND_ASSIGN(ProxyTaskRunner);
51 }; 129 };
52 130
53 Thread::Options::Options() 131 Thread::Options::Options()
54 : message_loop_type(MessageLoop::TYPE_DEFAULT), 132 : message_loop_type(MessageLoop::TYPE_DEFAULT),
55 timer_slack(TIMER_SLACK_NONE), 133 timer_slack(TIMER_SLACK_NONE),
56 stack_size(0) { 134 stack_size(0) {
57 } 135 }
58 136
59 Thread::Options::Options(MessageLoop::Type type, 137 Thread::Options::Options(MessageLoop::Type type,
60 size_t size) 138 size_t size)
61 : message_loop_type(type), 139 : message_loop_type(type),
62 timer_slack(TIMER_SLACK_NONE), 140 timer_slack(TIMER_SLACK_NONE),
63 stack_size(size) { 141 stack_size(size) {
64 } 142 }
65 143
66 Thread::Options::~Options() { 144 Thread::Options::~Options() {
67 } 145 }
68 146
69 Thread::Thread(const std::string& name) 147 Thread::Thread(const std::string& name)
70 : 148 :
71 #if defined(OS_WIN) 149 #if defined(OS_WIN)
72 com_status_(NONE), 150 com_status_(NONE),
73 #endif 151 #endif
74 started_(false),
75 stopping_(false), 152 stopping_(false),
76 running_(false), 153 running_(false),
77 startup_data_(NULL),
78 thread_(0), 154 thread_(0),
79 message_loop_(NULL), 155 message_loop_(nullptr),
80 thread_id_(kInvalidThreadId),
81 name_(name) { 156 name_(name) {
82 } 157 }
83 158
84 Thread::~Thread() { 159 Thread::~Thread() {
85 Stop(); 160 Stop();
86 } 161 }
87 162
88 bool Thread::Start() { 163 bool Thread::Start() {
89 Options options; 164 Options options;
90 #if defined(OS_WIN) 165 #if defined(OS_WIN)
91 if (com_status_ == STA) 166 if (com_status_ == STA)
92 options.message_loop_type = MessageLoop::TYPE_UI; 167 options.message_loop_type = MessageLoop::TYPE_UI;
93 #endif 168 #endif
94 return StartWithOptions(options); 169 return StartWithOptions(options);
95 } 170 }
96 171
97 bool Thread::StartWithOptions(const Options& options) { 172 bool Thread::StartWithOptions(const Options& options) {
98 DCHECK(!message_loop_); 173 DCHECK(!message_loop_);
99 #if defined(OS_WIN) 174 #if defined(OS_WIN)
100 DCHECK((com_status_ != STA) || 175 DCHECK((com_status_ != STA) ||
101 (options.message_loop_type == MessageLoop::TYPE_UI)); 176 (options.message_loop_type == MessageLoop::TYPE_UI));
102 #endif 177 #endif
103 178
104 SetThreadWasQuitProperly(false); 179 SetThreadWasQuitProperly(false);
105 180
106 StartupData startup_data(options); 181 task_runner_ = new ProxyTaskRunner();
107 startup_data_ = &startup_data; 182
183 startup_data_.reset(new Options(options));
184 start_event_.reset(new WaitableEvent(false, false));
108 185
109 if (!PlatformThread::Create(options.stack_size, this, &thread_)) { 186 if (!PlatformThread::Create(options.stack_size, this, &thread_)) {
110 DLOG(ERROR) << "failed to create thread"; 187 DLOG(ERROR) << "failed to create thread";
111 startup_data_ = NULL; 188 start_event_.reset();
112 return false; 189 return false;
113 } 190 }
114 191
192 return true;
193 }
194
195 bool Thread::StartAndWait() {
196 bool result = Start();
197 if (!result)
198 return result;
199 WaitUntilThreadStarted();
200 return result;
201 }
202
203 bool Thread::WaitUntilThreadStarted() {
204 if (!start_event_)
205 return false;
115 // TODO(kinuko): Remove once crbug.com/465458 is solved. 206 // TODO(kinuko): Remove once crbug.com/465458 is solved.
116 tracked_objects::ScopedTracker tracking_profile_wait( 207 tracked_objects::ScopedTracker tracking_profile(
117 FROM_HERE_WITH_EXPLICIT_FUNCTION( 208 FROM_HERE_WITH_EXPLICIT_FUNCTION(
118 "465458 base::Thread::StartWithOptions (Wait)")); 209 "465458 base::Thread::WaitUntilThreadStarted()"));
119
120 // Wait for the thread to start and initialize message_loop_
121 base::ThreadRestrictions::ScopedAllowWait allow_wait; 210 base::ThreadRestrictions::ScopedAllowWait allow_wait;
122 startup_data.event.Wait(); 211 start_event_->Wait();
123
124 // set it to NULL so we don't keep a pointer to some object on the stack.
125 startup_data_ = NULL;
126 started_ = true;
127
128 DCHECK(message_loop_);
129 return true; 212 return true;
130 } 213 }
131 214
132 void Thread::Stop() { 215 void Thread::Stop() {
133 if (!started_) 216 if (!start_event_)
134 return; 217 return;
135 218
136 StopSoon(); 219 StopSoon();
137 220
138 // Wait for the thread to exit. 221 // Wait for the thread to exit.
139 // 222 //
140 // TODO(darin): Unfortunately, we need to keep message_loop_ around until 223 // TODO(darin): Unfortunately, we need to keep message_loop_ around until
141 // the thread exits. Some consumers are abusing the API. Make them stop. 224 // the thread exits. Some consumers are abusing the API. Make them stop.
142 // 225 //
143 PlatformThread::Join(thread_); 226 PlatformThread::Join(thread_);
144 227
145 // The thread should NULL message_loop_ on exit. 228 // The thread should NULL message_loop_ on exit.
146 DCHECK(!message_loop_); 229 DCHECK(!message_loop_);
147 230
148 // The thread no longer needs to be joined. 231 // The thread no longer needs to be joined.
149 started_ = false; 232 start_event_.reset();
150 233
151 stopping_ = false; 234 stopping_ = false;
152 } 235 }
153 236
154 void Thread::StopSoon() { 237 void Thread::StopSoon() {
155 // We should only be called on the same thread that started us. 238 // We should only be called on the same thread that started us.
156 239
157 // Reading thread_id_ without a lock can lead to a benign data race 240 DCHECK_NE(thread_id(), PlatformThread::CurrentId());
158 // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer.
159 DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId());
160 241
161 if (stopping_ || !message_loop_) 242 if (stopping_ || !start_event_)
162 return; 243 return;
163 244
164 stopping_ = true; 245 stopping_ = true;
165 message_loop_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); 246 task_runner_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
166 } 247 }
167 248
168 bool Thread::IsRunning() const { 249 bool Thread::IsRunning() const {
250 if (start_event_ && !stopping_)
251 return true;
252 AutoLock lock(lock_);
169 return running_; 253 return running_;
170 } 254 }
171 255
172 void Thread::SetPriority(ThreadPriority priority) { 256 void Thread::SetPriority(ThreadPriority priority) {
173 // The thread must be started (and id known) for this to be 257 // The thread must be started (and id known) for this to be
174 // compatible with all platforms. 258 // compatible with all platforms.
175 DCHECK_NE(thread_id_, kInvalidThreadId); 259 DCHECK(!!start_event_);
176 PlatformThread::SetThreadPriority(thread_, priority); 260 PlatformThread::SetThreadPriority(thread_, priority);
177 } 261 }
178 262
179 void Thread::Run(MessageLoop* message_loop) { 263 void Thread::Run(MessageLoop* message_loop) {
180 message_loop->Run(); 264 message_loop->Run();
181 } 265 }
182 266
183 void Thread::SetThreadWasQuitProperly(bool flag) { 267 void Thread::SetThreadWasQuitProperly(bool flag) {
184 lazy_tls_bool.Pointer()->Set(flag); 268 lazy_tls_bool.Pointer()->Set(flag);
185 } 269 }
186 270
187 bool Thread::GetThreadWasQuitProperly() { 271 bool Thread::GetThreadWasQuitProperly() {
188 bool quit_properly = true; 272 bool quit_properly = true;
189 #ifndef NDEBUG 273 #ifndef NDEBUG
190 quit_properly = lazy_tls_bool.Pointer()->Get(); 274 quit_properly = lazy_tls_bool.Pointer()->Get();
191 #endif 275 #endif
192 return quit_properly; 276 return quit_properly;
193 } 277 }
194 278
195 void Thread::ThreadMain() { 279 void Thread::ThreadMain() {
280 // The message loop for this thread.
281 // Allocated on the heap to centralize any leak reports at this line.
282 scoped_ptr<MessageLoop> message_loop;
283 if (!startup_data_->message_pump_factory.is_null()) {
284 message_loop.reset(
285 new MessageLoop(startup_data_->message_pump_factory.Run()));
286 } else {
287 message_loop.reset(
288 new MessageLoop(startup_data_->message_loop_type));
289 }
290
291 // Complete the initialization of our Thread object.
292 DCHECK_EQ(thread_id(), PlatformThread::CurrentId());
293 PlatformThread::SetName(name_.c_str());
294 ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
295 message_loop->set_thread_name(name_);
296 message_loop->SetTimerSlack(startup_data_->timer_slack);
297 message_loop_ = message_loop.get();
298
299 static_cast<ProxyTaskRunner*>(task_runner_.get())->SetInternalTaskRunner(
300 message_loop_->task_runner());
301
196 { 302 {
197 // The message loop for this thread. 303 base::AutoLock locker(task_runner_lock_);
198 // Allocated on the heap to centralize any leak reports at this line. 304 task_runner_ = message_loop_->task_runner();
199 scoped_ptr<MessageLoop> message_loop; 305 }
200 if (!startup_data_->options.message_pump_factory.is_null()) {
201 message_loop.reset(
202 new MessageLoop(startup_data_->options.message_pump_factory.Run()));
203 } else {
204 message_loop.reset(
205 new MessageLoop(startup_data_->options.message_loop_type));
206 }
207 306
208 // Complete the initialization of our Thread object. 307 startup_data_.reset();
209 thread_id_ = PlatformThread::CurrentId();
210 PlatformThread::SetName(name_.c_str());
211 ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
212 message_loop->set_thread_name(name_);
213 message_loop->SetTimerSlack(startup_data_->options.timer_slack);
214 message_loop_ = message_loop.get();
215 308
216 #if defined(OS_WIN) 309 #if defined(OS_WIN)
217 scoped_ptr<win::ScopedCOMInitializer> com_initializer; 310 scoped_ptr<win::ScopedCOMInitializer> com_initializer;
218 if (com_status_ != NONE) { 311 if (com_status_ != NONE) {
219 com_initializer.reset((com_status_ == STA) ? 312 com_initializer.reset((com_status_ == STA) ?
220 new win::ScopedCOMInitializer() : 313 new win::ScopedCOMInitializer() :
221 new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); 314 new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
222 } 315 }
223 #endif 316 #endif
224 317
225 // Let the thread do extra initialization. 318 // Let the thread do extra initialization.
226 // Let's do this before signaling we are started. 319 Init();
227 Init();
228 320
321 {
322 AutoLock lock(lock_);
229 running_ = true; 323 running_ = true;
230 startup_data_->event.Signal(); 324 }
231 // startup_data_ can't be touched anymore since the starting thread is now
232 // unlocked.
233 325
234 Run(message_loop_); 326 start_event_->Signal();
327
328 Run(message_loop_);
329
330 {
331 AutoLock lock(lock_);
235 running_ = false; 332 running_ = false;
333 }
236 334
237 // Let the thread do extra cleanup. 335 // Let the thread do extra cleanup.
238 CleanUp(); 336 CleanUp();
239 337
240 #if defined(OS_WIN) 338 #if defined(OS_WIN)
241 com_initializer.reset(); 339 com_initializer.reset();
242 #endif 340 #endif
243 341
244 // Assert that MessageLoop::Quit was called by ThreadQuitHelper. 342 // Assert that MessageLoop::Quit was called by ThreadQuitHelper.
245 DCHECK(GetThreadWasQuitProperly()); 343 DCHECK(GetThreadWasQuitProperly());
246 344
247 // We can't receive messages anymore. 345 // We can't receive messages anymore.
248 message_loop_ = NULL; 346 // (The message loop is destructed at the end of this block)
249 } 347 message_loop_ = NULL;
250 } 348 }
251 349
252 } // namespace base 350 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/thread.h ('k') | base/threading/thread_id_name_manager_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698