| Index: mojo/common/handle_watcher.cc
|
| diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc
|
| index a159185ba35bc0029d658ab312168c746627a19a..dd9b2e849120a4ec9cc262ed2f95998ebebeeea0 100644
|
| --- a/mojo/common/handle_watcher.cc
|
| +++ b/mojo/common/handle_watcher.cc
|
| @@ -5,19 +5,18 @@
|
| #include "mojo/common/handle_watcher.h"
|
|
|
| #include <map>
|
| -#include <set>
|
| -#include <vector>
|
|
|
| +#include "base/atomic_sequence_num.h"
|
| #include "base/bind.h"
|
| #include "base/lazy_instance.h"
|
| #include "base/memory/weak_ptr.h"
|
| #include "base/message_loop/message_loop.h"
|
| #include "base/message_loop/message_loop_proxy.h"
|
| -#include "base/synchronization/lock.h"
|
| #include "base/threading/thread.h"
|
| #include "base/time/tick_clock.h"
|
| #include "base/time/time.h"
|
| -#include "mojo/common/scoped_message_pipe.h"
|
| +#include "mojo/common/message_pump_mojo.h"
|
| +#include "mojo/common/message_pump_mojo_handler.h"
|
|
|
| namespace mojo {
|
| namespace common {
|
| @@ -28,13 +27,126 @@ namespace {
|
|
|
| const char kWatcherThreadName[] = "handle-watcher-thread";
|
|
|
| +// TODO(sky): this should be unnecessary once MessageLoop has been refactored.
|
| +MessagePumpMojo* message_pump_mojo = NULL;
|
| +
|
| +scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
|
| + message_pump_mojo = new MessagePumpMojo;
|
| + return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
|
| +}
|
| +
|
| +// Tracks the data for a single call to Start().
|
| +struct WatchData {
|
| + WatchData()
|
| + : id(0),
|
| + handle(MOJO_HANDLE_INVALID),
|
| + wait_flags(MOJO_WAIT_FLAG_NONE),
|
| + message_loop(NULL) {}
|
| +
|
| + WatcherID id;
|
| + MojoHandle handle;
|
| + MojoWaitFlags wait_flags;
|
| + base::TimeTicks deadline;
|
| + base::Callback<void(MojoResult)> callback;
|
| + scoped_refptr<base::MessageLoopProxy> message_loop;
|
| +};
|
| +
|
| +// WatcherBackend --------------------------------------------------------------
|
| +
|
| +// WatcherBackend is responsible for managing the requests and interacting with
|
| +// MessagePumpMojo. All access (outside of creation/destruction) is done on the
|
| +// thread WatcherThreadManager creates.
|
| +class WatcherBackend : public MessagePumpMojoHandler {
|
| + public:
|
| + WatcherBackend();
|
| + virtual ~WatcherBackend();
|
| +
|
| + void StartWatching(const WatchData& data);
|
| + void StopWatching(WatcherID watcher_id);
|
| +
|
| + private:
|
| + typedef std::map<MojoHandle, WatchData> HandleToWatchDataMap;
|
| +
|
| + // Invoked when a handle needs to be removed and notified.
|
| + void RemoveAndNotify(MojoHandle handle, MojoResult result);
|
| +
|
| + // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
|
| + // and sets |handle| to the MojoHandle. Returns false if not a known id.
|
| + bool GetMojoHandleByWatcherID(WatcherID watcher_id, MojoHandle* handle) const;
|
| +
|
| + // MessagePumpMojoHandler overrides:
|
| + virtual void OnHandleReady(MojoHandle handle) OVERRIDE;
|
| + virtual void OnHandleError(MojoHandle handle, MojoResult result) OVERRIDE;
|
| +
|
| + // Maps from assigned id to WatchData.
|
| + HandleToWatchDataMap handle_to_data_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
|
| +};
|
| +
|
| +WatcherBackend::WatcherBackend() {
|
| +}
|
| +
|
| +WatcherBackend::~WatcherBackend() {
|
| +}
|
| +
|
| +void WatcherBackend::StartWatching(const WatchData& data) {
|
| + RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
|
| +
|
| + DCHECK_EQ(0u, handle_to_data_.count(data.handle));
|
| +
|
| + handle_to_data_[data.handle] = data;
|
| + message_pump_mojo->AddHandler(this, data.handle,
|
| + data.wait_flags,
|
| + data.deadline);
|
| +}
|
| +
|
| +void WatcherBackend::StopWatching(WatcherID watcher_id) {
|
| + // Because of the thread hop it is entirely possible to get here and not
|
| + // have a valid handle registered for |watcher_id|.
|
| + MojoHandle handle;
|
| + if (!GetMojoHandleByWatcherID(watcher_id, &handle))
|
| + return;
|
| +
|
| + handle_to_data_.erase(handle);
|
| + message_pump_mojo->RemoveHandler(handle);
|
| +}
|
| +
|
| +void WatcherBackend::RemoveAndNotify(MojoHandle handle,
|
| + MojoResult result) {
|
| + if (handle_to_data_.count(handle) == 0)
|
| + return;
|
| +
|
| + const WatchData data(handle_to_data_[handle]);
|
| + handle_to_data_.erase(handle);
|
| + message_pump_mojo->RemoveHandler(handle);
|
| + data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
|
| +}
|
| +
|
| +bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
|
| + MojoHandle* handle) const {
|
| + for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
|
| + i != handle_to_data_.end(); ++i) {
|
| + if (i->second.id == watcher_id) {
|
| + *handle = i->second.handle;
|
| + return true;
|
| + }
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +void WatcherBackend::OnHandleReady(MojoHandle handle) {
|
| + RemoveAndNotify(handle, MOJO_RESULT_OK);
|
| +}
|
| +
|
| +void WatcherBackend::OnHandleError(MojoHandle handle, MojoResult result) {
|
| + RemoveAndNotify(handle, result);
|
| +}
|
| +
|
| // WatcherThreadManager --------------------------------------------------------
|
|
|
| -// WatcherThreadManager manages listening for the handles. It is a singleton. It
|
| -// spawns a thread that waits for any handle passed to StartWatching() to be
|
| -// ready. Additionally it creates a message pipe for communication between the
|
| -// two threads. The message pipe is used solely to wake up the background
|
| -// thread. This happens when the set of handles changes, or during shutdown.
|
| +// WatcherThreadManager manages the background thread that listens for handles
|
| +// to be ready. All requests are handled by WatcherBackend.
|
| class WatcherThreadManager {
|
| public:
|
| // Returns the shared instance.
|
| @@ -50,84 +162,20 @@ class WatcherThreadManager {
|
| const base::Callback<void(MojoResult)>& callback);
|
|
|
| // Stops watching a handle.
|
| + // This may be invoked on any thread.
|
| void StopWatching(WatcherID watcher_id);
|
|
|
| private:
|
| friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
|
|
|
| - // Tracks a single request.
|
| - struct HandleAndCallback {
|
| - HandleAndCallback()
|
| - : handle(MOJO_HANDLE_INVALID),
|
| - wait_flags(MOJO_WAIT_FLAG_NONE),
|
| - message_loop(NULL) {}
|
| -
|
| - MojoHandle handle;
|
| - MojoWaitFlags wait_flags;
|
| - base::TimeTicks deadline;
|
| - base::Callback<void(MojoResult)> callback;
|
| - scoped_refptr<base::MessageLoopProxy> message_loop;
|
| - };
|
| -
|
| - // Contains the state needed for MojoWaitMany.
|
| - // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to
|
| - // pass to MojoWaitMany.
|
| - struct WaitState {
|
| - // List of ids.
|
| - std::vector<WatcherID> ids;
|
| -
|
| - // List of handles.
|
| - std::vector<MojoHandle> handles;
|
| -
|
| - // List of flags each handle is waiting on.
|
| - std::vector<MojoWaitFlags> wait_flags;
|
| -
|
| - // First deadline.
|
| - MojoDeadline deadline;
|
| -
|
| - // Set of ids whose deadline has been reached.
|
| - std::set<WatcherID> deadline_exceeded;
|
| - };
|
| -
|
| - typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap;
|
| -
|
| WatcherThreadManager();
|
| ~WatcherThreadManager();
|
|
|
| - // Invoked on the background thread. Runs a loop waiting on current set of
|
| - // handles.
|
| - void RunOnBackgroundThread();
|
| -
|
| - // Writes to the communication pipe to wake up the background thread.
|
| - void SignalBackgroundThread();
|
| -
|
| - // Invoked when a handle associated with |id| should be removed and notified.
|
| - // |result| gives the reason for removing.
|
| - void RemoveAndNotify(WatcherID id, MojoResult result);
|
| -
|
| - // Removes all callbacks schedule for |handle|. This is used when a handle
|
| - // is identified as invalid.
|
| - void RemoveHandle(MojoHandle handle);
|
| -
|
| - MojoHandle read_handle() const { return control_pipe_.handle_0(); }
|
| - MojoHandle write_handle() const { return control_pipe_.handle_1(); }
|
| -
|
| - // Returns state needed for MojoWaitMany.
|
| - WaitState GetWaitState();
|
| -
|
| - // Guards members accessed on both threads.
|
| - base::Lock lock_;
|
| -
|
| - // Used for communicating with the background thread.
|
| - ScopedMessagePipe control_pipe_;
|
| -
|
| base::Thread thread_;
|
|
|
| - // Maps from assigned id to the callback.
|
| - IDToCallbackMap id_to_callback_;
|
| + base::AtomicSequenceNumber watcher_id_generator_;
|
|
|
| - // If true the background loop should exit.
|
| - bool quit_;
|
| + WatcherBackend backend_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
|
| };
|
| @@ -143,177 +191,42 @@ WatcherID WatcherThreadManager::StartWatching(
|
| MojoWaitFlags wait_flags,
|
| base::TimeTicks deadline,
|
| const base::Callback<void(MojoResult)>& callback) {
|
| - WatcherID id = 0;
|
| - {
|
| - static int next_id = 0;
|
| - base::AutoLock lock(lock_);
|
| - // TODO(sky): worry about overflow?
|
| - id = ++next_id;
|
| - id_to_callback_[id].handle = handle;
|
| - id_to_callback_[id].callback = callback;
|
| - id_to_callback_[id].wait_flags = wait_flags;
|
| - id_to_callback_[id].deadline = deadline;
|
| - id_to_callback_[id].message_loop = base::MessageLoopProxy::current();
|
| - }
|
| - SignalBackgroundThread();
|
| - return id;
|
| + WatchData data;
|
| + data.id = watcher_id_generator_.GetNext();
|
| + data.handle = handle;
|
| + data.callback = callback;
|
| + data.wait_flags = wait_flags;
|
| + data.deadline = deadline;
|
| + data.message_loop = base::MessageLoopProxy::current();
|
| + // We outlive |thread_|, so it's safe to use Unretained() here.
|
| + thread_.message_loop()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&WatcherBackend::StartWatching,
|
| + base::Unretained(&backend_),
|
| + data));
|
| + return data.id;
|
| }
|
|
|
| -
|
| void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - // It's possible we've already serviced the handle but HandleWatcher hasn't
|
| - // processed it yet.
|
| - IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id);
|
| - if (i == id_to_callback_.end())
|
| - return;
|
| - id_to_callback_.erase(i);
|
| - }
|
| - SignalBackgroundThread();
|
| + // We outlive |thread_|, so it's safe to use Unretained() here.
|
| + thread_.message_loop()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&WatcherBackend::StopWatching,
|
| + base::Unretained(&backend_),
|
| + watcher_id));
|
| }
|
|
|
| WatcherThreadManager::WatcherThreadManager()
|
| - : thread_(kWatcherThreadName),
|
| - quit_(false) {
|
| - // TODO(sky): deal with error condition?
|
| - CHECK_NE(MOJO_HANDLE_INVALID, read_handle());
|
| - thread_.Start();
|
| - thread_.message_loop()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&WatcherThreadManager::RunOnBackgroundThread,
|
| - base::Unretained(this)));
|
| + : thread_(kWatcherThreadName) {
|
| + base::Thread::Options thread_options;
|
| + thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
|
| + thread_.StartWithOptions(thread_options);
|
| }
|
|
|
| WatcherThreadManager::~WatcherThreadManager() {
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - quit_ = true;
|
| - }
|
| - SignalBackgroundThread();
|
| -
|
| thread_.Stop();
|
| }
|
|
|
| -void WatcherThreadManager::RunOnBackgroundThread() {
|
| - while (true) {
|
| - const WaitState state = GetWaitState();
|
| - for (std::set<WatcherID>::const_iterator i =
|
| - state.deadline_exceeded.begin();
|
| - i != state.deadline_exceeded.end(); ++i) {
|
| - RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED);
|
| - }
|
| - const MojoResult result = MojoWaitMany(&state.handles.front(),
|
| - &state.wait_flags.front(),
|
| - state.handles.size(),
|
| - state.deadline);
|
| -
|
| - if (result >= 0) {
|
| - DCHECK_LT(result, static_cast<int>(state.handles.size()));
|
| - // Last handle is used to wake us up.
|
| - if (result == static_cast<int>(state.handles.size()) - 1) {
|
| - uint32_t num_bytes = 0;
|
| - MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0,
|
| - MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - if (quit_)
|
| - return;
|
| - }
|
| - } else {
|
| - RemoveAndNotify(state.ids[result], MOJO_RESULT_OK);
|
| - }
|
| - } else if (result == MOJO_RESULT_INVALID_ARGUMENT ||
|
| - result == MOJO_RESULT_FAILED_PRECONDITION) {
|
| - // One of the handles is invalid or the flags supplied is invalid, remove
|
| - // it.
|
| - // Use -1 as last handle is used for communication and should never be
|
| - // invalid.
|
| - for (size_t i = 0; i < state.handles.size() - 1; ++i) {
|
| - const MojoResult result =
|
| - MojoWait(state.handles[i], state.wait_flags[i], 0);
|
| - switch (result) {
|
| - // TODO: do we really want to notify for all these conditions?
|
| - case MOJO_RESULT_OK:
|
| - case MOJO_RESULT_FAILED_PRECONDITION:
|
| - case MOJO_RESULT_INVALID_ARGUMENT:
|
| - RemoveAndNotify(state.ids[i], result);
|
| - break;
|
| - case MOJO_RESULT_DEADLINE_EXCEEDED:
|
| - break;
|
| - default:
|
| - NOTREACHED();
|
| - }
|
| - }
|
| - }
|
| - }
|
| -}
|
| -
|
| -void WatcherThreadManager::SignalBackgroundThread() {
|
| - // TODO(sky): deal with error?
|
| - MojoWriteMessage(write_handle(), NULL, 0, NULL, 0,
|
| - MOJO_WRITE_MESSAGE_FLAG_NONE);
|
| -}
|
| -
|
| -void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) {
|
| - HandleAndCallback to_notify;
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - IDToCallbackMap::iterator i = id_to_callback_.find(id);
|
| - if (i == id_to_callback_.end())
|
| - return;
|
| - to_notify = i->second;
|
| - id_to_callback_.erase(i);
|
| - }
|
| - to_notify.message_loop->PostTask(FROM_HERE,
|
| - base::Bind(to_notify.callback, result));
|
| -}
|
| -
|
| -void WatcherThreadManager::RemoveHandle(MojoHandle handle) {
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - for (IDToCallbackMap::iterator i = id_to_callback_.begin();
|
| - i != id_to_callback_.end(); ) {
|
| - if (i->second.handle == handle) {
|
| - id_to_callback_.erase(i++);
|
| - } else {
|
| - ++i;
|
| - }
|
| - }
|
| - }
|
| -}
|
| -
|
| -WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() {
|
| - WaitState state;
|
| - const base::TimeTicks now(HandleWatcher::NowTicks());
|
| - base::TimeDelta deadline;
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - for (IDToCallbackMap::const_iterator i = id_to_callback_.begin();
|
| - i != id_to_callback_.end(); ++i) {
|
| - if (!i->second.deadline.is_null()) {
|
| - if (i->second.deadline <= now) {
|
| - state.deadline_exceeded.insert(i->first);
|
| - continue;
|
| - } else {
|
| - const base::TimeDelta delta = i->second.deadline - now;
|
| - if (deadline == base::TimeDelta() || delta < deadline)
|
| - deadline = delta;
|
| - }
|
| - }
|
| - state.ids.push_back(i->first);
|
| - state.handles.push_back(i->second.handle);
|
| - state.wait_flags.push_back(i->second.wait_flags);
|
| - }
|
| - }
|
| - state.ids.push_back(0);
|
| - state.handles.push_back(read_handle());
|
| - state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE);
|
| - state.deadline = (deadline == base::TimeDelta()) ?
|
| - MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds();
|
| - return state;
|
| -}
|
| -
|
| } // namespace
|
|
|
| // HandleWatcher::StartState ---------------------------------------------------
|
|
|