Index: mojo/common/handle_watcher.cc |
diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc |
index a159185ba35bc0029d658ab312168c746627a19a..dd9b2e849120a4ec9cc262ed2f95998ebebeeea0 100644 |
--- a/mojo/common/handle_watcher.cc |
+++ b/mojo/common/handle_watcher.cc |
@@ -5,19 +5,18 @@ |
#include "mojo/common/handle_watcher.h" |
#include <map> |
-#include <set> |
-#include <vector> |
+#include "base/atomic_sequence_num.h" |
#include "base/bind.h" |
#include "base/lazy_instance.h" |
#include "base/memory/weak_ptr.h" |
#include "base/message_loop/message_loop.h" |
#include "base/message_loop/message_loop_proxy.h" |
-#include "base/synchronization/lock.h" |
#include "base/threading/thread.h" |
#include "base/time/tick_clock.h" |
#include "base/time/time.h" |
-#include "mojo/common/scoped_message_pipe.h" |
+#include "mojo/common/message_pump_mojo.h" |
+#include "mojo/common/message_pump_mojo_handler.h" |
namespace mojo { |
namespace common { |
@@ -28,13 +27,126 @@ namespace { |
const char kWatcherThreadName[] = "handle-watcher-thread"; |
+// TODO(sky): this should be unnecessary once MessageLoop has been refactored. |
+MessagePumpMojo* message_pump_mojo = NULL; |
+ |
+scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { |
+ message_pump_mojo = new MessagePumpMojo; |
+ return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass(); |
+} |
+ |
+// Tracks the data for a single call to Start(). |
+struct WatchData { |
+ WatchData() |
+ : id(0), |
+ handle(MOJO_HANDLE_INVALID), |
+ wait_flags(MOJO_WAIT_FLAG_NONE), |
+ message_loop(NULL) {} |
+ |
+ WatcherID id; |
+ MojoHandle handle; |
+ MojoWaitFlags wait_flags; |
+ base::TimeTicks deadline; |
+ base::Callback<void(MojoResult)> callback; |
+ scoped_refptr<base::MessageLoopProxy> message_loop; |
+}; |
+ |
+// WatcherBackend -------------------------------------------------------------- |
+ |
+// WatcherBackend is responsible for managing the requests and interacting with |
+// MessagePumpMojo. All access (outside of creation/destruction) is done on the |
+// thread WatcherThreadManager creates. |
+class WatcherBackend : public MessagePumpMojoHandler { |
+ public: |
+ WatcherBackend(); |
+ virtual ~WatcherBackend(); |
+ |
+ void StartWatching(const WatchData& data); |
+ void StopWatching(WatcherID watcher_id); |
+ |
+ private: |
+ typedef std::map<MojoHandle, WatchData> HandleToWatchDataMap; |
+ |
+ // Invoked when a handle needs to be removed and notified. |
+ void RemoveAndNotify(MojoHandle handle, MojoResult result); |
+ |
+ // Searches through |handle_to_data_| for |watcher_id|. Returns true if found |
+ // and sets |handle| to the MojoHandle. Returns false if not a known id. |
+ bool GetMojoHandleByWatcherID(WatcherID watcher_id, MojoHandle* handle) const; |
+ |
+ // MessagePumpMojoHandler overrides: |
+ virtual void OnHandleReady(MojoHandle handle) OVERRIDE; |
+ virtual void OnHandleError(MojoHandle handle, MojoResult result) OVERRIDE; |
+ |
+ // Maps from assigned id to WatchData. |
+ HandleToWatchDataMap handle_to_data_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(WatcherBackend); |
+}; |
+ |
+WatcherBackend::WatcherBackend() { |
+} |
+ |
+WatcherBackend::~WatcherBackend() { |
+} |
+ |
+void WatcherBackend::StartWatching(const WatchData& data) { |
+ RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
+ |
+ DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
+ |
+ handle_to_data_[data.handle] = data; |
+ message_pump_mojo->AddHandler(this, data.handle, |
+ data.wait_flags, |
+ data.deadline); |
+} |
+ |
+void WatcherBackend::StopWatching(WatcherID watcher_id) { |
+ // Because of the thread hop it is entirely possible to get here and not |
+ // have a valid handle registered for |watcher_id|. |
+ MojoHandle handle; |
+ if (!GetMojoHandleByWatcherID(watcher_id, &handle)) |
+ return; |
+ |
+ handle_to_data_.erase(handle); |
+ message_pump_mojo->RemoveHandler(handle); |
+} |
+ |
+void WatcherBackend::RemoveAndNotify(MojoHandle handle, |
+ MojoResult result) { |
+ if (handle_to_data_.count(handle) == 0) |
+ return; |
+ |
+ const WatchData data(handle_to_data_[handle]); |
+ handle_to_data_.erase(handle); |
+ message_pump_mojo->RemoveHandler(handle); |
+ data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); |
+} |
+ |
+bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, |
+ MojoHandle* handle) const { |
+ for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); |
+ i != handle_to_data_.end(); ++i) { |
+ if (i->second.id == watcher_id) { |
+ *handle = i->second.handle; |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+void WatcherBackend::OnHandleReady(MojoHandle handle) { |
+ RemoveAndNotify(handle, MOJO_RESULT_OK); |
+} |
+ |
+void WatcherBackend::OnHandleError(MojoHandle handle, MojoResult result) { |
+ RemoveAndNotify(handle, result); |
+} |
+ |
// WatcherThreadManager -------------------------------------------------------- |
-// WatcherThreadManager manages listening for the handles. It is a singleton. It |
-// spawns a thread that waits for any handle passed to StartWatching() to be |
-// ready. Additionally it creates a message pipe for communication between the |
-// two threads. The message pipe is used solely to wake up the background |
-// thread. This happens when the set of handles changes, or during shutdown. |
+// WatcherThreadManager manages the background thread that listens for handles |
+// to be ready. All requests are handled by WatcherBackend. |
class WatcherThreadManager { |
public: |
// Returns the shared instance. |
@@ -50,84 +162,20 @@ class WatcherThreadManager { |
const base::Callback<void(MojoResult)>& callback); |
// Stops watching a handle. |
+ // This may be invoked on any thread. |
void StopWatching(WatcherID watcher_id); |
private: |
friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; |
- // Tracks a single request. |
- struct HandleAndCallback { |
- HandleAndCallback() |
- : handle(MOJO_HANDLE_INVALID), |
- wait_flags(MOJO_WAIT_FLAG_NONE), |
- message_loop(NULL) {} |
- |
- MojoHandle handle; |
- MojoWaitFlags wait_flags; |
- base::TimeTicks deadline; |
- base::Callback<void(MojoResult)> callback; |
- scoped_refptr<base::MessageLoopProxy> message_loop; |
- }; |
- |
- // Contains the state needed for MojoWaitMany. |
- // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to |
- // pass to MojoWaitMany. |
- struct WaitState { |
- // List of ids. |
- std::vector<WatcherID> ids; |
- |
- // List of handles. |
- std::vector<MojoHandle> handles; |
- |
- // List of flags each handle is waiting on. |
- std::vector<MojoWaitFlags> wait_flags; |
- |
- // First deadline. |
- MojoDeadline deadline; |
- |
- // Set of ids whose deadline has been reached. |
- std::set<WatcherID> deadline_exceeded; |
- }; |
- |
- typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap; |
- |
WatcherThreadManager(); |
~WatcherThreadManager(); |
- // Invoked on the background thread. Runs a loop waiting on current set of |
- // handles. |
- void RunOnBackgroundThread(); |
- |
- // Writes to the communication pipe to wake up the background thread. |
- void SignalBackgroundThread(); |
- |
- // Invoked when a handle associated with |id| should be removed and notified. |
- // |result| gives the reason for removing. |
- void RemoveAndNotify(WatcherID id, MojoResult result); |
- |
- // Removes all callbacks schedule for |handle|. This is used when a handle |
- // is identified as invalid. |
- void RemoveHandle(MojoHandle handle); |
- |
- MojoHandle read_handle() const { return control_pipe_.handle_0(); } |
- MojoHandle write_handle() const { return control_pipe_.handle_1(); } |
- |
- // Returns state needed for MojoWaitMany. |
- WaitState GetWaitState(); |
- |
- // Guards members accessed on both threads. |
- base::Lock lock_; |
- |
- // Used for communicating with the background thread. |
- ScopedMessagePipe control_pipe_; |
- |
base::Thread thread_; |
- // Maps from assigned id to the callback. |
- IDToCallbackMap id_to_callback_; |
+ base::AtomicSequenceNumber watcher_id_generator_; |
- // If true the background loop should exit. |
- bool quit_; |
+ WatcherBackend backend_; |
DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
}; |
@@ -143,177 +191,42 @@ WatcherID WatcherThreadManager::StartWatching( |
MojoWaitFlags wait_flags, |
base::TimeTicks deadline, |
const base::Callback<void(MojoResult)>& callback) { |
- WatcherID id = 0; |
- { |
- static int next_id = 0; |
- base::AutoLock lock(lock_); |
- // TODO(sky): worry about overflow? |
- id = ++next_id; |
- id_to_callback_[id].handle = handle; |
- id_to_callback_[id].callback = callback; |
- id_to_callback_[id].wait_flags = wait_flags; |
- id_to_callback_[id].deadline = deadline; |
- id_to_callback_[id].message_loop = base::MessageLoopProxy::current(); |
- } |
- SignalBackgroundThread(); |
- return id; |
+ WatchData data; |
+ data.id = watcher_id_generator_.GetNext(); |
+ data.handle = handle; |
+ data.callback = callback; |
+ data.wait_flags = wait_flags; |
+ data.deadline = deadline; |
+ data.message_loop = base::MessageLoopProxy::current(); |
+ // We outlive |thread_|, so it's safe to use Unretained() here. |
+ thread_.message_loop()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&WatcherBackend::StartWatching, |
+ base::Unretained(&backend_), |
+ data)); |
+ return data.id; |
} |
- |
void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
- { |
- base::AutoLock lock(lock_); |
- // It's possible we've already serviced the handle but HandleWatcher hasn't |
- // processed it yet. |
- IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id); |
- if (i == id_to_callback_.end()) |
- return; |
- id_to_callback_.erase(i); |
- } |
- SignalBackgroundThread(); |
+ // We outlive |thread_|, so it's safe to use Unretained() here. |
+ thread_.message_loop()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&WatcherBackend::StopWatching, |
+ base::Unretained(&backend_), |
+ watcher_id)); |
} |
WatcherThreadManager::WatcherThreadManager() |
- : thread_(kWatcherThreadName), |
- quit_(false) { |
- // TODO(sky): deal with error condition? |
- CHECK_NE(MOJO_HANDLE_INVALID, read_handle()); |
- thread_.Start(); |
- thread_.message_loop()->PostTask( |
- FROM_HERE, |
- base::Bind(&WatcherThreadManager::RunOnBackgroundThread, |
- base::Unretained(this))); |
+ : thread_(kWatcherThreadName) { |
+ base::Thread::Options thread_options; |
+ thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); |
+ thread_.StartWithOptions(thread_options); |
} |
WatcherThreadManager::~WatcherThreadManager() { |
- { |
- base::AutoLock lock(lock_); |
- quit_ = true; |
- } |
- SignalBackgroundThread(); |
- |
thread_.Stop(); |
} |
-void WatcherThreadManager::RunOnBackgroundThread() { |
- while (true) { |
- const WaitState state = GetWaitState(); |
- for (std::set<WatcherID>::const_iterator i = |
- state.deadline_exceeded.begin(); |
- i != state.deadline_exceeded.end(); ++i) { |
- RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED); |
- } |
- const MojoResult result = MojoWaitMany(&state.handles.front(), |
- &state.wait_flags.front(), |
- state.handles.size(), |
- state.deadline); |
- |
- if (result >= 0) { |
- DCHECK_LT(result, static_cast<int>(state.handles.size())); |
- // Last handle is used to wake us up. |
- if (result == static_cast<int>(state.handles.size()) - 1) { |
- uint32_t num_bytes = 0; |
- MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0, |
- MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
- { |
- base::AutoLock lock(lock_); |
- if (quit_) |
- return; |
- } |
- } else { |
- RemoveAndNotify(state.ids[result], MOJO_RESULT_OK); |
- } |
- } else if (result == MOJO_RESULT_INVALID_ARGUMENT || |
- result == MOJO_RESULT_FAILED_PRECONDITION) { |
- // One of the handles is invalid or the flags supplied is invalid, remove |
- // it. |
- // Use -1 as last handle is used for communication and should never be |
- // invalid. |
- for (size_t i = 0; i < state.handles.size() - 1; ++i) { |
- const MojoResult result = |
- MojoWait(state.handles[i], state.wait_flags[i], 0); |
- switch (result) { |
- // TODO: do we really want to notify for all these conditions? |
- case MOJO_RESULT_OK: |
- case MOJO_RESULT_FAILED_PRECONDITION: |
- case MOJO_RESULT_INVALID_ARGUMENT: |
- RemoveAndNotify(state.ids[i], result); |
- break; |
- case MOJO_RESULT_DEADLINE_EXCEEDED: |
- break; |
- default: |
- NOTREACHED(); |
- } |
- } |
- } |
- } |
-} |
- |
-void WatcherThreadManager::SignalBackgroundThread() { |
- // TODO(sky): deal with error? |
- MojoWriteMessage(write_handle(), NULL, 0, NULL, 0, |
- MOJO_WRITE_MESSAGE_FLAG_NONE); |
-} |
- |
-void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) { |
- HandleAndCallback to_notify; |
- { |
- base::AutoLock lock(lock_); |
- IDToCallbackMap::iterator i = id_to_callback_.find(id); |
- if (i == id_to_callback_.end()) |
- return; |
- to_notify = i->second; |
- id_to_callback_.erase(i); |
- } |
- to_notify.message_loop->PostTask(FROM_HERE, |
- base::Bind(to_notify.callback, result)); |
-} |
- |
-void WatcherThreadManager::RemoveHandle(MojoHandle handle) { |
- { |
- base::AutoLock lock(lock_); |
- for (IDToCallbackMap::iterator i = id_to_callback_.begin(); |
- i != id_to_callback_.end(); ) { |
- if (i->second.handle == handle) { |
- id_to_callback_.erase(i++); |
- } else { |
- ++i; |
- } |
- } |
- } |
-} |
- |
-WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() { |
- WaitState state; |
- const base::TimeTicks now(HandleWatcher::NowTicks()); |
- base::TimeDelta deadline; |
- { |
- base::AutoLock lock(lock_); |
- for (IDToCallbackMap::const_iterator i = id_to_callback_.begin(); |
- i != id_to_callback_.end(); ++i) { |
- if (!i->second.deadline.is_null()) { |
- if (i->second.deadline <= now) { |
- state.deadline_exceeded.insert(i->first); |
- continue; |
- } else { |
- const base::TimeDelta delta = i->second.deadline - now; |
- if (deadline == base::TimeDelta() || delta < deadline) |
- deadline = delta; |
- } |
- } |
- state.ids.push_back(i->first); |
- state.handles.push_back(i->second.handle); |
- state.wait_flags.push_back(i->second.wait_flags); |
- } |
- } |
- state.ids.push_back(0); |
- state.handles.push_back(read_handle()); |
- state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE); |
- state.deadline = (deadline == base::TimeDelta()) ? |
- MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds(); |
- return state; |
-} |
- |
} // namespace |
// HandleWatcher::StartState --------------------------------------------------- |