| Index: base/threading/thread.cc
|
| diff --git a/base/threading/thread.cc b/base/threading/thread.cc
|
| index d42ba4d1577ae4abcd9ad8a8bb5576036a01b5b0..19bd8ea115cf3e775b03d4b5b864efc26d1d1e16 100644
|
| --- a/base/threading/thread.cc
|
| +++ b/base/threading/thread.cc
|
| @@ -4,9 +4,12 @@
|
|
|
| #include "base/threading/thread.h"
|
|
|
| +#include <queue>
|
| +
|
| #include "base/bind.h"
|
| #include "base/lazy_instance.h"
|
| #include "base/profiler/scoped_tracker.h"
|
| +#include "base/synchronization/lock.h"
|
| #include "base/synchronization/waitable_event.h"
|
| #include "base/third_party/dynamic_annotations/dynamic_annotations.h"
|
| #include "base/threading/thread_id_name_manager.h"
|
| @@ -36,18 +39,93 @@ void ThreadQuitHelper() {
|
| Thread::SetThreadWasQuitProperly(true);
|
| }
|
|
|
| -// Used to pass data to ThreadMain. This structure is allocated on the stack
|
| -// from within StartWithOptions.
|
| -struct Thread::StartupData {
|
| - // We get away with a const reference here because of how we are allocated.
|
| - const Thread::Options& options;
|
| +// Accumulates incoming tasks until SetInternalTaskRunner() is called.
|
| +// Once the internal task runner is set it forwards the posted tasks to
|
| +// the internal one.
|
| +class Thread::ProxyTaskRunner : public SingleThreadTaskRunner {
|
| + public:
|
| + ProxyTaskRunner() {}
|
| +
|
| + void SetInternalTaskRunner(
|
| + const scoped_refptr<SingleThreadTaskRunner>& task_runner) {
|
| + AutoLock locker(lock_);
|
| + DCHECK(!internal_task_runner_);
|
| + // Naively repost all queued tasks to the internal one. Note that
|
| + // a task posted with X delay will run with X + Y delay if this method
|
| + // is called after Y.
|
| + internal_task_runner_ = task_runner;
|
| + while (!task_queue_.empty()) {
|
| + const Task& task = task_queue_.front();
|
| + if (task.nestable) {
|
| + internal_task_runner_->PostDelayedTask(
|
| + task.from_here, task.task, task.delay);
|
| + } else {
|
| + internal_task_runner_->PostNonNestableDelayedTask(
|
| + task.from_here, task.task, task.delay);
|
| + }
|
| + task_queue_.pop();
|
| + }
|
| + }
|
| +
|
| + protected:
|
| + ~ProxyTaskRunner() override {}
|
| +
|
| + // ProxyTaskRunner override:
|
| + bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + base::TimeDelta delay) override {
|
| + AutoLock locker(lock_);
|
| + DCHECK(!task.is_null()) << from_here.ToString();
|
| + if (internal_task_runner_)
|
| + return internal_task_runner_->PostDelayedTask(from_here, task, delay);
|
| + task_queue_.push(Task(from_here, task, delay, true));
|
| + return true;
|
| + }
|
|
|
| - // Used to synchronize thread startup.
|
| - WaitableEvent event;
|
| + bool RunsTasksOnCurrentThread() const override {
|
| + return internal_task_runner_
|
| + ? internal_task_runner_->RunsTasksOnCurrentThread()
|
| + : false;
|
| + }
|
| +
|
| + // SequencedTaskRunner override:
|
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + base::TimeDelta delay) override {
|
| + AutoLock locker(lock_);
|
| + DCHECK(!task.is_null()) << from_here.ToString();
|
| + if (internal_task_runner_) {
|
| + return internal_task_runner_->PostNonNestableDelayedTask(
|
| + from_here, task, delay);
|
| + }
|
| + task_queue_.push(Task(from_here, task, delay, false));
|
| + return true;
|
| + }
|
|
|
| - explicit StartupData(const Options& opt)
|
| - : options(opt),
|
| - event(false, false) {}
|
| + private:
|
| + struct Task {
|
| + Task(const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + base::TimeDelta delay,
|
| + bool nestable)
|
| + : from_here(from_here),
|
| + task(task),
|
| + delay(delay),
|
| + nestable(nestable) {}
|
| + Task() : nestable(false) {}
|
| + ~Task() {}
|
| +
|
| + tracked_objects::Location from_here;
|
| + Closure task;
|
| + base::TimeDelta delay;
|
| + bool nestable;
|
| + };
|
| +
|
| + base::Lock lock_;
|
| + std::queue<Task> task_queue_;
|
| + scoped_refptr<SingleThreadTaskRunner> internal_task_runner_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(ProxyTaskRunner);
|
| };
|
|
|
| Thread::Options::Options()
|
| @@ -71,13 +149,10 @@ Thread::Thread(const std::string& name)
|
| #if defined(OS_WIN)
|
| com_status_(NONE),
|
| #endif
|
| - started_(false),
|
| stopping_(false),
|
| running_(false),
|
| - startup_data_(NULL),
|
| thread_(0),
|
| - message_loop_(NULL),
|
| - thread_id_(kInvalidThreadId),
|
| + message_loop_(nullptr),
|
| name_(name) {
|
| }
|
|
|
| @@ -103,34 +178,42 @@ bool Thread::StartWithOptions(const Options& options) {
|
|
|
| SetThreadWasQuitProperly(false);
|
|
|
| - StartupData startup_data(options);
|
| - startup_data_ = &startup_data;
|
| + task_runner_ = new ProxyTaskRunner();
|
| +
|
| + startup_data_.reset(new Options(options));
|
| + start_event_.reset(new WaitableEvent(false, false));
|
|
|
| if (!PlatformThread::Create(options.stack_size, this, &thread_)) {
|
| DLOG(ERROR) << "failed to create thread";
|
| - startup_data_ = NULL;
|
| + start_event_.reset();
|
| return false;
|
| }
|
|
|
| + return true;
|
| +}
|
| +
|
| +bool Thread::StartAndWait() {
|
| + bool result = Start();
|
| + if (!result)
|
| + return result;
|
| + WaitUntilThreadStarted();
|
| + return result;
|
| +}
|
| +
|
| +bool Thread::WaitUntilThreadStarted() {
|
| + if (!start_event_)
|
| + return false;
|
| // TODO(kinuko): Remove once crbug.com/465458 is solved.
|
| - tracked_objects::ScopedTracker tracking_profile_wait(
|
| + tracked_objects::ScopedTracker tracking_profile(
|
| FROM_HERE_WITH_EXPLICIT_FUNCTION(
|
| - "465458 base::Thread::StartWithOptions (Wait)"));
|
| -
|
| - // Wait for the thread to start and initialize message_loop_
|
| + "465458 base::Thread::WaitUntilThreadStarted()"));
|
| base::ThreadRestrictions::ScopedAllowWait allow_wait;
|
| - startup_data.event.Wait();
|
| -
|
| - // set it to NULL so we don't keep a pointer to some object on the stack.
|
| - startup_data_ = NULL;
|
| - started_ = true;
|
| -
|
| - DCHECK(message_loop_);
|
| + start_event_->Wait();
|
| return true;
|
| }
|
|
|
| void Thread::Stop() {
|
| - if (!started_)
|
| + if (!start_event_)
|
| return;
|
|
|
| StopSoon();
|
| @@ -146,7 +229,7 @@ void Thread::Stop() {
|
| DCHECK(!message_loop_);
|
|
|
| // The thread no longer needs to be joined.
|
| - started_ = false;
|
| + start_event_.reset();
|
|
|
| stopping_ = false;
|
| }
|
| @@ -154,25 +237,26 @@ void Thread::Stop() {
|
| void Thread::StopSoon() {
|
| // We should only be called on the same thread that started us.
|
|
|
| - // Reading thread_id_ without a lock can lead to a benign data race
|
| - // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer.
|
| - DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId());
|
| + DCHECK_NE(thread_id(), PlatformThread::CurrentId());
|
|
|
| - if (stopping_ || !message_loop_)
|
| + if (stopping_ || !start_event_)
|
| return;
|
|
|
| stopping_ = true;
|
| - message_loop_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
|
| + task_runner_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
|
| }
|
|
|
| bool Thread::IsRunning() const {
|
| + if (start_event_ && !stopping_)
|
| + return true;
|
| + AutoLock lock(lock_);
|
| return running_;
|
| }
|
|
|
| void Thread::SetPriority(ThreadPriority priority) {
|
| // The thread must be started (and id known) for this to be
|
| // compatible with all platforms.
|
| - DCHECK_NE(thread_id_, kInvalidThreadId);
|
| + DCHECK(!!start_event_);
|
| PlatformThread::SetThreadPriority(thread_, priority);
|
| }
|
|
|
| @@ -193,60 +277,74 @@ bool Thread::GetThreadWasQuitProperly() {
|
| }
|
|
|
| void Thread::ThreadMain() {
|
| + // The message loop for this thread.
|
| + // Allocated on the heap to centralize any leak reports at this line.
|
| + scoped_ptr<MessageLoop> message_loop;
|
| + if (!startup_data_->message_pump_factory.is_null()) {
|
| + message_loop.reset(
|
| + new MessageLoop(startup_data_->message_pump_factory.Run()));
|
| + } else {
|
| + message_loop.reset(
|
| + new MessageLoop(startup_data_->message_loop_type));
|
| + }
|
| +
|
| + // Complete the initialization of our Thread object.
|
| + DCHECK_EQ(thread_id(), PlatformThread::CurrentId());
|
| + PlatformThread::SetName(name_.c_str());
|
| + ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
|
| + message_loop->set_thread_name(name_);
|
| + message_loop->SetTimerSlack(startup_data_->timer_slack);
|
| + message_loop_ = message_loop.get();
|
| +
|
| + static_cast<ProxyTaskRunner*>(task_runner_.get())->SetInternalTaskRunner(
|
| + message_loop_->task_runner());
|
| +
|
| {
|
| - // The message loop for this thread.
|
| - // Allocated on the heap to centralize any leak reports at this line.
|
| - scoped_ptr<MessageLoop> message_loop;
|
| - if (!startup_data_->options.message_pump_factory.is_null()) {
|
| - message_loop.reset(
|
| - new MessageLoop(startup_data_->options.message_pump_factory.Run()));
|
| - } else {
|
| - message_loop.reset(
|
| - new MessageLoop(startup_data_->options.message_loop_type));
|
| - }
|
| + base::AutoLock locker(task_runner_lock_);
|
| + task_runner_ = message_loop_->task_runner();
|
| + }
|
|
|
| - // Complete the initialization of our Thread object.
|
| - thread_id_ = PlatformThread::CurrentId();
|
| - PlatformThread::SetName(name_.c_str());
|
| - ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
|
| - message_loop->set_thread_name(name_);
|
| - message_loop->SetTimerSlack(startup_data_->options.timer_slack);
|
| - message_loop_ = message_loop.get();
|
| + startup_data_.reset();
|
|
|
| #if defined(OS_WIN)
|
| - scoped_ptr<win::ScopedCOMInitializer> com_initializer;
|
| - if (com_status_ != NONE) {
|
| - com_initializer.reset((com_status_ == STA) ?
|
| - new win::ScopedCOMInitializer() :
|
| - new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
|
| - }
|
| + scoped_ptr<win::ScopedCOMInitializer> com_initializer;
|
| + if (com_status_ != NONE) {
|
| + com_initializer.reset((com_status_ == STA) ?
|
| + new win::ScopedCOMInitializer() :
|
| + new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
|
| + }
|
| #endif
|
|
|
| - // Let the thread do extra initialization.
|
| - // Let's do this before signaling we are started.
|
| - Init();
|
| + // Let the thread do extra initialization.
|
| + Init();
|
|
|
| + {
|
| + AutoLock lock(lock_);
|
| running_ = true;
|
| - startup_data_->event.Signal();
|
| - // startup_data_ can't be touched anymore since the starting thread is now
|
| - // unlocked.
|
| + }
|
| +
|
| + start_event_->Signal();
|
|
|
| - Run(message_loop_);
|
| + Run(message_loop_);
|
| +
|
| + {
|
| + AutoLock lock(lock_);
|
| running_ = false;
|
| + }
|
|
|
| - // Let the thread do extra cleanup.
|
| - CleanUp();
|
| + // Let the thread do extra cleanup.
|
| + CleanUp();
|
|
|
| #if defined(OS_WIN)
|
| - com_initializer.reset();
|
| + com_initializer.reset();
|
| #endif
|
|
|
| - // Assert that MessageLoop::Quit was called by ThreadQuitHelper.
|
| - DCHECK(GetThreadWasQuitProperly());
|
| + // Assert that MessageLoop::Quit was called by ThreadQuitHelper.
|
| + DCHECK(GetThreadWasQuitProperly());
|
|
|
| - // We can't receive messages anymore.
|
| - message_loop_ = NULL;
|
| - }
|
| + // We can't receive messages anymore.
|
| + // (The message loop is destructed at the end of this block)
|
| + message_loop_ = NULL;
|
| }
|
|
|
| } // namespace base
|
|
|