| Index: mojo/message_pump/handle_watcher.cc
 | 
| diff --git a/mojo/message_pump/handle_watcher.cc b/mojo/message_pump/handle_watcher.cc
 | 
| deleted file mode 100644
 | 
| index d02c761109692b257b18d94332c814b9974a87c3..0000000000000000000000000000000000000000
 | 
| --- a/mojo/message_pump/handle_watcher.cc
 | 
| +++ /dev/null
 | 
| @@ -1,475 +0,0 @@
 | 
| -// Copyright 2013 The Chromium Authors. All rights reserved.
 | 
| -// Use of this source code is governed by a BSD-style license that can be
 | 
| -// found in the LICENSE file.
 | 
| -
 | 
| -#include "mojo/message_pump/handle_watcher.h"
 | 
| -
 | 
| -#include <map>
 | 
| -
 | 
| -#include "base/atomic_sequence_num.h"
 | 
| -#include "base/bind.h"
 | 
| -#include "base/lazy_instance.h"
 | 
| -#include "base/logging.h"
 | 
| -#include "base/macros.h"
 | 
| -#include "base/memory/singleton.h"
 | 
| -#include "base/memory/weak_ptr.h"
 | 
| -#include "base/message_loop/message_loop.h"
 | 
| -#include "base/single_thread_task_runner.h"
 | 
| -#include "base/synchronization/lock.h"
 | 
| -#include "base/synchronization/waitable_event.h"
 | 
| -#include "base/thread_task_runner_handle.h"
 | 
| -#include "base/threading/thread.h"
 | 
| -#include "base/threading/thread_restrictions.h"
 | 
| -#include "base/time/time.h"
 | 
| -#include "mojo/message_pump/message_pump_mojo.h"
 | 
| -#include "mojo/message_pump/message_pump_mojo_handler.h"
 | 
| -#include "mojo/message_pump/time_helper.h"
 | 
| -
 | 
| -namespace mojo {
 | 
| -namespace common {
 | 
| -
 | 
| -typedef int WatcherID;
 | 
| -
 | 
| -namespace {
 | 
| -
 | 
| -const char kWatcherThreadName[] = "handle-watcher-thread";
 | 
| -
 | 
| -base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
 | 
| -  return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
 | 
| -      internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
 | 
| -}
 | 
| -
 | 
| -// Tracks the data for a single call to Start().
 | 
| -struct WatchData {
 | 
| -  WatchData()
 | 
| -      : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {}
 | 
| -
 | 
| -  WatcherID id;
 | 
| -  Handle handle;
 | 
| -  MojoHandleSignals handle_signals;
 | 
| -  base::TimeTicks deadline;
 | 
| -  base::Callback<void(MojoResult)> callback;
 | 
| -  scoped_refptr<base::SingleThreadTaskRunner> task_runner;
 | 
| -};
 | 
| -
 | 
| -// WatcherBackend --------------------------------------------------------------
 | 
| -
 | 
| -// WatcherBackend is responsible for managing the requests and interacting with
 | 
| -// MessagePumpMojo. All access (outside of creation/destruction) is done on the
 | 
| -// thread WatcherThreadManager creates.
 | 
| -class WatcherBackend : public MessagePumpMojoHandler {
 | 
| - public:
 | 
| -  WatcherBackend();
 | 
| -  ~WatcherBackend() override;
 | 
| -
 | 
| -  void StartWatching(const WatchData& data);
 | 
| -
 | 
| -  // Cancels a previously scheduled request to start a watch.
 | 
| -  void StopWatching(WatcherID watcher_id);
 | 
| -
 | 
| - private:
 | 
| -  typedef std::map<Handle, WatchData> HandleToWatchDataMap;
 | 
| -
 | 
| -  // Invoked when a handle needs to be removed and notified.
 | 
| -  void RemoveAndNotify(const Handle& handle, MojoResult result);
 | 
| -
 | 
| -  // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
 | 
| -  // and sets |handle| to the Handle. Returns false if not a known id.
 | 
| -  bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
 | 
| -
 | 
| -  // MessagePumpMojoHandler overrides:
 | 
| -  void OnHandleReady(const Handle& handle) override;
 | 
| -  void OnHandleError(const Handle& handle, MojoResult result) override;
 | 
| -
 | 
| -  // Maps from assigned id to WatchData.
 | 
| -  HandleToWatchDataMap handle_to_data_;
 | 
| -
 | 
| -  DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
 | 
| -};
 | 
| -
 | 
| -WatcherBackend::WatcherBackend() {
 | 
| -}
 | 
| -
 | 
| -WatcherBackend::~WatcherBackend() {
 | 
| -}
 | 
| -
 | 
| -void WatcherBackend::StartWatching(const WatchData& data) {
 | 
| -  RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
 | 
| -
 | 
| -  DCHECK_EQ(0u, handle_to_data_.count(data.handle));
 | 
| -
 | 
| -  handle_to_data_[data.handle] = data;
 | 
| -  MessagePumpMojo::current()->AddHandler(this, data.handle,
 | 
| -                                         data.handle_signals,
 | 
| -                                         data.deadline);
 | 
| -}
 | 
| -
 | 
| -void WatcherBackend::StopWatching(WatcherID watcher_id) {
 | 
| -  // Because of the thread hop it is entirely possible to get here and not
 | 
| -  // have a valid handle registered for |watcher_id|.
 | 
| -  Handle handle;
 | 
| -  if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
 | 
| -    handle_to_data_.erase(handle);
 | 
| -    MessagePumpMojo::current()->RemoveHandler(handle);
 | 
| -  }
 | 
| -}
 | 
| -
 | 
| -void WatcherBackend::RemoveAndNotify(const Handle& handle,
 | 
| -                                     MojoResult result) {
 | 
| -  if (handle_to_data_.count(handle) == 0)
 | 
| -    return;
 | 
| -
 | 
| -  const WatchData data(handle_to_data_[handle]);
 | 
| -  handle_to_data_.erase(handle);
 | 
| -  MessagePumpMojo::current()->RemoveHandler(handle);
 | 
| -
 | 
| -  data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result));
 | 
