| Index: base/threading/thread.cc
|
| diff --git a/base/threading/thread.cc b/base/threading/thread.cc
|
| index d42ba4d1577ae4abcd9ad8a8bb5576036a01b5b0..2b1df6a5c7caedba22bff10f566d7e1005ac8a8c 100644
|
| --- a/base/threading/thread.cc
|
| +++ b/base/threading/thread.cc
|
| @@ -6,6 +6,7 @@
|
|
|
| #include "base/bind.h"
|
| #include "base/lazy_instance.h"
|
| +#include "base/message_loop/incoming_task_queue.h"
|
| #include "base/profiler/scoped_tracker.h"
|
| #include "base/synchronization/waitable_event.h"
|
| #include "base/third_party/dynamic_annotations/dynamic_annotations.h"
|
| @@ -36,18 +37,51 @@ void ThreadQuitHelper() {
|
| Thread::SetThreadWasQuitProperly(true);
|
| }
|
|
|
| -// Used to pass data to ThreadMain. This structure is allocated on the stack
|
| -// from within StartWithOptions.
|
| -struct Thread::StartupData {
|
| - // We get away with a const reference here because of how we are allocated.
|
| - const Thread::Options& options;
|
| +// Provides TaskRunner interface on top of IncomingTaskQueue.
|
| +class Thread::IncomingTaskQueueRunner : public SingleThreadTaskRunner {
|
| + public:
|
| + IncomingTaskQueueRunner()
|
| + : incoming_task_queue_(new internal::IncomingTaskQueue()),
|
| + valid_thread_id_(kInvalidThreadId) {}
|
|
|
| - // Used to synchronize thread startup.
|
| - WaitableEvent event;
|
| + void AttachThread() {
|
| + valid_thread_id_ = PlatformThread::CurrentId();
|
| + }
|
| +
|
| + internal::IncomingTaskQueue* incoming_task_queue() {
|
| + return incoming_task_queue_.get();
|
| + }
|
| +
|
| + protected:
|
| + ~IncomingTaskQueueRunner() override {}
|
| +
|
| + // TaskRunner override:
|
| + bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + base::TimeDelta delay) override {
|
| + DCHECK(!task.is_null()) << from_here.ToString();
|
| + return incoming_task_queue_->AddToIncomingQueue(
|
| + from_here, task, delay, true);
|
| + }
|
| +
|
| + bool RunsTasksOnCurrentThread() const override {
|
| + return valid_thread_id_ == PlatformThread::CurrentId();
|
| + }
|
|
|
| - explicit StartupData(const Options& opt)
|
| - : options(opt),
|
| - event(false, false) {}
|
| + // SequencedTaskRunner override:
|
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + base::TimeDelta delay) override {
|
| + DCHECK(!task.is_null()) << from_here.ToString();
|
| + return incoming_task_queue_->AddToIncomingQueue(
|
| + from_here, task, delay, false);
|
| + }
|
| +
|
| + private:
|
| + scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;
|
| +
|
| + // ID of the thread the task posted to this runner should run.
|
| + PlatformThreadId valid_thread_id_;
|
| };
|
|
|
| Thread::Options::Options()
|
| @@ -71,13 +105,10 @@ Thread::Thread(const std::string& name)
|
| #if defined(OS_WIN)
|
| com_status_(NONE),
|
| #endif
|
| - started_(false),
|
| stopping_(false),
|
| running_(false),
|
| - startup_data_(NULL),
|
| thread_(0),
|
| - message_loop_(NULL),
|
| - thread_id_(kInvalidThreadId),
|
| + message_loop_(nullptr),
|
| name_(name) {
|
| }
|
|
|
| @@ -103,34 +134,42 @@ bool Thread::StartWithOptions(const Options& options) {
|
|
|
| SetThreadWasQuitProperly(false);
|
|
|
| - StartupData startup_data(options);
|
| - startup_data_ = &startup_data;
|
| + task_runner_ = new IncomingTaskQueueRunner();
|
| +
|
| + startup_data_.reset(new Options(options));
|
| + start_event_.reset(new WaitableEvent(false, false));
|
|
|
| if (!PlatformThread::Create(options.stack_size, this, &thread_)) {
|
| DLOG(ERROR) << "failed to create thread";
|
| - startup_data_ = NULL;
|
| + start_event_.reset();
|
| return false;
|
| }
|
|
|
| + return true;
|
| +}
|
| +
|
| +bool Thread::StartAndWait() {
|
| + bool result = Start();
|
| + if (!result)
|
| + return result;
|
| + WaitUntilThreadStarted();
|
| + return result;
|
| +}
|
| +
|
| +bool Thread::WaitUntilThreadStarted() {
|
| + if (!start_event_)
|
| + return false;
|
| // TODO(kinuko): Remove once crbug.com/465458 is solved.
|
| - tracked_objects::ScopedTracker tracking_profile_wait(
|
| + tracked_objects::ScopedTracker tracking_profile(
|
| FROM_HERE_WITH_EXPLICIT_FUNCTION(
|
| - "465458 base::Thread::StartWithOptions (Wait)"));
|
| -
|
| - // Wait for the thread to start and initialize message_loop_
|
| + "465458 base::Thread::WaitUntilThreadStarted()"));
|
| base::ThreadRestrictions::ScopedAllowWait allow_wait;
|
| - startup_data.event.Wait();
|
| -
|
| - // set it to NULL so we don't keep a pointer to some object on the stack.
|
| - startup_data_ = NULL;
|
| - started_ = true;
|
| -
|
| - DCHECK(message_loop_);
|
| + start_event_->Wait();
|
| return true;
|
| }
|
|
|
| void Thread::Stop() {
|
| - if (!started_)
|
| + if (!start_event_)
|
| return;
|
|
|
| StopSoon();
|
| @@ -146,7 +185,7 @@ void Thread::Stop() {
|
| DCHECK(!message_loop_);
|
|
|
| // The thread no longer needs to be joined.
|
| - started_ = false;
|
| + start_event_.reset();
|
|
|
| stopping_ = false;
|
| }
|
| @@ -154,25 +193,26 @@ void Thread::Stop() {
|
| void Thread::StopSoon() {
|
| // We should only be called on the same thread that started us.
|
|
|
| - // Reading thread_id_ without a lock can lead to a benign data race
|
| - // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer.
|
| - DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId());
|
| + DCHECK_NE(thread_id(), PlatformThread::CurrentId());
|
|
|
| - if (stopping_ || !message_loop_)
|
| + if (stopping_ || !start_event_)
|
| return;
|
|
|
| stopping_ = true;
|
| - message_loop_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
|
| + task_runner_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
|
| }
|
|
|
| bool Thread::IsRunning() const {
|
| + if (start_event_ && !stopping_)
|
| + return true;
|
| + AutoLock lock(lock_);
|
| return running_;
|
| }
|
|
|
| void Thread::SetPriority(ThreadPriority priority) {
|
| // The thread must be started (and id known) for this to be
|
| // compatible with all platforms.
|
| - DCHECK_NE(thread_id_, kInvalidThreadId);
|
| + DCHECK(!!start_event_);
|
| PlatformThread::SetThreadPriority(thread_, priority);
|
| }
|
|
|
| @@ -193,60 +233,70 @@ bool Thread::GetThreadWasQuitProperly() {
|
| }
|
|
|
| void Thread::ThreadMain() {
|
| - {
|
| - // The message loop for this thread.
|
| - // Allocated on the heap to centralize any leak reports at this line.
|
| - scoped_ptr<MessageLoop> message_loop;
|
| - if (!startup_data_->options.message_pump_factory.is_null()) {
|
| - message_loop.reset(
|
| - new MessageLoop(startup_data_->options.message_pump_factory.Run()));
|
| - } else {
|
| - message_loop.reset(
|
| - new MessageLoop(startup_data_->options.message_loop_type));
|
| - }
|
| -
|
| - // Complete the initialization of our Thread object.
|
| - thread_id_ = PlatformThread::CurrentId();
|
| - PlatformThread::SetName(name_.c_str());
|
| - ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
|
| - message_loop->set_thread_name(name_);
|
| - message_loop->SetTimerSlack(startup_data_->options.timer_slack);
|
| - message_loop_ = message_loop.get();
|
| + IncomingTaskQueueRunner* incoming_task_queue_runner =
|
| + static_cast<IncomingTaskQueueRunner*>(task_runner_.get());
|
| +
|
| + // The message loop for this thread.
|
| + // Allocated on the heap to centralize any leak reports at this line.
|
| + scoped_ptr<MessageLoop> message_loop;
|
| + MessageLoop::Options options;
|
| + options.message_loop_type = startup_data_->message_loop_type;
|
| + options.message_pump_factory = startup_data_->message_pump_factory;
|
| + options.incoming_task_queue =
|
| + incoming_task_queue_runner->incoming_task_queue();
|
| + message_loop.reset(new MessageLoop(options));
|
| +
|
| + incoming_task_queue_runner->AttachThread();
|
| +
|
| + // Complete the initialization of our Thread object.
|
| + DCHECK_EQ(thread_id(), PlatformThread::CurrentId());
|
| + PlatformThread::SetName(name_.c_str());
|
| + ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
|
| + message_loop->set_thread_name(name_);
|
| + message_loop->SetTimerSlack(startup_data_->timer_slack);
|
| + message_loop_ = message_loop.get();
|
| +
|
| + startup_data_.reset();
|
|
|
| #if defined(OS_WIN)
|
| - scoped_ptr<win::ScopedCOMInitializer> com_initializer;
|
| - if (com_status_ != NONE) {
|
| - com_initializer.reset((com_status_ == STA) ?
|
| - new win::ScopedCOMInitializer() :
|
| - new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
|
| - }
|
| + scoped_ptr<win::ScopedCOMInitializer> com_initializer;
|
| + if (com_status_ != NONE) {
|
| + com_initializer.reset((com_status_ == STA) ?
|
| + new win::ScopedCOMInitializer() :
|
| + new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
|
| + }
|
| #endif
|
|
|
| - // Let the thread do extra initialization.
|
| - // Let's do this before signaling we are started.
|
| - Init();
|
| + // Let the thread do extra initialization.
|
| + Init();
|
|
|
| + {
|
| + AutoLock lock(lock_);
|
| running_ = true;
|
| - startup_data_->event.Signal();
|
| - // startup_data_ can't be touched anymore since the starting thread is now
|
| - // unlocked.
|
| + }
|
| +
|
| + start_event_->Signal();
|
| +
|
| + Run(message_loop_);
|
|
|
| - Run(message_loop_);
|
| + {
|
| + AutoLock lock(lock_);
|
| running_ = false;
|
| + }
|
|
|
| - // Let the thread do extra cleanup.
|
| - CleanUp();
|
| + // Let the thread do extra cleanup.
|
| + CleanUp();
|
|
|
| #if defined(OS_WIN)
|
| - com_initializer.reset();
|
| + com_initializer.reset();
|
| #endif
|
|
|
| - // Assert that MessageLoop::Quit was called by ThreadQuitHelper.
|
| - DCHECK(GetThreadWasQuitProperly());
|
| + // Assert that MessageLoop::Quit was called by ThreadQuitHelper.
|
| + DCHECK(GetThreadWasQuitProperly());
|
|
|
| - // We can't receive messages anymore.
|
| - message_loop_ = NULL;
|
| - }
|
| + // We can't receive messages anymore.
|
| + // (The message loop is destructed at the end of this block)
|
| + message_loop_ = NULL;
|
| }
|
|
|
| } // namespace base
|
|
|