Index: base/threading/thread.cc |
diff --git a/base/threading/thread.cc b/base/threading/thread.cc |
index d42ba4d1577ae4abcd9ad8a8bb5576036a01b5b0..19bd8ea115cf3e775b03d4b5b864efc26d1d1e16 100644 |
--- a/base/threading/thread.cc |
+++ b/base/threading/thread.cc |
@@ -4,9 +4,12 @@ |
#include "base/threading/thread.h" |
+#include <queue> |
+ |
#include "base/bind.h" |
#include "base/lazy_instance.h" |
#include "base/profiler/scoped_tracker.h" |
+#include "base/synchronization/lock.h" |
#include "base/synchronization/waitable_event.h" |
#include "base/third_party/dynamic_annotations/dynamic_annotations.h" |
#include "base/threading/thread_id_name_manager.h" |
@@ -36,18 +39,93 @@ void ThreadQuitHelper() { |
Thread::SetThreadWasQuitProperly(true); |
} |
-// Used to pass data to ThreadMain. This structure is allocated on the stack |
-// from within StartWithOptions. |
-struct Thread::StartupData { |
- // We get away with a const reference here because of how we are allocated. |
- const Thread::Options& options; |
+// Accumulates incoming tasks until SetInternalTaskRunner() is called. |
+// Once the internal task runner is set it forwards the posted tasks to |
+// the internal one. |
+class Thread::ProxyTaskRunner : public SingleThreadTaskRunner { |
+ public: |
+ ProxyTaskRunner() {} |
+ |
+ void SetInternalTaskRunner( |
+ const scoped_refptr<SingleThreadTaskRunner>& task_runner) { |
+ AutoLock locker(lock_); |
+ DCHECK(!internal_task_runner_); |
+ // Naively repost all queued tasks to the internal one. Note that |
+ // a task posted with X delay will run with X + Y delay if this method |
+ // is called after Y. |
+ internal_task_runner_ = task_runner; |
+ while (!task_queue_.empty()) { |
+ const Task& task = task_queue_.front(); |
+ if (task.nestable) { |
+ internal_task_runner_->PostDelayedTask( |
+ task.from_here, task.task, task.delay); |
+ } else { |
+ internal_task_runner_->PostNonNestableDelayedTask( |
+ task.from_here, task.task, task.delay); |
+ } |
+ task_queue_.pop(); |
+ } |
+ } |
+ |
+ protected: |
+ ~ProxyTaskRunner() override {} |
+ |
+ // ProxyTaskRunner override: |
+ bool PostDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& task, |
+ base::TimeDelta delay) override { |
+ AutoLock locker(lock_); |
+ DCHECK(!task.is_null()) << from_here.ToString(); |
+ if (internal_task_runner_) |
+ return internal_task_runner_->PostDelayedTask(from_here, task, delay); |
+ task_queue_.push(Task(from_here, task, delay, true)); |
+ return true; |
+ } |
- // Used to synchronize thread startup. |
- WaitableEvent event; |
+ bool RunsTasksOnCurrentThread() const override { |
+ return internal_task_runner_ |
+ ? internal_task_runner_->RunsTasksOnCurrentThread() |
+ : false; |
+ } |
+ |
+ // SequencedTaskRunner override: |
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& task, |
+ base::TimeDelta delay) override { |
+ AutoLock locker(lock_); |
+ DCHECK(!task.is_null()) << from_here.ToString(); |
+ if (internal_task_runner_) { |
+ return internal_task_runner_->PostNonNestableDelayedTask( |
+ from_here, task, delay); |
+ } |
+ task_queue_.push(Task(from_here, task, delay, false)); |
+ return true; |
+ } |
- explicit StartupData(const Options& opt) |
- : options(opt), |
- event(false, false) {} |
+ private: |
+ struct Task { |
+ Task(const tracked_objects::Location& from_here, |
+ const Closure& task, |
+ base::TimeDelta delay, |
+ bool nestable) |
+ : from_here(from_here), |
+ task(task), |
+ delay(delay), |
+ nestable(nestable) {} |
+ Task() : nestable(false) {} |
+ ~Task() {} |
+ |
+ tracked_objects::Location from_here; |
+ Closure task; |
+ base::TimeDelta delay; |
+ bool nestable; |
+ }; |
+ |
+ base::Lock lock_; |
+ std::queue<Task> task_queue_; |
+ scoped_refptr<SingleThreadTaskRunner> internal_task_runner_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(ProxyTaskRunner); |
}; |
Thread::Options::Options() |
@@ -71,13 +149,10 @@ Thread::Thread(const std::string& name) |
#if defined(OS_WIN) |
com_status_(NONE), |
#endif |
- started_(false), |
stopping_(false), |
running_(false), |
- startup_data_(NULL), |
thread_(0), |
- message_loop_(NULL), |
- thread_id_(kInvalidThreadId), |
+ message_loop_(nullptr), |
name_(name) { |
} |
@@ -103,34 +178,42 @@ bool Thread::StartWithOptions(const Options& options) { |
SetThreadWasQuitProperly(false); |
- StartupData startup_data(options); |
- startup_data_ = &startup_data; |
+ task_runner_ = new ProxyTaskRunner(); |
+ |
+ startup_data_.reset(new Options(options)); |
+ start_event_.reset(new WaitableEvent(false, false)); |
if (!PlatformThread::Create(options.stack_size, this, &thread_)) { |
DLOG(ERROR) << "failed to create thread"; |
- startup_data_ = NULL; |
+ start_event_.reset(); |
return false; |
} |
+ return true; |
+} |
+ |
+bool Thread::StartAndWait() { |
+ bool result = Start(); |
+ if (!result) |
+ return result; |
+ WaitUntilThreadStarted(); |
+ return result; |
+} |
+ |
+bool Thread::WaitUntilThreadStarted() { |
+ if (!start_event_) |
+ return false; |
// TODO(kinuko): Remove once crbug.com/465458 is solved. |
- tracked_objects::ScopedTracker tracking_profile_wait( |
+ tracked_objects::ScopedTracker tracking_profile( |
FROM_HERE_WITH_EXPLICIT_FUNCTION( |
- "465458 base::Thread::StartWithOptions (Wait)")); |
- |
- // Wait for the thread to start and initialize message_loop_ |
+ "465458 base::Thread::WaitUntilThreadStarted()")); |
base::ThreadRestrictions::ScopedAllowWait allow_wait; |
- startup_data.event.Wait(); |
- |
- // set it to NULL so we don't keep a pointer to some object on the stack. |
- startup_data_ = NULL; |
- started_ = true; |
- |
- DCHECK(message_loop_); |
+ start_event_->Wait(); |
return true; |
} |
void Thread::Stop() { |
- if (!started_) |
+ if (!start_event_) |
return; |
StopSoon(); |
@@ -146,7 +229,7 @@ void Thread::Stop() { |
DCHECK(!message_loop_); |
// The thread no longer needs to be joined. |
- started_ = false; |
+ start_event_.reset(); |
stopping_ = false; |
} |
@@ -154,25 +237,26 @@ void Thread::Stop() { |
void Thread::StopSoon() { |
// We should only be called on the same thread that started us. |
- // Reading thread_id_ without a lock can lead to a benign data race |
- // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer. |
- DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId()); |
+ DCHECK_NE(thread_id(), PlatformThread::CurrentId()); |
- if (stopping_ || !message_loop_) |
+ if (stopping_ || !start_event_) |
return; |
stopping_ = true; |
- message_loop_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); |
+ task_runner_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); |
} |
bool Thread::IsRunning() const { |
+ if (start_event_ && !stopping_) |
+ return true; |
+ AutoLock lock(lock_); |
return running_; |
} |
void Thread::SetPriority(ThreadPriority priority) { |
// The thread must be started (and id known) for this to be |
// compatible with all platforms. |
- DCHECK_NE(thread_id_, kInvalidThreadId); |
+ DCHECK(!!start_event_); |
PlatformThread::SetThreadPriority(thread_, priority); |
} |
@@ -193,60 +277,74 @@ bool Thread::GetThreadWasQuitProperly() { |
} |
void Thread::ThreadMain() { |
+ // The message loop for this thread. |
+ // Allocated on the heap to centralize any leak reports at this line. |
+ scoped_ptr<MessageLoop> message_loop; |
+ if (!startup_data_->message_pump_factory.is_null()) { |
+ message_loop.reset( |
+ new MessageLoop(startup_data_->message_pump_factory.Run())); |
+ } else { |
+ message_loop.reset( |
+ new MessageLoop(startup_data_->message_loop_type)); |
+ } |
+ |
+ // Complete the initialization of our Thread object. |
+ DCHECK_EQ(thread_id(), PlatformThread::CurrentId()); |
+ PlatformThread::SetName(name_.c_str()); |
+ ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. |
+ message_loop->set_thread_name(name_); |
+ message_loop->SetTimerSlack(startup_data_->timer_slack); |
+ message_loop_ = message_loop.get(); |
+ |
+ static_cast<ProxyTaskRunner*>(task_runner_.get())->SetInternalTaskRunner( |
+ message_loop_->task_runner()); |
+ |
{ |
- // The message loop for this thread. |
- // Allocated on the heap to centralize any leak reports at this line. |
- scoped_ptr<MessageLoop> message_loop; |
- if (!startup_data_->options.message_pump_factory.is_null()) { |
- message_loop.reset( |
- new MessageLoop(startup_data_->options.message_pump_factory.Run())); |
- } else { |
- message_loop.reset( |
- new MessageLoop(startup_data_->options.message_loop_type)); |
- } |
+ base::AutoLock locker(task_runner_lock_); |
+ task_runner_ = message_loop_->task_runner(); |
+ } |
- // Complete the initialization of our Thread object. |
- thread_id_ = PlatformThread::CurrentId(); |
- PlatformThread::SetName(name_.c_str()); |
- ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. |
- message_loop->set_thread_name(name_); |
- message_loop->SetTimerSlack(startup_data_->options.timer_slack); |
- message_loop_ = message_loop.get(); |
+ startup_data_.reset(); |
#if defined(OS_WIN) |
- scoped_ptr<win::ScopedCOMInitializer> com_initializer; |
- if (com_status_ != NONE) { |
- com_initializer.reset((com_status_ == STA) ? |
- new win::ScopedCOMInitializer() : |
- new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); |
- } |
+ scoped_ptr<win::ScopedCOMInitializer> com_initializer; |
+ if (com_status_ != NONE) { |
+ com_initializer.reset((com_status_ == STA) ? |
+ new win::ScopedCOMInitializer() : |
+ new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); |
+ } |
#endif |
- // Let the thread do extra initialization. |
- // Let's do this before signaling we are started. |
- Init(); |
+ // Let the thread do extra initialization. |
+ Init(); |
+ { |
+ AutoLock lock(lock_); |
running_ = true; |
- startup_data_->event.Signal(); |
- // startup_data_ can't be touched anymore since the starting thread is now |
- // unlocked. |
+ } |
+ |
+ start_event_->Signal(); |
- Run(message_loop_); |
+ Run(message_loop_); |
+ |
+ { |
+ AutoLock lock(lock_); |
running_ = false; |
+ } |
- // Let the thread do extra cleanup. |
- CleanUp(); |
+ // Let the thread do extra cleanup. |
+ CleanUp(); |
#if defined(OS_WIN) |
- com_initializer.reset(); |
+ com_initializer.reset(); |
#endif |
- // Assert that MessageLoop::Quit was called by ThreadQuitHelper. |
- DCHECK(GetThreadWasQuitProperly()); |
+ // Assert that MessageLoop::Quit was called by ThreadQuitHelper. |
+ DCHECK(GetThreadWasQuitProperly()); |
- // We can't receive messages anymore. |
- message_loop_ = NULL; |
- } |
+ // We can't receive messages anymore. |
+ // (The message loop is destructed at the end of this block) |
+ message_loop_ = NULL; |
} |
} // namespace base |