Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(750)

Unified Diff: mojo/common/handle_watcher.cc

Issue 69883008: Implements HandleWatcher in terms of MessagePumpMojo (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: add id to detect whether should notify Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | mojo/common/message_pump_mojo.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/common/handle_watcher.cc
diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc
index a159185ba35bc0029d658ab312168c746627a19a..dd9b2e849120a4ec9cc262ed2f95998ebebeeea0 100644
--- a/mojo/common/handle_watcher.cc
+++ b/mojo/common/handle_watcher.cc
@@ -5,19 +5,18 @@
#include "mojo/common/handle_watcher.h"
#include <map>
-#include <set>
-#include <vector>
+#include "base/atomic_sequence_num.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
-#include "base/synchronization/lock.h"
#include "base/threading/thread.h"
#include "base/time/tick_clock.h"
#include "base/time/time.h"
-#include "mojo/common/scoped_message_pipe.h"
+#include "mojo/common/message_pump_mojo.h"
+#include "mojo/common/message_pump_mojo_handler.h"
namespace mojo {
namespace common {
@@ -28,13 +27,126 @@ namespace {
const char kWatcherThreadName[] = "handle-watcher-thread";
+// TODO(sky): this should be unnecessary once MessageLoop has been refactored.
+MessagePumpMojo* message_pump_mojo = NULL;
+
+scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
+ message_pump_mojo = new MessagePumpMojo;
+ return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
+}
+
+// Tracks the data for a single call to Start().
+struct WatchData {
+ WatchData()
+ : id(0),
+ handle(MOJO_HANDLE_INVALID),
+ wait_flags(MOJO_WAIT_FLAG_NONE),
+ message_loop(NULL) {}
+
+ WatcherID id;
+ MojoHandle handle;
+ MojoWaitFlags wait_flags;
+ base::TimeTicks deadline;
+ base::Callback<void(MojoResult)> callback;
+ scoped_refptr<base::MessageLoopProxy> message_loop;
+};
+
+// WatcherBackend --------------------------------------------------------------
+
+// WatcherBackend is responsible for managing the requests and interacting with
+// MessagePumpMojo. All access (outside of creation/destruction) is done on the
+// thread WatcherThreadManager creates.
+class WatcherBackend : public MessagePumpMojoHandler {
+ public:
+ WatcherBackend();
+ virtual ~WatcherBackend();
+
+ void StartWatching(const WatchData& data);
+ void StopWatching(WatcherID watcher_id);
+
+ private:
+ typedef std::map<MojoHandle, WatchData> HandleToWatchDataMap;
+
+ // Invoked when a handle needs to be removed and notified.
+ void RemoveAndNotify(MojoHandle handle, MojoResult result);
+
+ // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
+ // and sets |handle| to the MojoHandle. Returns false if not a known id.
+ bool GetMojoHandleByWatcherID(WatcherID watcher_id, MojoHandle* handle) const;
+
+ // MessagePumpMojoHandler overrides:
+ virtual void OnHandleReady(MojoHandle handle) OVERRIDE;
+ virtual void OnHandleError(MojoHandle handle, MojoResult result) OVERRIDE;
+
+ // Maps from assigned id to WatchData.
+ HandleToWatchDataMap handle_to_data_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
+};
+
+WatcherBackend::WatcherBackend() {
+}
+
+WatcherBackend::~WatcherBackend() {
+}
+
+void WatcherBackend::StartWatching(const WatchData& data) {
+ RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
+
+ DCHECK_EQ(0u, handle_to_data_.count(data.handle));
+
+ handle_to_data_[data.handle] = data;
+ message_pump_mojo->AddHandler(this, data.handle,
+ data.wait_flags,
+ data.deadline);
+}
+
+void WatcherBackend::StopWatching(WatcherID watcher_id) {
+ // Because of the thread hop it is entirely possible to get here and not
+ // have a valid handle registered for |watcher_id|.
+ MojoHandle handle;
+ if (!GetMojoHandleByWatcherID(watcher_id, &handle))
+ return;
+
+ handle_to_data_.erase(handle);
+ message_pump_mojo->RemoveHandler(handle);
+}
+
+void WatcherBackend::RemoveAndNotify(MojoHandle handle,
+ MojoResult result) {
+ if (handle_to_data_.count(handle) == 0)
+ return;
+
+ const WatchData data(handle_to_data_[handle]);
+ handle_to_data_.erase(handle);
+ message_pump_mojo->RemoveHandler(handle);
+ data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
+}
+
+bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
+ MojoHandle* handle) const {
+ for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
+ i != handle_to_data_.end(); ++i) {
+ if (i->second.id == watcher_id) {
+ *handle = i->second.handle;
+ return true;
+ }
+ }
+ return false;
+}
+
+void WatcherBackend::OnHandleReady(MojoHandle handle) {
+ RemoveAndNotify(handle, MOJO_RESULT_OK);
+}
+
+void WatcherBackend::OnHandleError(MojoHandle handle, MojoResult result) {
+ RemoveAndNotify(handle, result);
+}
+
// WatcherThreadManager --------------------------------------------------------
-// WatcherThreadManager manages listening for the handles. It is a singleton. It
-// spawns a thread that waits for any handle passed to StartWatching() to be
-// ready. Additionally it creates a message pipe for communication between the
-// two threads. The message pipe is used solely to wake up the background
-// thread. This happens when the set of handles changes, or during shutdown.
+// WatcherThreadManager manages the background thread that listens for handles
+// to be ready. All requests are handled by WatcherBackend.
class WatcherThreadManager {
public:
// Returns the shared instance.
@@ -50,84 +162,20 @@ class WatcherThreadManager {
const base::Callback<void(MojoResult)>& callback);
// Stops watching a handle.
+ // This may be invoked on any thread.
void StopWatching(WatcherID watcher_id);
private:
friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
- // Tracks a single request.
- struct HandleAndCallback {
- HandleAndCallback()
- : handle(MOJO_HANDLE_INVALID),
- wait_flags(MOJO_WAIT_FLAG_NONE),
- message_loop(NULL) {}
-
- MojoHandle handle;
- MojoWaitFlags wait_flags;
- base::TimeTicks deadline;
- base::Callback<void(MojoResult)> callback;
- scoped_refptr<base::MessageLoopProxy> message_loop;
- };
-
- // Contains the state needed for MojoWaitMany.
- // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to
- // pass to MojoWaitMany.
- struct WaitState {
- // List of ids.
- std::vector<WatcherID> ids;
-
- // List of handles.
- std::vector<MojoHandle> handles;
-
- // List of flags each handle is waiting on.
- std::vector<MojoWaitFlags> wait_flags;
-
- // First deadline.
- MojoDeadline deadline;
-
- // Set of ids whose deadline has been reached.
- std::set<WatcherID> deadline_exceeded;
- };
-
- typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap;
-
WatcherThreadManager();
~WatcherThreadManager();
- // Invoked on the background thread. Runs a loop waiting on current set of
- // handles.
- void RunOnBackgroundThread();
-
- // Writes to the communication pipe to wake up the background thread.
- void SignalBackgroundThread();
-
- // Invoked when a handle associated with |id| should be removed and notified.
- // |result| gives the reason for removing.
- void RemoveAndNotify(WatcherID id, MojoResult result);
-
- // Removes all callbacks schedule for |handle|. This is used when a handle
- // is identified as invalid.
- void RemoveHandle(MojoHandle handle);
-
- MojoHandle read_handle() const { return control_pipe_.handle_0(); }
- MojoHandle write_handle() const { return control_pipe_.handle_1(); }
-
- // Returns state needed for MojoWaitMany.
- WaitState GetWaitState();
-
- // Guards members accessed on both threads.
- base::Lock lock_;
-
- // Used for communicating with the background thread.
- ScopedMessagePipe control_pipe_;
-
base::Thread thread_;
- // Maps from assigned id to the callback.
- IDToCallbackMap id_to_callback_;
+ base::AtomicSequenceNumber watcher_id_generator_;
- // If true the background loop should exit.
- bool quit_;
+ WatcherBackend backend_;
DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
};
@@ -143,177 +191,42 @@ WatcherID WatcherThreadManager::StartWatching(
MojoWaitFlags wait_flags,
base::TimeTicks deadline,
const base::Callback<void(MojoResult)>& callback) {
- WatcherID id = 0;
- {
- static int next_id = 0;
- base::AutoLock lock(lock_);
- // TODO(sky): worry about overflow?
- id = ++next_id;
- id_to_callback_[id].handle = handle;
- id_to_callback_[id].callback = callback;
- id_to_callback_[id].wait_flags = wait_flags;
- id_to_callback_[id].deadline = deadline;
- id_to_callback_[id].message_loop = base::MessageLoopProxy::current();
- }
- SignalBackgroundThread();
- return id;
+ WatchData data;
+ data.id = watcher_id_generator_.GetNext();
+ data.handle = handle;
+ data.callback = callback;
+ data.wait_flags = wait_flags;
+ data.deadline = deadline;
+ data.message_loop = base::MessageLoopProxy::current();
+ // We outlive |thread_|, so it's safe to use Unretained() here.
+ thread_.message_loop()->PostTask(
+ FROM_HERE,
+ base::Bind(&WatcherBackend::StartWatching,
+ base::Unretained(&backend_),
+ data));
+ return data.id;
}
-
void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
- {
- base::AutoLock lock(lock_);
- // It's possible we've already serviced the handle but HandleWatcher hasn't
- // processed it yet.
- IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id);
- if (i == id_to_callback_.end())
- return;
- id_to_callback_.erase(i);
- }
- SignalBackgroundThread();
+ // We outlive |thread_|, so it's safe to use Unretained() here.
+ thread_.message_loop()->PostTask(
+ FROM_HERE,
+ base::Bind(&WatcherBackend::StopWatching,
+ base::Unretained(&backend_),
+ watcher_id));
}
WatcherThreadManager::WatcherThreadManager()
- : thread_(kWatcherThreadName),
- quit_(false) {
- // TODO(sky): deal with error condition?
- CHECK_NE(MOJO_HANDLE_INVALID, read_handle());
- thread_.Start();
- thread_.message_loop()->PostTask(
- FROM_HERE,
- base::Bind(&WatcherThreadManager::RunOnBackgroundThread,
- base::Unretained(this)));
+ : thread_(kWatcherThreadName) {
+ base::Thread::Options thread_options;
+ thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
+ thread_.StartWithOptions(thread_options);
}
WatcherThreadManager::~WatcherThreadManager() {
- {
- base::AutoLock lock(lock_);
- quit_ = true;
- }
- SignalBackgroundThread();
-
thread_.Stop();
}
-void WatcherThreadManager::RunOnBackgroundThread() {
- while (true) {
- const WaitState state = GetWaitState();
- for (std::set<WatcherID>::const_iterator i =
- state.deadline_exceeded.begin();
- i != state.deadline_exceeded.end(); ++i) {
- RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED);
- }
- const MojoResult result = MojoWaitMany(&state.handles.front(),
- &state.wait_flags.front(),
- state.handles.size(),
- state.deadline);
-
- if (result >= 0) {
- DCHECK_LT(result, static_cast<int>(state.handles.size()));
- // Last handle is used to wake us up.
- if (result == static_cast<int>(state.handles.size()) - 1) {
- uint32_t num_bytes = 0;
- MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0,
- MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
- {
- base::AutoLock lock(lock_);
- if (quit_)
- return;
- }
- } else {
- RemoveAndNotify(state.ids[result], MOJO_RESULT_OK);
- }
- } else if (result == MOJO_RESULT_INVALID_ARGUMENT ||
- result == MOJO_RESULT_FAILED_PRECONDITION) {
- // One of the handles is invalid or the flags supplied is invalid, remove
- // it.
- // Use -1 as last handle is used for communication and should never be
- // invalid.
- for (size_t i = 0; i < state.handles.size() - 1; ++i) {
- const MojoResult result =
- MojoWait(state.handles[i], state.wait_flags[i], 0);
- switch (result) {
- // TODO: do we really want to notify for all these conditions?
- case MOJO_RESULT_OK:
- case MOJO_RESULT_FAILED_PRECONDITION:
- case MOJO_RESULT_INVALID_ARGUMENT:
- RemoveAndNotify(state.ids[i], result);
- break;
- case MOJO_RESULT_DEADLINE_EXCEEDED:
- break;
- default:
- NOTREACHED();
- }
- }
- }
- }
-}
-
-void WatcherThreadManager::SignalBackgroundThread() {
- // TODO(sky): deal with error?
- MojoWriteMessage(write_handle(), NULL, 0, NULL, 0,
- MOJO_WRITE_MESSAGE_FLAG_NONE);
-}
-
-void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) {
- HandleAndCallback to_notify;
- {
- base::AutoLock lock(lock_);
- IDToCallbackMap::iterator i = id_to_callback_.find(id);
- if (i == id_to_callback_.end())
- return;
- to_notify = i->second;
- id_to_callback_.erase(i);
- }
- to_notify.message_loop->PostTask(FROM_HERE,
- base::Bind(to_notify.callback, result));
-}
-
-void WatcherThreadManager::RemoveHandle(MojoHandle handle) {
- {
- base::AutoLock lock(lock_);
- for (IDToCallbackMap::iterator i = id_to_callback_.begin();
- i != id_to_callback_.end(); ) {
- if (i->second.handle == handle) {
- id_to_callback_.erase(i++);
- } else {
- ++i;
- }
- }
- }
-}
-
-WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() {
- WaitState state;
- const base::TimeTicks now(HandleWatcher::NowTicks());
- base::TimeDelta deadline;
- {
- base::AutoLock lock(lock_);
- for (IDToCallbackMap::const_iterator i = id_to_callback_.begin();
- i != id_to_callback_.end(); ++i) {
- if (!i->second.deadline.is_null()) {
- if (i->second.deadline <= now) {
- state.deadline_exceeded.insert(i->first);
- continue;
- } else {
- const base::TimeDelta delta = i->second.deadline - now;
- if (deadline == base::TimeDelta() || delta < deadline)
- deadline = delta;
- }
- }
- state.ids.push_back(i->first);
- state.handles.push_back(i->second.handle);
- state.wait_flags.push_back(i->second.wait_flags);
- }
- }
- state.ids.push_back(0);
- state.handles.push_back(read_handle());
- state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE);
- state.deadline = (deadline == base::TimeDelta()) ?
- MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds();
- return state;
-}
-
} // namespace
// HandleWatcher::StartState ---------------------------------------------------
« no previous file with comments | « no previous file | mojo/common/message_pump_mojo.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698