| -}
 | 
| -
 | 
| -bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
 | 
| -                                              Handle* handle) const {
 | 
| -  for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
 | 
| -       i != handle_to_data_.end(); ++i) {
 | 
| -    if (i->second.id == watcher_id) {
 | 
| -      *handle = i->second.handle;
 | 
| -      return true;
 | 
| -    }
 | 
| -  }
 | 
| -  return false;
 | 
| -}
 | 
| -
 | 
| -void WatcherBackend::OnHandleReady(const Handle& handle) {
 | 
| -  RemoveAndNotify(handle, MOJO_RESULT_OK);
 | 
| -}
 | 
| -
 | 
| -void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
 | 
| -  RemoveAndNotify(handle, result);
 | 
| -}
 | 
| -
 | 
| -// WatcherThreadManager --------------------------------------------------------
 | 
| -
 | 
| -// WatcherThreadManager manages the background thread that listens for handles
 | 
| -// to be ready. All requests are handled by WatcherBackend.
 | 
| -}  // namespace
 | 
| -
 | 
| -class WatcherThreadManager {
 | 
| - public:
 | 
| -  ~WatcherThreadManager();
 | 
| -
 | 
| -  // Returns the shared instance.
 | 
| -  static WatcherThreadManager* GetInstance();
 | 
| -
 | 
| -  // Starts watching the requested handle. Returns a unique ID that is used to
 | 
| -  // stop watching the handle. When the handle is ready |callback| is notified
 | 
| -  // on the thread StartWatching() was invoked on.
 | 
| -  // This may be invoked on any thread.
 | 
| -  WatcherID StartWatching(const Handle& handle,
 | 
| -                          MojoHandleSignals handle_signals,
 | 
| -                          base::TimeTicks deadline,
 | 
| -                          const base::Callback<void(MojoResult)>& callback);
 | 
| -
 | 
| -  // Stops watching a handle.
 | 
| -  // This may be invoked on any thread.
 | 
| -  void StopWatching(WatcherID watcher_id);
 | 
| -
 | 
| - private:
 | 
| -  enum RequestType {
 | 
| -    REQUEST_START,
 | 
| -    REQUEST_STOP,
 | 
| -  };
 | 
| -
 | 
| -  // See description of |requests_| for details.
 | 
| -  struct RequestData {
 | 
| -    RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
 | 
| -
 | 
| -    RequestType type;
 | 
| -    WatchData start_data;
 | 
| -    WatcherID stop_id;
 | 
| -    base::WaitableEvent* stop_event;
 | 
| -  };
 | 
| -
 | 
| -  typedef std::vector<RequestData> Requests;
 | 
| -
 | 
| -  friend struct DefaultSingletonTraits<WatcherThreadManager>;
 | 
| -
 | 
| -  WatcherThreadManager();
 | 
| -
 | 
| -  // Schedules a request on the background thread. See |requests_| for details.
 | 
| -  void AddRequest(const RequestData& data);
 | 
| -
 | 
| -  // Processes requests added to |requests_|. This is invoked on the backend
 | 
| -  // thread.
 | 
| -  void ProcessRequestsOnBackendThread();
 | 
| -
 | 
| -  base::Thread thread_;
 | 
| -
 | 
| -  base::AtomicSequenceNumber watcher_id_generator_;
 | 
| -
 | 
| -  WatcherBackend backend_;
 | 
| -
 | 
| -  // Protects |requests_|.
 | 
| -  base::Lock lock_;
 | 
| -
 | 
| -  // Start/Stop result in adding a RequestData to |requests_| (protected by
 | 
| -  // |lock_|). When the background thread wakes up it processes the requests.
 | 
| -  Requests requests_;
 | 
| -
 | 
| -  DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
 | 
| -};
 | 
