Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(256)

Unified Diff: mojo/common/handle_watcher.cc

Issue 506353002: Make HandleWatcher watch on the same thread if the thread is running a MessagePumpMojo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/common/handle_watcher.cc
diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc
index 33bb0bdeaa7daae80a0c74e46b688052b1164c9c..dfcd1d914353fb943ed9a0288fe2bf0c9334ad04 100644
--- a/mojo/common/handle_watcher.cc
+++ b/mojo/common/handle_watcher.cc
@@ -9,6 +9,7 @@
#include "base/atomic_sequence_num.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
+#include "base/logging.h"
#include "base/memory/singleton.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
@@ -31,14 +32,6 @@ namespace {
const char kWatcherThreadName[] = "handle-watcher-thread";
-// TODO(sky): this should be unnecessary once MessageLoop has been refactored.
-MessagePumpMojo* message_pump_mojo = NULL;
-
-scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
- message_pump_mojo = new MessagePumpMojo;
- return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
-}
-
base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
@@ -106,9 +99,9 @@ void WatcherBackend::StartWatching(const WatchData& data) {
DCHECK_EQ(0u, handle_to_data_.count(data.handle));
handle_to_data_[data.handle] = data;
- message_pump_mojo->AddHandler(this, data.handle,
- data.handle_signals,
- data.deadline);
+ MessagePumpMojo::current()->AddHandler(this, data.handle,
+ data.handle_signals,
+ data.deadline);
}
void WatcherBackend::StopWatching(WatcherID watcher_id) {
@@ -117,7 +110,7 @@ void WatcherBackend::StopWatching(WatcherID watcher_id) {
Handle handle;
if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
handle_to_data_.erase(handle);
- message_pump_mojo->RemoveHandler(handle);
+ MessagePumpMojo::current()->RemoveHandler(handle);
}
}
@@ -128,7 +121,7 @@ void WatcherBackend::RemoveAndNotify(const Handle& handle,
const WatchData data(handle_to_data_[handle]);
handle_to_data_.erase(handle);
- message_pump_mojo->RemoveHandler(handle);
+ MessagePumpMojo::current()->RemoveHandler(handle);
data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
}
@@ -312,58 +305,42 @@ void WatcherThreadManager::ProcessRequestsOnBackendThread() {
WatcherThreadManager::WatcherThreadManager()
: thread_(kWatcherThreadName) {
base::Thread::Options thread_options;
- thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
+ thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
thread_.StartWithOptions(thread_options);
}
-// HandleWatcher::State --------------------------------------------------------
+// HandleWatcher::StateBase and subclasses -------------------------------------
-// Represents the state of the HandleWatcher. Owns the user's callback and
+// The base class of HandleWatcher's state. Owns the user's callback and
// monitors the current thread's MessageLoop to know when to force the callback
// to run (with an error) even though the pipe hasn't been signaled yet.
-class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
+class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
sky 2014/08/27 21:37:27 Is the DestructionObserver code necessary for Same
yzshen1 2014/08/27 22:31:48 Thanks for the suggestion! According to discussion
public:
- State(HandleWatcher* watcher,
- const Handle& handle,
- MojoHandleSignals handle_signals,
- MojoDeadline deadline,
- const base::Callback<void(MojoResult)>& callback)
+ StateBase(HandleWatcher* watcher,
+ const base::Callback<void(MojoResult)>& callback)
: watcher_(watcher),
callback_(callback),
- got_ready_(false),
- weak_factory_(this) {
+ got_ready_(false) {
base::MessageLoop::current()->AddDestructionObserver(this);
-
- watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
- handle,
- handle_signals,
- MojoDeadlineToTimeTicks(deadline),
- base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
}
- virtual ~State() {
+ virtual ~StateBase() {
base::MessageLoop::current()->RemoveDestructionObserver(this);
+ }
- // If we've been notified the handle is ready (|got_ready_| is true) then
- // the watch has been implicitly removed by
- // WatcherThreadManager/MessagePumpMojo and we don't have to call
- // StopWatching(). To do so would needlessly entail posting a task and
- // blocking until the background thread services it.
- if (!got_ready_)
- WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
+ void NotifyHandleReady(MojoResult result) {
+ got_ready_ = true;
+ NotifyAndDestroy(result);
}
+ bool got_ready() const { return got_ready_; }
+
private:
virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
// The current thread is exiting. Simulate a watch error.
NotifyAndDestroy(MOJO_RESULT_ABORTED);
}
- void OnHandleReady(MojoResult result) {
- got_ready_ = true;
- NotifyAndDestroy(result);
- }
-
void NotifyAndDestroy(MojoResult result) {
base::Callback<void(MojoResult)> callback = callback_;
watcher_->Stop(); // Destroys |this|.
@@ -372,14 +349,81 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
}
HandleWatcher* watcher_;
- WatcherID watcher_id_;
base::Callback<void(MojoResult)> callback_;
// Have we been notified that the handle is ready?
bool got_ready_;
+};
sky 2014/08/27 21:37:27 DISALLOW_...
yzshen1 2014/08/27 22:31:48 Done.
+
+class HandleWatcher::SameThreadWatchingState : public StateBase,
sky 2014/08/27 21:37:27 Add description, especially since same thread is n
+ public MessagePumpMojoHandler {
+ public:
+ SameThreadWatchingState(HandleWatcher* watcher,
+ const Handle& handle,
+ MojoHandleSignals handle_signals,
+ MojoDeadline deadline,
+ const base::Callback<void(MojoResult)>& callback)
+ : StateBase(watcher, callback),
+ handle_(handle) {
+ DCHECK(MessagePumpMojo::IsCurrent());
+
+ MessagePumpMojo::current()->AddHandler(
+ this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
+ }
+
+ virtual ~SameThreadWatchingState() {
+ if (!got_ready())
+ MessagePumpMojo::current()->RemoveHandler(handle_);
+ }
+
+ private:
+ // MessagePumpMojoHandler overrides:
+ virtual void OnHandleReady(const Handle& handle) OVERRIDE {
+ DCHECK_EQ(handle.value(), handle_.value());
+ MessagePumpMojo::current()->RemoveHandler(handle_);
+ NotifyHandleReady(MOJO_RESULT_OK);
+ }
+
+ virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
+ DCHECK_EQ(handle.value(), handle_.value());
+ MessagePumpMojo::current()->RemoveHandler(handle_);
+ NotifyHandleReady(result);
+ }
+
+ Handle handle_;
+};
sky 2014/08/27 21:37:27 DISALLOW..
yzshen1 2014/08/27 22:31:48 Done.
+
+class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
+ public:
+ SecondaryThreadWatchingState(HandleWatcher* watcher,
+ const Handle& handle,
+ MojoHandleSignals handle_signals,
+ MojoDeadline deadline,
+ const base::Callback<void(MojoResult)>& callback)
+ : StateBase(watcher, callback),
+ weak_factory_(this) {
+ watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
+ handle,
+ handle_signals,
+ MojoDeadlineToTimeTicks(deadline),
+ base::Bind(&StateBase::NotifyHandleReady, weak_factory_.GetWeakPtr()));
+ }
+
+ virtual ~SecondaryThreadWatchingState() {
+ // If we've been notified the handle is ready (|got_ready()| is true) then
+ // the watch has been implicitly removed by
+ // WatcherThreadManager/MessagePumpMojo and we don't have to call
+ // StopWatching(). To do so would needlessly entail posting a task and
+ // blocking until the background thread services it.
+ if (!got_ready())
+ WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
+ }
+
+ private:
+ WatcherID watcher_id_;
// Used to weakly bind |this| to the WatcherThreadManager.
- base::WeakPtrFactory<State> weak_factory_;
+ base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
};
sky 2014/08/27 21:37:27 DISALLOW...
yzshen1 2014/08/27 22:31:48 Done.
// HandleWatcher ---------------------------------------------------------------
@@ -397,7 +441,13 @@ void HandleWatcher::Start(const Handle& handle,
DCHECK(handle.is_valid());
DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
- state_.reset(new State(this, handle, handle_signals, deadline, callback));
+ if (MessagePumpMojo::IsCurrent()) {
+ state_.reset(new SameThreadWatchingState(
+ this, handle, handle_signals, deadline, callback));
+ } else {
+ state_.reset(new SecondaryThreadWatchingState(
+ this, handle, handle_signals, deadline, callback));
+ }
}
void HandleWatcher::Stop() {

Powered by Google App Engine
This is Rietveld 408576698