| Index: mojo/common/handle_watcher.cc
|
| diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc
|
| index 33bb0bdeaa7daae80a0c74e46b688052b1164c9c..eeb3e043b4e3d7f41105d408f1d16fbfd2730fec 100644
|
| --- a/mojo/common/handle_watcher.cc
|
| +++ b/mojo/common/handle_watcher.cc
|
| @@ -9,6 +9,8 @@
|
| #include "base/atomic_sequence_num.h"
|
| #include "base/bind.h"
|
| #include "base/lazy_instance.h"
|
| +#include "base/logging.h"
|
| +#include "base/macros.h"
|
| #include "base/memory/singleton.h"
|
| #include "base/memory/weak_ptr.h"
|
| #include "base/message_loop/message_loop.h"
|
| @@ -31,14 +33,6 @@ namespace {
|
|
|
| const char kWatcherThreadName[] = "handle-watcher-thread";
|
|
|
| -// TODO(sky): this should be unnecessary once MessageLoop has been refactored.
|
| -MessagePumpMojo* message_pump_mojo = NULL;
|
| -
|
| -scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
|
| - message_pump_mojo = new MessagePumpMojo;
|
| - return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
|
| -}
|
| -
|
| base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
|
| return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
|
| internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
|
| @@ -106,9 +100,9 @@ void WatcherBackend::StartWatching(const WatchData& data) {
|
| DCHECK_EQ(0u, handle_to_data_.count(data.handle));
|
|
|
| handle_to_data_[data.handle] = data;
|
| - message_pump_mojo->AddHandler(this, data.handle,
|
| - data.handle_signals,
|
| - data.deadline);
|
| + MessagePumpMojo::current()->AddHandler(this, data.handle,
|
| + data.handle_signals,
|
| + data.deadline);
|
| }
|
|
|
| void WatcherBackend::StopWatching(WatcherID watcher_id) {
|
| @@ -117,7 +111,7 @@ void WatcherBackend::StopWatching(WatcherID watcher_id) {
|
| Handle handle;
|
| if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
|
| handle_to_data_.erase(handle);
|
| - message_pump_mojo->RemoveHandler(handle);
|
| + MessagePumpMojo::current()->RemoveHandler(handle);
|
| }
|
| }
|
|
|
| @@ -128,7 +122,7 @@ void WatcherBackend::RemoveAndNotify(const Handle& handle,
|
|
|
| const WatchData data(handle_to_data_[handle]);
|
| handle_to_data_.erase(handle);
|
| - message_pump_mojo->RemoveHandler(handle);
|
| + MessagePumpMojo::current()->RemoveHandler(handle);
|
| data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
|
| }
|
|
|
| @@ -312,58 +306,43 @@ void WatcherThreadManager::ProcessRequestsOnBackendThread() {
|
| WatcherThreadManager::WatcherThreadManager()
|
| : thread_(kWatcherThreadName) {
|
| base::Thread::Options thread_options;
|
| - thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
|
| + thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
|
| thread_.StartWithOptions(thread_options);
|
| }
|
|
|
| -// HandleWatcher::State --------------------------------------------------------
|
| +// HandleWatcher::StateBase and subclasses -------------------------------------
|
|
|
| -// Represents the state of the HandleWatcher. Owns the user's callback and
|
| +// The base class of HandleWatcher's state. Owns the user's callback and
|
| // monitors the current thread's MessageLoop to know when to force the callback
|
| // to run (with an error) even though the pipe hasn't been signaled yet.
|
| -class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
|
| +class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
|
| public:
|
| - State(HandleWatcher* watcher,
|
| - const Handle& handle,
|
| - MojoHandleSignals handle_signals,
|
| - MojoDeadline deadline,
|
| - const base::Callback<void(MojoResult)>& callback)
|
| + StateBase(HandleWatcher* watcher,
|
| + const base::Callback<void(MojoResult)>& callback)
|
| : watcher_(watcher),
|
| callback_(callback),
|
| - got_ready_(false),
|
| - weak_factory_(this) {
|
| + got_ready_(false) {
|
| base::MessageLoop::current()->AddDestructionObserver(this);
|
| -
|
| - watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
|
| - handle,
|
| - handle_signals,
|
| - MojoDeadlineToTimeTicks(deadline),
|
| - base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
|
| }
|
|
|
| - virtual ~State() {
|
| + virtual ~StateBase() {
|
| base::MessageLoop::current()->RemoveDestructionObserver(this);
|
| + }
|
|
|
| - // If we've been notified the handle is ready (|got_ready_| is true) then
|
| - // the watch has been implicitly removed by
|
| - // WatcherThreadManager/MessagePumpMojo and we don't have to call
|
| - // StopWatching(). To do so would needlessly entail posting a task and
|
| - // blocking until the background thread services it.
|
| - if (!got_ready_)
|
| - WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
|
| + protected:
|
| + void NotifyHandleReady(MojoResult result) {
|
| + got_ready_ = true;
|
| + NotifyAndDestroy(result);
|
| }
|
|
|
| + bool got_ready() const { return got_ready_; }
|
| +
|
| private:
|
| virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
|
| // The current thread is exiting. Simulate a watch error.
|
| NotifyAndDestroy(MOJO_RESULT_ABORTED);
|
| }
|
|
|
| - void OnHandleReady(MojoResult result) {
|
| - got_ready_ = true;
|
| - NotifyAndDestroy(result);
|
| - }
|
| -
|
| void NotifyAndDestroy(MojoResult result) {
|
| base::Callback<void(MojoResult)> callback = callback_;
|
| watcher_->Stop(); // Destroys |this|.
|
| @@ -372,14 +351,96 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
|
| }
|
|
|
| HandleWatcher* watcher_;
|
| - WatcherID watcher_id_;
|
| base::Callback<void(MojoResult)> callback_;
|
|
|
| // Have we been notified that the handle is ready?
|
| bool got_ready_;
|
|
|
| + DISALLOW_COPY_AND_ASSIGN(StateBase);
|
| +};
|
| +
|
| +// If the thread on which HandleWatcher is used runs MessagePumpMojo,
|
| +// SameThreadWatchingState is used to directly watch the handle on the same
|
| +// thread.
|
| +class HandleWatcher::SameThreadWatchingState : public StateBase,
|
| + public MessagePumpMojoHandler {
|
| + public:
|
| + SameThreadWatchingState(HandleWatcher* watcher,
|
| + const Handle& handle,
|
| + MojoHandleSignals handle_signals,
|
| + MojoDeadline deadline,
|
| + const base::Callback<void(MojoResult)>& callback)
|
| + : StateBase(watcher, callback),
|
| + handle_(handle) {
|
| + DCHECK(MessagePumpMojo::IsCurrent());
|
| +
|
| + MessagePumpMojo::current()->AddHandler(
|
| + this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
|
| + }
|
| +
|
| + virtual ~SameThreadWatchingState() {
|
| + if (!got_ready())
|
| + MessagePumpMojo::current()->RemoveHandler(handle_);
|
| + }
|
| +
|
| + private:
|
| + // MessagePumpMojoHandler overrides:
|
| + virtual void OnHandleReady(const Handle& handle) OVERRIDE {
|
| + StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
|
| + }
|
| +
|
| + virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
|
| + StopWatchingAndNotifyReady(handle, result);
|
| + }
|
| +
|
| + void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
|
| + DCHECK_EQ(handle.value(), handle_.value());
|
| + MessagePumpMojo::current()->RemoveHandler(handle_);
|
| + NotifyHandleReady(result);
|
| + }
|
| +
|
| + Handle handle_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
|
| +};
|
| +
|
| +// If the thread on which HandleWatcher is used runs a message pump different
|
| +// from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
|
| +// handle on the handle watcher thread.
|
| +class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
|
| + public:
|
| + SecondaryThreadWatchingState(HandleWatcher* watcher,
|
| + const Handle& handle,
|
| + MojoHandleSignals handle_signals,
|
| + MojoDeadline deadline,
|
| + const base::Callback<void(MojoResult)>& callback)
|
| + : StateBase(watcher, callback),
|
| + weak_factory_(this) {
|
| + watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
|
| + handle,
|
| + handle_signals,
|
| + MojoDeadlineToTimeTicks(deadline),
|
| + base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
|
| + weak_factory_.GetWeakPtr()));
|
| + }
|
| +
|
| + virtual ~SecondaryThreadWatchingState() {
|
| + // If we've been notified the handle is ready (|got_ready()| is true) then
|
| + // the watch has been implicitly removed by
|
| + // WatcherThreadManager/MessagePumpMojo and we don't have to call
|
| + // StopWatching(). To do so would needlessly entail posting a task and
|
| + // blocking until the background thread services it.
|
| + if (!got_ready())
|
| + WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
|
| + }
|
| +
|
| + private:
|
| + WatcherID watcher_id_;
|
| +
|
| // Used to weakly bind |this| to the WatcherThreadManager.
|
| - base::WeakPtrFactory<State> weak_factory_;
|
| + base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
|
| };
|
|
|
| // HandleWatcher ---------------------------------------------------------------
|
| @@ -397,7 +458,13 @@ void HandleWatcher::Start(const Handle& handle,
|
| DCHECK(handle.is_valid());
|
| DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
|
|
|
| - state_.reset(new State(this, handle, handle_signals, deadline, callback));
|
| + if (MessagePumpMojo::IsCurrent()) {
|
| + state_.reset(new SameThreadWatchingState(
|
| + this, handle, handle_signals, deadline, callback));
|
| + } else {
|
| + state_.reset(new SecondaryThreadWatchingState(
|
| + this, handle, handle_signals, deadline, callback));
|
| + }
|
| }
|
|
|
| void HandleWatcher::Stop() {
|
|
|