| -
 | 
| -WatcherThreadManager::~WatcherThreadManager() {
 | 
| -  thread_.Stop();
 | 
| -}
 | 
| -
 | 
| -WatcherThreadManager* WatcherThreadManager::GetInstance() {
 | 
| -  return Singleton<WatcherThreadManager>::get();
 | 
| -}
 | 
| -
 | 
| -WatcherID WatcherThreadManager::StartWatching(
 | 
| -    const Handle& handle,
 | 
| -    MojoHandleSignals handle_signals,
 | 
| -    base::TimeTicks deadline,
 | 
| -    const base::Callback<void(MojoResult)>& callback) {
 | 
| -  RequestData request_data;
 | 
| -  request_data.type = REQUEST_START;
 | 
| -  request_data.start_data.id = watcher_id_generator_.GetNext();
 | 
| -  request_data.start_data.handle = handle;
 | 
| -  request_data.start_data.callback = callback;
 | 
| -  request_data.start_data.handle_signals = handle_signals;
 | 
| -  request_data.start_data.deadline = deadline;
 | 
| -  request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get();
 | 
| -  AddRequest(request_data);
 | 
| -  return request_data.start_data.id;
 | 
| -}
 | 
| -
 | 
| -void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
 | 
| -  // Handle the case of StartWatching() followed by StopWatching() before
 | 
| -  // |thread_| woke up.
 | 
| -  {
 | 
| -    base::AutoLock auto_lock(lock_);
 | 
| -    for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
 | 
| -      if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
 | 
| -        // Watcher ids are not reused, so if we find it we can stop.
 | 
| -        requests_.erase(i);
 | 
| -        return;
 | 
| -      }
 | 
| -    }
 | 
| -  }
 | 
| -
 | 
| -  base::ThreadRestrictions::ScopedAllowWait allow_wait;
 | 
| -  base::WaitableEvent event(true, false);
 | 
| -  RequestData request_data;
 | 
| -  request_data.type = REQUEST_STOP;
 | 
| -  request_data.stop_id = watcher_id;
 | 
| -  request_data.stop_event = &event;
 | 
| -  AddRequest(request_data);
 | 
| -
 | 
| -  // We need to block until the handle is actually removed.
 | 
| -  event.Wait();
 | 
| -}
 | 
| -
 | 
| -void WatcherThreadManager::AddRequest(const RequestData& data) {
 | 
| -  {
 | 
| -    base::AutoLock auto_lock(lock_);
 | 
| -    const bool was_empty = requests_.empty();
 | 
| -    requests_.push_back(data);
 | 
| -    if (!was_empty)
 | 
| -      return;
 | 
| -  }
 | 
| -  // We own |thread_|, so it's safe to use Unretained() here.
 | 
| -  thread_.task_runner()->PostTask(
 | 
| -      FROM_HERE,
 | 
| -      base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
 | 
| -                 base::Unretained(this)));
 | 
