OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/thread.h" | 5 #include "base/threading/thread.h" |
6 | 6 |
| 7 #include <queue> |
| 8 |
7 #include "base/bind.h" | 9 #include "base/bind.h" |
8 #include "base/lazy_instance.h" | 10 #include "base/lazy_instance.h" |
9 #include "base/profiler/scoped_tracker.h" | 11 #include "base/profiler/scoped_tracker.h" |
| 12 #include "base/synchronization/lock.h" |
10 #include "base/synchronization/waitable_event.h" | 13 #include "base/synchronization/waitable_event.h" |
11 #include "base/third_party/dynamic_annotations/dynamic_annotations.h" | 14 #include "base/third_party/dynamic_annotations/dynamic_annotations.h" |
12 #include "base/threading/thread_id_name_manager.h" | 15 #include "base/threading/thread_id_name_manager.h" |
13 #include "base/threading/thread_local.h" | 16 #include "base/threading/thread_local.h" |
14 #include "base/threading/thread_restrictions.h" | 17 #include "base/threading/thread_restrictions.h" |
15 | 18 |
16 #if defined(OS_WIN) | 19 #if defined(OS_WIN) |
17 #include "base/win/scoped_com_initializer.h" | 20 #include "base/win/scoped_com_initializer.h" |
18 #endif | 21 #endif |
19 | 22 |
20 namespace base { | 23 namespace base { |
21 | 24 |
22 namespace { | 25 namespace { |
23 | 26 |
24 // We use this thread-local variable to record whether or not a thread exited | 27 // We use this thread-local variable to record whether or not a thread exited |
25 // because its Stop method was called. This allows us to catch cases where | 28 // because its Stop method was called. This allows us to catch cases where |
26 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when | 29 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when |
27 // using a Thread to setup and run a MessageLoop. | 30 // using a Thread to setup and run a MessageLoop. |
28 base::LazyInstance<base::ThreadLocalBoolean> lazy_tls_bool = | 31 base::LazyInstance<base::ThreadLocalBoolean> lazy_tls_bool = |
29 LAZY_INSTANCE_INITIALIZER; | 32 LAZY_INSTANCE_INITIALIZER; |
30 | 33 |
31 } // namespace | 34 } // namespace |
32 | 35 |
33 // This is used to trigger the message loop to exit. | 36 // This is used to trigger the message loop to exit. |
34 void ThreadQuitHelper() { | 37 void ThreadQuitHelper() { |
35 MessageLoop::current()->QuitWhenIdle(); | 38 MessageLoop::current()->QuitWhenIdle(); |
36 Thread::SetThreadWasQuitProperly(true); | 39 Thread::SetThreadWasQuitProperly(true); |
37 } | 40 } |
38 | 41 |
39 // Used to pass data to ThreadMain. This structure is allocated on the stack | 42 // Accumulates incoming tasks until SetInternalTaskRunner() is called. |
40 // from within StartWithOptions. | 43 // Once the internal task runner is set it forwards the posted tasks to |
41 struct Thread::StartupData { | 44 // the internal one. |
42 // We get away with a const reference here because of how we are allocated. | 45 class Thread::ProxyTaskRunner : public SingleThreadTaskRunner { |
43 const Thread::Options& options; | 46 public: |
| 47 ProxyTaskRunner() {} |
44 | 48 |
45 // Used to synchronize thread startup. | 49 void SetInternalTaskRunner( |
46 WaitableEvent event; | 50 const scoped_refptr<SingleThreadTaskRunner>& task_runner) { |
| 51 AutoLock locker(lock_); |
| 52 DCHECK(!internal_task_runner_); |
| 53 // Naively repost all queued tasks to the internal one. Note that |
| 54 // a task posted with X delay will run with X + Y delay if this method |
| 55 // is called after Y. |
| 56 internal_task_runner_ = task_runner; |
| 57 while (!task_queue_.empty()) { |
| 58 const Task& task = task_queue_.front(); |
| 59 if (task.nestable) { |
| 60 internal_task_runner_->PostDelayedTask( |
| 61 task.from_here, task.task, task.delay); |
| 62 } else { |
| 63 internal_task_runner_->PostNonNestableDelayedTask( |
| 64 task.from_here, task.task, task.delay); |
| 65 } |
| 66 task_queue_.pop(); |
| 67 } |
| 68 } |
47 | 69 |
48 explicit StartupData(const Options& opt) | 70 protected: |
49 : options(opt), | 71 ~ProxyTaskRunner() override {} |
50 event(false, false) {} | 72 |
| 73 // ProxyTaskRunner override: |
| 74 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 75 const Closure& task, |
| 76 base::TimeDelta delay) override { |
| 77 AutoLock locker(lock_); |
| 78 DCHECK(!task.is_null()) << from_here.ToString(); |
| 79 if (internal_task_runner_) |
| 80 return internal_task_runner_->PostDelayedTask(from_here, task, delay); |
| 81 task_queue_.push(Task(from_here, task, delay, true)); |
| 82 return true; |
| 83 } |
| 84 |
| 85 bool RunsTasksOnCurrentThread() const override { |
| 86 return internal_task_runner_ |
| 87 ? internal_task_runner_->RunsTasksOnCurrentThread() |
| 88 : false; |
| 89 } |
| 90 |
| 91 // SequencedTaskRunner override: |
| 92 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 93 const Closure& task, |
| 94 base::TimeDelta delay) override { |
| 95 AutoLock locker(lock_); |
| 96 DCHECK(!task.is_null()) << from_here.ToString(); |
| 97 if (internal_task_runner_) { |
| 98 return internal_task_runner_->PostNonNestableDelayedTask( |
| 99 from_here, task, delay); |
| 100 } |
| 101 task_queue_.push(Task(from_here, task, delay, false)); |
| 102 return true; |
| 103 } |
| 104 |
| 105 private: |
| 106 struct Task { |
| 107 Task(const tracked_objects::Location& from_here, |
| 108 const Closure& task, |
| 109 base::TimeDelta delay, |
| 110 bool nestable) |
| 111 : from_here(from_here), |
| 112 task(task), |
| 113 delay(delay), |
| 114 nestable(nestable) {} |
| 115 Task() : nestable(false) {} |
| 116 ~Task() {} |
| 117 |
| 118 tracked_objects::Location from_here; |
| 119 Closure task; |
| 120 base::TimeDelta delay; |
| 121 bool nestable; |
| 122 }; |
| 123 |
| 124 base::Lock lock_; |
| 125 std::queue<Task> task_queue_; |
| 126 scoped_refptr<SingleThreadTaskRunner> internal_task_runner_; |
| 127 |
| 128 DISALLOW_COPY_AND_ASSIGN(ProxyTaskRunner); |
51 }; | 129 }; |
52 | 130 |
53 Thread::Options::Options() | 131 Thread::Options::Options() |
54 : message_loop_type(MessageLoop::TYPE_DEFAULT), | 132 : message_loop_type(MessageLoop::TYPE_DEFAULT), |
55 timer_slack(TIMER_SLACK_NONE), | 133 timer_slack(TIMER_SLACK_NONE), |
56 stack_size(0) { | 134 stack_size(0) { |
57 } | 135 } |
58 | 136 |
59 Thread::Options::Options(MessageLoop::Type type, | 137 Thread::Options::Options(MessageLoop::Type type, |
60 size_t size) | 138 size_t size) |
61 : message_loop_type(type), | 139 : message_loop_type(type), |
62 timer_slack(TIMER_SLACK_NONE), | 140 timer_slack(TIMER_SLACK_NONE), |
63 stack_size(size) { | 141 stack_size(size) { |
64 } | 142 } |
65 | 143 |
66 Thread::Options::~Options() { | 144 Thread::Options::~Options() { |
67 } | 145 } |
68 | 146 |
69 Thread::Thread(const std::string& name) | 147 Thread::Thread(const std::string& name) |
70 : | 148 : |
71 #if defined(OS_WIN) | 149 #if defined(OS_WIN) |
72 com_status_(NONE), | 150 com_status_(NONE), |
73 #endif | 151 #endif |
74 started_(false), | |
75 stopping_(false), | 152 stopping_(false), |
76 running_(false), | 153 running_(false), |
77 startup_data_(NULL), | |
78 thread_(0), | 154 thread_(0), |
79 message_loop_(NULL), | 155 message_loop_(nullptr), |
80 thread_id_(kInvalidThreadId), | |
81 name_(name) { | 156 name_(name) { |
82 } | 157 } |
83 | 158 |
84 Thread::~Thread() { | 159 Thread::~Thread() { |
85 Stop(); | 160 Stop(); |
86 } | 161 } |
87 | 162 |
88 bool Thread::Start() { | 163 bool Thread::Start() { |
89 Options options; | 164 Options options; |
90 #if defined(OS_WIN) | 165 #if defined(OS_WIN) |
91 if (com_status_ == STA) | 166 if (com_status_ == STA) |
92 options.message_loop_type = MessageLoop::TYPE_UI; | 167 options.message_loop_type = MessageLoop::TYPE_UI; |
93 #endif | 168 #endif |
94 return StartWithOptions(options); | 169 return StartWithOptions(options); |
95 } | 170 } |
96 | 171 |
97 bool Thread::StartWithOptions(const Options& options) { | 172 bool Thread::StartWithOptions(const Options& options) { |
98 DCHECK(!message_loop_); | 173 DCHECK(!message_loop_); |
99 #if defined(OS_WIN) | 174 #if defined(OS_WIN) |
100 DCHECK((com_status_ != STA) || | 175 DCHECK((com_status_ != STA) || |
101 (options.message_loop_type == MessageLoop::TYPE_UI)); | 176 (options.message_loop_type == MessageLoop::TYPE_UI)); |
102 #endif | 177 #endif |
103 | 178 |
104 SetThreadWasQuitProperly(false); | 179 SetThreadWasQuitProperly(false); |
105 | 180 |
106 StartupData startup_data(options); | 181 task_runner_ = new ProxyTaskRunner(); |
107 startup_data_ = &startup_data; | 182 |
| 183 startup_data_.reset(new Options(options)); |
| 184 start_event_.reset(new WaitableEvent(false, false)); |
108 | 185 |
109 if (!PlatformThread::Create(options.stack_size, this, &thread_)) { | 186 if (!PlatformThread::Create(options.stack_size, this, &thread_)) { |
110 DLOG(ERROR) << "failed to create thread"; | 187 DLOG(ERROR) << "failed to create thread"; |
111 startup_data_ = NULL; | 188 start_event_.reset(); |
112 return false; | 189 return false; |
113 } | 190 } |
114 | 191 |
| 192 return true; |
| 193 } |
| 194 |
| 195 bool Thread::StartAndWait() { |
| 196 bool result = Start(); |
| 197 if (!result) |
| 198 return result; |
| 199 WaitUntilThreadStarted(); |
| 200 return result; |
| 201 } |
| 202 |
| 203 bool Thread::WaitUntilThreadStarted() { |
| 204 if (!start_event_) |
| 205 return false; |
115 // TODO(kinuko): Remove once crbug.com/465458 is solved. | 206 // TODO(kinuko): Remove once crbug.com/465458 is solved. |
116 tracked_objects::ScopedTracker tracking_profile_wait( | 207 tracked_objects::ScopedTracker tracking_profile( |
117 FROM_HERE_WITH_EXPLICIT_FUNCTION( | 208 FROM_HERE_WITH_EXPLICIT_FUNCTION( |
118 "465458 base::Thread::StartWithOptions (Wait)")); | 209 "465458 base::Thread::WaitUntilThreadStarted()")); |
119 | |
120 // Wait for the thread to start and initialize message_loop_ | |
121 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 210 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
122 startup_data.event.Wait(); | 211 start_event_->Wait(); |
123 | |
124 // set it to NULL so we don't keep a pointer to some object on the stack. | |
125 startup_data_ = NULL; | |
126 started_ = true; | |
127 | |
128 DCHECK(message_loop_); | |
129 return true; | 212 return true; |
130 } | 213 } |
131 | 214 |
132 void Thread::Stop() { | 215 void Thread::Stop() { |
133 if (!started_) | 216 if (!start_event_) |
134 return; | 217 return; |
135 | 218 |
136 StopSoon(); | 219 StopSoon(); |
137 | 220 |
138 // Wait for the thread to exit. | 221 // Wait for the thread to exit. |
139 // | 222 // |
140 // TODO(darin): Unfortunately, we need to keep message_loop_ around until | 223 // TODO(darin): Unfortunately, we need to keep message_loop_ around until |
141 // the thread exits. Some consumers are abusing the API. Make them stop. | 224 // the thread exits. Some consumers are abusing the API. Make them stop. |
142 // | 225 // |
143 PlatformThread::Join(thread_); | 226 PlatformThread::Join(thread_); |
144 | 227 |
145 // The thread should NULL message_loop_ on exit. | 228 // The thread should NULL message_loop_ on exit. |
146 DCHECK(!message_loop_); | 229 DCHECK(!message_loop_); |
147 | 230 |
148 // The thread no longer needs to be joined. | 231 // The thread no longer needs to be joined. |
149 started_ = false; | 232 start_event_.reset(); |
150 | 233 |
151 stopping_ = false; | 234 stopping_ = false; |
152 } | 235 } |
153 | 236 |
154 void Thread::StopSoon() { | 237 void Thread::StopSoon() { |
155 // We should only be called on the same thread that started us. | 238 // We should only be called on the same thread that started us. |
156 | 239 |
157 // Reading thread_id_ without a lock can lead to a benign data race | 240 DCHECK_NE(thread_id(), PlatformThread::CurrentId()); |
158 // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer. | |
159 DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId()); | |
160 | 241 |
161 if (stopping_ || !message_loop_) | 242 if (stopping_ || !start_event_) |
162 return; | 243 return; |
163 | 244 |
164 stopping_ = true; | 245 stopping_ = true; |
165 message_loop_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); | 246 task_runner_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); |
166 } | 247 } |
167 | 248 |
168 bool Thread::IsRunning() const { | 249 bool Thread::IsRunning() const { |
| 250 if (start_event_ && !stopping_) |
| 251 return true; |
| 252 AutoLock lock(lock_); |
169 return running_; | 253 return running_; |
170 } | 254 } |
171 | 255 |
172 void Thread::SetPriority(ThreadPriority priority) { | 256 void Thread::SetPriority(ThreadPriority priority) { |
173 // The thread must be started (and id known) for this to be | 257 // The thread must be started (and id known) for this to be |
174 // compatible with all platforms. | 258 // compatible with all platforms. |
175 DCHECK_NE(thread_id_, kInvalidThreadId); | 259 DCHECK(!!start_event_); |
176 PlatformThread::SetThreadPriority(thread_, priority); | 260 PlatformThread::SetThreadPriority(thread_, priority); |
177 } | 261 } |
178 | 262 |
179 void Thread::Run(MessageLoop* message_loop) { | 263 void Thread::Run(MessageLoop* message_loop) { |
180 message_loop->Run(); | 264 message_loop->Run(); |
181 } | 265 } |
182 | 266 |
183 void Thread::SetThreadWasQuitProperly(bool flag) { | 267 void Thread::SetThreadWasQuitProperly(bool flag) { |
184 lazy_tls_bool.Pointer()->Set(flag); | 268 lazy_tls_bool.Pointer()->Set(flag); |
185 } | 269 } |
186 | 270 |
187 bool Thread::GetThreadWasQuitProperly() { | 271 bool Thread::GetThreadWasQuitProperly() { |
188 bool quit_properly = true; | 272 bool quit_properly = true; |
189 #ifndef NDEBUG | 273 #ifndef NDEBUG |
190 quit_properly = lazy_tls_bool.Pointer()->Get(); | 274 quit_properly = lazy_tls_bool.Pointer()->Get(); |
191 #endif | 275 #endif |
192 return quit_properly; | 276 return quit_properly; |
193 } | 277 } |
194 | 278 |
195 void Thread::ThreadMain() { | 279 void Thread::ThreadMain() { |
| 280 // The message loop for this thread. |
| 281 // Allocated on the heap to centralize any leak reports at this line. |
| 282 scoped_ptr<MessageLoop> message_loop; |
| 283 if (!startup_data_->message_pump_factory.is_null()) { |
| 284 message_loop.reset( |
| 285 new MessageLoop(startup_data_->message_pump_factory.Run())); |
| 286 } else { |
| 287 message_loop.reset( |
| 288 new MessageLoop(startup_data_->message_loop_type)); |
| 289 } |
| 290 |
| 291 // Complete the initialization of our Thread object. |
| 292 DCHECK_EQ(thread_id(), PlatformThread::CurrentId()); |
| 293 PlatformThread::SetName(name_.c_str()); |
| 294 ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. |
| 295 message_loop->set_thread_name(name_); |
| 296 message_loop->SetTimerSlack(startup_data_->timer_slack); |
| 297 message_loop_ = message_loop.get(); |
| 298 |
| 299 static_cast<ProxyTaskRunner*>(task_runner_.get())->SetInternalTaskRunner( |
| 300 message_loop_->task_runner()); |
| 301 |
196 { | 302 { |
197 // The message loop for this thread. | 303 base::AutoLock locker(task_runner_lock_); |
198 // Allocated on the heap to centralize any leak reports at this line. | 304 task_runner_ = message_loop_->task_runner(); |
199 scoped_ptr<MessageLoop> message_loop; | 305 } |
200 if (!startup_data_->options.message_pump_factory.is_null()) { | |
201 message_loop.reset( | |
202 new MessageLoop(startup_data_->options.message_pump_factory.Run())); | |
203 } else { | |
204 message_loop.reset( | |
205 new MessageLoop(startup_data_->options.message_loop_type)); | |
206 } | |
207 | 306 |
208 // Complete the initialization of our Thread object. | 307 startup_data_.reset(); |
209 thread_id_ = PlatformThread::CurrentId(); | |
210 PlatformThread::SetName(name_.c_str()); | |
211 ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. | |
212 message_loop->set_thread_name(name_); | |
213 message_loop->SetTimerSlack(startup_data_->options.timer_slack); | |
214 message_loop_ = message_loop.get(); | |
215 | 308 |
216 #if defined(OS_WIN) | 309 #if defined(OS_WIN) |
217 scoped_ptr<win::ScopedCOMInitializer> com_initializer; | 310 scoped_ptr<win::ScopedCOMInitializer> com_initializer; |
218 if (com_status_ != NONE) { | 311 if (com_status_ != NONE) { |
219 com_initializer.reset((com_status_ == STA) ? | 312 com_initializer.reset((com_status_ == STA) ? |
220 new win::ScopedCOMInitializer() : | 313 new win::ScopedCOMInitializer() : |
221 new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); | 314 new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); |
222 } | 315 } |
223 #endif | 316 #endif |
224 | 317 |
225 // Let the thread do extra initialization. | 318 // Let the thread do extra initialization. |
226 // Let's do this before signaling we are started. | 319 Init(); |
227 Init(); | |
228 | 320 |
| 321 { |
| 322 AutoLock lock(lock_); |
229 running_ = true; | 323 running_ = true; |
230 startup_data_->event.Signal(); | 324 } |
231 // startup_data_ can't be touched anymore since the starting thread is now | |
232 // unlocked. | |
233 | 325 |
234 Run(message_loop_); | 326 start_event_->Signal(); |
| 327 |
| 328 Run(message_loop_); |
| 329 |
| 330 { |
| 331 AutoLock lock(lock_); |
235 running_ = false; | 332 running_ = false; |
| 333 } |
236 | 334 |
237 // Let the thread do extra cleanup. | 335 // Let the thread do extra cleanup. |
238 CleanUp(); | 336 CleanUp(); |
239 | 337 |
240 #if defined(OS_WIN) | 338 #if defined(OS_WIN) |
241 com_initializer.reset(); | 339 com_initializer.reset(); |
242 #endif | 340 #endif |
243 | 341 |
244 // Assert that MessageLoop::Quit was called by ThreadQuitHelper. | 342 // Assert that MessageLoop::Quit was called by ThreadQuitHelper. |
245 DCHECK(GetThreadWasQuitProperly()); | 343 DCHECK(GetThreadWasQuitProperly()); |
246 | 344 |
247 // We can't receive messages anymore. | 345 // We can't receive messages anymore. |
248 message_loop_ = NULL; | 346 // (The message loop is destructed at the end of this block) |
249 } | 347 message_loop_ = NULL; |
250 } | 348 } |
251 | 349 |
252 } // namespace base | 350 } // namespace base |
OLD | NEW |