Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3699)

Unified Diff: base/threading/thread.cc

Issue 1086663002: [Approach 2] Accumulate tasks in a proxy task runner until thread starts. Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « base/threading/thread.h ('k') | base/threading/thread_id_name_manager_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/threading/thread.cc
diff --git a/base/threading/thread.cc b/base/threading/thread.cc
index d42ba4d1577ae4abcd9ad8a8bb5576036a01b5b0..19bd8ea115cf3e775b03d4b5b864efc26d1d1e16 100644
--- a/base/threading/thread.cc
+++ b/base/threading/thread.cc
@@ -4,9 +4,12 @@
#include "base/threading/thread.h"
+#include <queue>
+
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/profiler/scoped_tracker.h"
+#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/third_party/dynamic_annotations/dynamic_annotations.h"
#include "base/threading/thread_id_name_manager.h"
@@ -36,18 +39,93 @@ void ThreadQuitHelper() {
Thread::SetThreadWasQuitProperly(true);
}
-// Used to pass data to ThreadMain. This structure is allocated on the stack
-// from within StartWithOptions.
-struct Thread::StartupData {
- // We get away with a const reference here because of how we are allocated.
- const Thread::Options& options;
+// Accumulates incoming tasks until SetInternalTaskRunner() is called.
+// Once the internal task runner is set it forwards the posted tasks to
+// the internal one.
+class Thread::ProxyTaskRunner : public SingleThreadTaskRunner {
+ public:
+ ProxyTaskRunner() {}
+
+ void SetInternalTaskRunner(
+ const scoped_refptr<SingleThreadTaskRunner>& task_runner) {
+ AutoLock locker(lock_);
+ DCHECK(!internal_task_runner_);
+ // Naively repost all queued tasks to the internal one. Note that
+ // a task posted with X delay will run with X + Y delay if this method
+ // is called after Y.
+ internal_task_runner_ = task_runner;
+ while (!task_queue_.empty()) {
+ const Task& task = task_queue_.front();
+ if (task.nestable) {
+ internal_task_runner_->PostDelayedTask(
+ task.from_here, task.task, task.delay);
+ } else {
+ internal_task_runner_->PostNonNestableDelayedTask(
+ task.from_here, task.task, task.delay);
+ }
+ task_queue_.pop();
+ }
+ }
+
+ protected:
+ ~ProxyTaskRunner() override {}
+
+ // ProxyTaskRunner override:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& task,
+ base::TimeDelta delay) override {
+ AutoLock locker(lock_);
+ DCHECK(!task.is_null()) << from_here.ToString();
+ if (internal_task_runner_)
+ return internal_task_runner_->PostDelayedTask(from_here, task, delay);
+ task_queue_.push(Task(from_here, task, delay, true));
+ return true;
+ }
- // Used to synchronize thread startup.
- WaitableEvent event;
+ bool RunsTasksOnCurrentThread() const override {
+ return internal_task_runner_
+ ? internal_task_runner_->RunsTasksOnCurrentThread()
+ : false;
+ }
+
+ // SequencedTaskRunner override:
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& task,
+ base::TimeDelta delay) override {
+ AutoLock locker(lock_);
+ DCHECK(!task.is_null()) << from_here.ToString();
+ if (internal_task_runner_) {
+ return internal_task_runner_->PostNonNestableDelayedTask(
+ from_here, task, delay);
+ }
+ task_queue_.push(Task(from_here, task, delay, false));
+ return true;
+ }
- explicit StartupData(const Options& opt)
- : options(opt),
- event(false, false) {}
+ private:
+ struct Task {
+ Task(const tracked_objects::Location& from_here,
+ const Closure& task,
+ base::TimeDelta delay,
+ bool nestable)
+ : from_here(from_here),
+ task(task),
+ delay(delay),
+ nestable(nestable) {}
+ Task() : nestable(false) {}
+ ~Task() {}
+
+ tracked_objects::Location from_here;
+ Closure task;
+ base::TimeDelta delay;
+ bool nestable;
+ };
+
+ base::Lock lock_;
+ std::queue<Task> task_queue_;
+ scoped_refptr<SingleThreadTaskRunner> internal_task_runner_;
+
+ DISALLOW_COPY_AND_ASSIGN(ProxyTaskRunner);
};
Thread::Options::Options()
@@ -71,13 +149,10 @@ Thread::Thread(const std::string& name)
#if defined(OS_WIN)
com_status_(NONE),
#endif
- started_(false),
stopping_(false),
running_(false),
- startup_data_(NULL),
thread_(0),
- message_loop_(NULL),
- thread_id_(kInvalidThreadId),
+ message_loop_(nullptr),
name_(name) {
}
@@ -103,34 +178,42 @@ bool Thread::StartWithOptions(const Options& options) {
SetThreadWasQuitProperly(false);
- StartupData startup_data(options);
- startup_data_ = &startup_data;
+ task_runner_ = new ProxyTaskRunner();
+
+ startup_data_.reset(new Options(options));
+ start_event_.reset(new WaitableEvent(false, false));
if (!PlatformThread::Create(options.stack_size, this, &thread_)) {
DLOG(ERROR) << "failed to create thread";
- startup_data_ = NULL;
+ start_event_.reset();
return false;
}
+ return true;
+}
+
+bool Thread::StartAndWait() {
+ bool result = Start();
+ if (!result)
+ return result;
+ WaitUntilThreadStarted();
+ return result;
+}
+
+bool Thread::WaitUntilThreadStarted() {
+ if (!start_event_)
+ return false;
// TODO(kinuko): Remove once crbug.com/465458 is solved.
- tracked_objects::ScopedTracker tracking_profile_wait(
+ tracked_objects::ScopedTracker tracking_profile(
FROM_HERE_WITH_EXPLICIT_FUNCTION(
- "465458 base::Thread::StartWithOptions (Wait)"));
-
- // Wait for the thread to start and initialize message_loop_
+ "465458 base::Thread::WaitUntilThreadStarted()"));
base::ThreadRestrictions::ScopedAllowWait allow_wait;
- startup_data.event.Wait();
-
- // set it to NULL so we don't keep a pointer to some object on the stack.
- startup_data_ = NULL;
- started_ = true;
-
- DCHECK(message_loop_);
+ start_event_->Wait();
return true;
}
void Thread::Stop() {
- if (!started_)
+ if (!start_event_)
return;
StopSoon();
@@ -146,7 +229,7 @@ void Thread::Stop() {
DCHECK(!message_loop_);
// The thread no longer needs to be joined.
- started_ = false;
+ start_event_.reset();
stopping_ = false;
}
@@ -154,25 +237,26 @@ void Thread::Stop() {
void Thread::StopSoon() {
// We should only be called on the same thread that started us.
- // Reading thread_id_ without a lock can lead to a benign data race
- // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer.
- DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId());
+ DCHECK_NE(thread_id(), PlatformThread::CurrentId());
- if (stopping_ || !message_loop_)
+ if (stopping_ || !start_event_)
return;
stopping_ = true;
- message_loop_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
+ task_runner_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
}
bool Thread::IsRunning() const {
+ if (start_event_ && !stopping_)
+ return true;
+ AutoLock lock(lock_);
return running_;
}
void Thread::SetPriority(ThreadPriority priority) {
// The thread must be started (and id known) for this to be
// compatible with all platforms.
- DCHECK_NE(thread_id_, kInvalidThreadId);
+ DCHECK(!!start_event_);
PlatformThread::SetThreadPriority(thread_, priority);
}
@@ -193,60 +277,74 @@ bool Thread::GetThreadWasQuitProperly() {
}
void Thread::ThreadMain() {
+ // The message loop for this thread.
+ // Allocated on the heap to centralize any leak reports at this line.
+ scoped_ptr<MessageLoop> message_loop;
+ if (!startup_data_->message_pump_factory.is_null()) {
+ message_loop.reset(
+ new MessageLoop(startup_data_->message_pump_factory.Run()));
+ } else {
+ message_loop.reset(
+ new MessageLoop(startup_data_->message_loop_type));
+ }
+
+ // Complete the initialization of our Thread object.
+ DCHECK_EQ(thread_id(), PlatformThread::CurrentId());
+ PlatformThread::SetName(name_.c_str());
+ ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
+ message_loop->set_thread_name(name_);
+ message_loop->SetTimerSlack(startup_data_->timer_slack);
+ message_loop_ = message_loop.get();
+
+ static_cast<ProxyTaskRunner*>(task_runner_.get())->SetInternalTaskRunner(
+ message_loop_->task_runner());
+
{
- // The message loop for this thread.
- // Allocated on the heap to centralize any leak reports at this line.
- scoped_ptr<MessageLoop> message_loop;
- if (!startup_data_->options.message_pump_factory.is_null()) {
- message_loop.reset(
- new MessageLoop(startup_data_->options.message_pump_factory.Run()));
- } else {
- message_loop.reset(
- new MessageLoop(startup_data_->options.message_loop_type));
- }
+ base::AutoLock locker(task_runner_lock_);
+ task_runner_ = message_loop_->task_runner();
+ }
- // Complete the initialization of our Thread object.
- thread_id_ = PlatformThread::CurrentId();
- PlatformThread::SetName(name_.c_str());
- ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
- message_loop->set_thread_name(name_);
- message_loop->SetTimerSlack(startup_data_->options.timer_slack);
- message_loop_ = message_loop.get();
+ startup_data_.reset();
#if defined(OS_WIN)
- scoped_ptr<win::ScopedCOMInitializer> com_initializer;
- if (com_status_ != NONE) {
- com_initializer.reset((com_status_ == STA) ?
- new win::ScopedCOMInitializer() :
- new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
- }
+ scoped_ptr<win::ScopedCOMInitializer> com_initializer;
+ if (com_status_ != NONE) {
+ com_initializer.reset((com_status_ == STA) ?
+ new win::ScopedCOMInitializer() :
+ new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
+ }
#endif
- // Let the thread do extra initialization.
- // Let's do this before signaling we are started.
- Init();
+ // Let the thread do extra initialization.
+ Init();
+ {
+ AutoLock lock(lock_);
running_ = true;
- startup_data_->event.Signal();
- // startup_data_ can't be touched anymore since the starting thread is now
- // unlocked.
+ }
+
+ start_event_->Signal();
- Run(message_loop_);
+ Run(message_loop_);
+
+ {
+ AutoLock lock(lock_);
running_ = false;
+ }
- // Let the thread do extra cleanup.
- CleanUp();
+ // Let the thread do extra cleanup.
+ CleanUp();
#if defined(OS_WIN)
- com_initializer.reset();
+ com_initializer.reset();
#endif
- // Assert that MessageLoop::Quit was called by ThreadQuitHelper.
- DCHECK(GetThreadWasQuitProperly());
+ // Assert that MessageLoop::Quit was called by ThreadQuitHelper.
+ DCHECK(GetThreadWasQuitProperly());
- // We can't receive messages anymore.
- message_loop_ = NULL;
- }
+ // We can't receive messages anymore.
+ // (The message loop is destructed at the end of this block)
+ message_loop_ = NULL;
}
} // namespace base
« no previous file with comments | « base/threading/thread.h ('k') | base/threading/thread_id_name_manager_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698