| -}
 | 
| -
 | 
| -void WatcherThreadManager::ProcessRequestsOnBackendThread() {
 | 
| -  DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
 | 
| -
 | 
| -  Requests requests;
 | 
| -  {
 | 
| -    base::AutoLock auto_lock(lock_);
 | 
| -    requests_.swap(requests);
 | 
| -  }
 | 
| -  for (size_t i = 0; i < requests.size(); ++i) {
 | 
| -    if (requests[i].type == REQUEST_START) {
 | 
| -      backend_.StartWatching(requests[i].start_data);
 | 
| -    } else {
 | 
| -      backend_.StopWatching(requests[i].stop_id);
 | 
| -      requests[i].stop_event->Signal();
 | 
| -    }
 | 
| -  }
 | 
| -}
 | 
| -
 | 
| -WatcherThreadManager::WatcherThreadManager()
 | 
| -    : thread_(kWatcherThreadName) {
 | 
| -  base::Thread::Options thread_options;
 | 
| -  thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
 | 
| -  thread_.StartWithOptions(thread_options);
 | 
| -}
 | 
| -
 | 
| -// HandleWatcher::StateBase and subclasses -------------------------------------
 | 
| -
 | 
| -// The base class of HandleWatcher's state. Owns the user's callback and
 | 
| -// monitors the current thread's MessageLoop to know when to force the callback
 | 
| -// to run (with an error) even though the pipe hasn't been signaled yet.
 | 
| -class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
 | 
| - public:
 | 
| -  StateBase(HandleWatcher* watcher,
 | 
| -            const base::Callback<void(MojoResult)>& callback)
 | 
| -      : watcher_(watcher),
 | 
| -        callback_(callback),
 | 
| -        got_ready_(false) {
 | 
| -    base::MessageLoop::current()->AddDestructionObserver(this);
 | 
| -  }
 | 
| -
 | 
| -  ~StateBase() override {
 | 
| -    base::MessageLoop::current()->RemoveDestructionObserver(this);
 | 
| -  }
 | 
| -
 | 
| - protected:
 | 
| -  void NotifyHandleReady(MojoResult result) {
 | 
| -    got_ready_ = true;
 | 
| -    NotifyAndDestroy(result);
 | 
| -  }
 | 
| -
 | 
| -  bool got_ready() const { return got_ready_; }
 | 
| -
 | 
| - private:
 | 
| -  void WillDestroyCurrentMessageLoop() override {
 | 
| -    // The current thread is exiting. Simulate a watch error.
 | 
| -    NotifyAndDestroy(MOJO_RESULT_ABORTED);
 | 
| -  }
 | 
| -
 | 
| -  void NotifyAndDestroy(MojoResult result) {
 | 
| -    base::Callback<void(MojoResult)> callback = callback_;
 | 
| -    watcher_->Stop();  // Destroys |this|.
 | 
| -
 | 
| -    callback.Run(result);
 | 
| -  }
 | 
| -
 | 
| -  HandleWatcher* watcher_;
 | 
| -  base::Callback<void(MojoResult)> callback_;
 | 
| -
 | 
| -  // Have we been notified that the handle is ready?
 | 
| -  bool got_ready_;
 | 
| -
 | 
| -  DISALLOW_COPY_AND_ASSIGN(StateBase);
 | 
| -};
 | 
| -
 | 
| -// If the thread on which HandleWatcher is used runs MessagePumpMojo,
 | 
| -// SameThreadWatchingState is used to directly watch the handle on the same
 | 
| -// thread.
 | 
| -class HandleWatcher::SameThreadWatchingState : public StateBase,
 | 
| -                                               public MessagePumpMojoHandler {
 | 
| - public:
 | 
| -  SameThreadWatchingState(HandleWatcher* watcher,
 | 
| -                          const Handle& handle,
 | 
| -                          MojoHandleSignals handle_signals,
 | 
| -                          MojoDeadline deadline,
 | 
| -                          const base::Callback<void(MojoResult)>& callback)
 | 
| -      : StateBase(watcher, callback),
 | 
| -        handle_(handle) {
 | 
| -    DCHECK(MessagePumpMojo::IsCurrent());
 | 
| -
 | 
| -    MessagePumpMojo::current()->AddHandler(
 | 
| -        this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
 | 
| -  }
 | 
| -
 | 
| -  ~SameThreadWatchingState() override {
 | 
| -    if (!got_ready())
 | 
| -      MessagePumpMojo::current()->RemoveHandler(handle_);
 | 
| -  }
 | 
| -
 | 
| - private:
 | 
| -  // MessagePumpMojoHandler overrides:
 | 
| -  void OnHandleReady(const Handle& handle) override {
 | 
| -    StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
 | 
| -  }
 | 
| -
 | 
| -  void OnHandleError(const Handle& handle, MojoResult result) override {
 | 
| -    StopWatchingAndNotifyReady(handle, result);
 | 
| -  }
 | 
| -
 | 
| -  void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
 | 
| -    DCHECK_EQ(handle.value(), handle_.value());
 | 
| -    MessagePumpMojo::current()->RemoveHandler(handle_);
 | 
| -    NotifyHandleReady(result);
 | 
| -  }
 | 
| -
 | 
| -  Handle handle_;
 | 
| -
 | 
| -  DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
 | 
| -};
 | 
| -
 | 
| -// If the thread on which HandleWatcher is used runs a message pump different
 | 
| -// from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
 | 
| -// handle on the handle watcher thread.
 | 
| -class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
 | 
| - public:
 | 
| -  SecondaryThreadWatchingState(HandleWatcher* watcher,
 | 
| -                               const Handle& handle,
 | 
| -                               MojoHandleSignals handle_signals,
 | 
| -                               MojoDeadline deadline,
 | 
| -                               const base::Callback<void(MojoResult)>& callback)
 | 
| -      : StateBase(watcher, callback),
 | 
| -        weak_factory_(this) {
 | 
| -    watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
 | 
| -        handle,
 | 
| -        handle_signals,
 | 
| -        MojoDeadlineToTimeTicks(deadline),
 | 
| -        base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
 | 
| -                   weak_factory_.GetWeakPtr()));
 | 
| -  }
 | 
| -
 | 
| -  ~SecondaryThreadWatchingState() override {
 | 
| -    // If we've been notified the handle is ready (|got_ready()| is true) then
 | 
| -    // the watch has been implicitly removed by
 | 
| -    // WatcherThreadManager/MessagePumpMojo and we don't have to call
 | 
| -    // StopWatching(). To do so would needlessly entail posting a task and
 | 
| -    // blocking until the background thread services it.
 | 
| -    if (!got_ready())
 | 
| -      WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
 | 
| -  }
 | 
| -
 | 
| - private:
 | 
| -  WatcherID watcher_id_;
 | 
| -
 | 
| -  // Used to weakly bind |this| to the WatcherThreadManager.
 | 
| -  base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
 | 
| -
 | 
| -  DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
 | 
| -};
 | 
| -
 | 
| -// HandleWatcher ---------------------------------------------------------------
 | 
| -
 | 
| -HandleWatcher::HandleWatcher() {
 | 
| -}
 | 
| -
 | 
| -HandleWatcher::~HandleWatcher() {
 | 
| -}
 | 
| -
 | 
| -void HandleWatcher::Start(const Handle& handle,
 | 
| -                          MojoHandleSignals handle_signals,
 | 
| -                          MojoDeadline deadline,
 | 
| -                          const base::Callback<void(MojoResult)>& callback) {
 | 
| -  DCHECK(handle.is_valid());
 | 
| -  DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
 | 
| -
 | 
| -  // Need to clear the state before creating a new one.
 | 
| -  state_.reset();
 | 
| -  if (MessagePumpMojo::IsCurrent()) {
 | 
| -    state_.reset(new SameThreadWatchingState(
 | 
| -        this, handle, handle_signals, deadline, callback));
 | 
| -  } else {
 | 
| -    state_.reset(new SecondaryThreadWatchingState(
 | 
| -        this, handle, handle_signals, deadline, callback));
 | 
| -  }
 | 
| -}
 | 
| -
 | 
| -void HandleWatcher::Stop() {
 | 
| -  state_.reset();
 | 
| -}
 | 
| -
 | 
| -}  // namespace common
 | 
| -}  // namespace mojo
 | 
| 
 |