Chromium Code Reviews| Index: mojo/common/handle_watcher.cc |
| diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc |
| index a159185ba35bc0029d658ab312168c746627a19a..e1e1ca0e8e1fa6881a23c6ee01e69f79773d475c 100644 |
| --- a/mojo/common/handle_watcher.cc |
| +++ b/mojo/common/handle_watcher.cc |
| @@ -5,19 +5,18 @@ |
| #include "mojo/common/handle_watcher.h" |
| #include <map> |
| -#include <set> |
| -#include <vector> |
| +#include "base/atomic_sequence_num.h" |
| #include "base/bind.h" |
| #include "base/lazy_instance.h" |
| #include "base/memory/weak_ptr.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/message_loop/message_loop_proxy.h" |
| -#include "base/synchronization/lock.h" |
| #include "base/threading/thread.h" |
| #include "base/time/tick_clock.h" |
| #include "base/time/time.h" |
| -#include "mojo/common/scoped_message_pipe.h" |
| +#include "mojo/common/message_pump_mojo.h" |
| +#include "mojo/common/message_pump_mojo_handler.h" |
| namespace mojo { |
| namespace common { |
| @@ -28,13 +27,126 @@ namespace { |
| const char kWatcherThreadName[] = "handle-watcher-thread"; |
| +// TODO(sky): shouldn't be necessary to cache this. |
|
darin (slow to review)
2013/11/21 18:14:52
This is going to be fixed once we sort out the RTT
sky
2013/11/22 19:39:44
I updated the TODO for now.
|
| +MessagePumpMojo* message_pump_mojo_ = NULL; |
|
darin (slow to review)
2013/11/21 18:14:52
nit: shouldn't end with "_"
sky
2013/11/22 19:39:44
Done.
|
| + |
| +scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { |
| + message_pump_mojo_ = new MessagePumpMojo; |
| + return scoped_ptr<base::MessagePump>(message_pump_mojo_).Pass(); |
| +} |
| + |
| +// Tracks the data for a single call to Start(). |
| +struct WatchData { |
| + WatchData() |
| + : id(0), |
| + handle(MOJO_HANDLE_INVALID), |
| + wait_flags(MOJO_WAIT_FLAG_NONE), |
| + message_loop(NULL) {} |
| + |
| + WatcherID id; |
| + MojoHandle handle; |
| + MojoWaitFlags wait_flags; |
| + base::TimeTicks deadline; |
| + base::Callback<void(MojoResult)> callback; |
| + scoped_refptr<base::MessageLoopProxy> message_loop; |
| +}; |
| + |
| +// WatcherBackend -------------------------------------------------------------- |
| + |
| +// WatcherBackend is responsible for managing the requests and interacting with |
| +// MessagePumpMojo. All access (outside of creation/destruction) is done on the |
| +// thread WatcherThreadManager creates. |
| +class WatcherBackend : public MessagePumpMojoHandler { |
| + public: |
| + WatcherBackend(); |
| + virtual ~WatcherBackend(); |
| + |
| + void StartWatching(const WatchData& data); |
| + void StopWatching(WatcherID watcher_id); |
| + |
| + private: |
| + typedef std::map<MojoHandle, WatchData> HandleToWatchDataMap; |
| + |
| + // Invoked when a handle needs to be removed and notified. |
| + void RemoveAndNotify(MojoHandle handle, MojoResult result); |
| + |
| + // Searches through |handle_to_data_| for |watcher_id|. Returns true if found |
| + // and sets |handle| to the MojoHandle. Returns false if not a known id. |
| + bool GetMojoHandleByWatcherID(WatcherID watcher_id, MojoHandle* handle) const; |
| + |
| + // MessagePumpMojoHandler overrides: |
| + virtual void OnHandleReady(MojoHandle handle) OVERRIDE; |
| + virtual void OnHandleError(MojoHandle handle, MojoResult result) OVERRIDE; |
| + |
| + // Maps from assigned id to WatchData. |
| + HandleToWatchDataMap handle_to_data_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(WatcherBackend); |
| +}; |
| + |
| +WatcherBackend::WatcherBackend() { |
| +} |
| + |
| +WatcherBackend::~WatcherBackend() { |
| +} |
| + |
| +void WatcherBackend::StartWatching(const WatchData& data) { |
| + RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
| + |
| + DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
| + |
| + handle_to_data_[data.handle] = data; |
| + message_pump_mojo_->AddHandler(this, data.handle, |
| + data.wait_flags, |
| + data.deadline); |
| +} |
| + |
| +void WatcherBackend::StopWatching(WatcherID watcher_id) { |
| + // Because of the thread hop it is entirely possible to get here and not |
| + // have a valid handle registered for |watcher_id|. |
| + MojoHandle handle; |
| + if (!GetMojoHandleByWatcherID(watcher_id, &handle)) |
| + return; |
| + |
| + handle_to_data_.erase(handle); |
| + message_pump_mojo_->RemoveHandler(handle); |
| +} |
| + |
| +void WatcherBackend::RemoveAndNotify(MojoHandle handle, |
| + MojoResult result) { |
| + if (handle_to_data_.count(handle) == 0) |
| + return; |
| + |
| + const WatchData data(handle_to_data_[handle]); |
| + handle_to_data_.erase(handle); |
| + message_pump_mojo_->RemoveHandler(handle); |
| + data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); |
|
darin (slow to review)
2013/11/21 18:14:52
Hmm, when called from OnHandleReady, this PostTask
sky
2013/11/22 19:39:44
Remember this is on the backend thread, so the Pos
|
| +} |
| + |
| +bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, |
| + MojoHandle* handle) const { |
| + for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); |
| + i != handle_to_data_.end(); ++i) { |
| + if (i->second.id == watcher_id) { |
| + *handle = i->second.handle; |
| + return true; |
| + } |
| + } |
| + return false; |
| +} |
| + |
| +void WatcherBackend::OnHandleReady(MojoHandle handle) { |
| + RemoveAndNotify(handle, MOJO_RESULT_OK); |
| +} |
| + |
| +void WatcherBackend::OnHandleError(MojoHandle handle, MojoResult result) { |
| + RemoveAndNotify(handle, result); |
| +} |
| + |
| // WatcherThreadManager -------------------------------------------------------- |
| -// WatcherThreadManager manages listening for the handles. It is a singleton. It |
| -// spawns a thread that waits for any handle passed to StartWatching() to be |
| -// ready. Additionally it creates a message pipe for communication between the |
| -// two threads. The message pipe is used solely to wake up the background |
| -// thread. This happens when the set of handles changes, or during shutdown. |
| +// WatcherThreadManager manages the background thread that listens for handles |
| +// to be ready. All requests are handled by WatcherBackend. |
| class WatcherThreadManager { |
| public: |
| // Returns the shared instance. |
| @@ -50,84 +162,20 @@ class WatcherThreadManager { |
| const base::Callback<void(MojoResult)>& callback); |
| // Stops watching a handle. |
| + // This may be invoked on any thread. |
| void StopWatching(WatcherID watcher_id); |
| private: |
| friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; |
| - // Tracks a single request. |
| - struct HandleAndCallback { |
| - HandleAndCallback() |
| - : handle(MOJO_HANDLE_INVALID), |
| - wait_flags(MOJO_WAIT_FLAG_NONE), |
| - message_loop(NULL) {} |
| - |
| - MojoHandle handle; |
| - MojoWaitFlags wait_flags; |
| - base::TimeTicks deadline; |
| - base::Callback<void(MojoResult)> callback; |
| - scoped_refptr<base::MessageLoopProxy> message_loop; |
| - }; |
| - |
| - // Contains the state needed for MojoWaitMany. |
| - // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to |
| - // pass to MojoWaitMany. |
| - struct WaitState { |
| - // List of ids. |
| - std::vector<WatcherID> ids; |
| - |
| - // List of handles. |
| - std::vector<MojoHandle> handles; |
| - |
| - // List of flags each handle is waiting on. |
| - std::vector<MojoWaitFlags> wait_flags; |
| - |
| - // First deadline. |
| - MojoDeadline deadline; |
| - |
| - // Set of ids whose deadline has been reached. |
| - std::set<WatcherID> deadline_exceeded; |
| - }; |
| - |
| - typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap; |
| - |
| WatcherThreadManager(); |
| ~WatcherThreadManager(); |
| - // Invoked on the background thread. Runs a loop waiting on current set of |
| - // handles. |
| - void RunOnBackgroundThread(); |
| - |
| - // Writes to the communication pipe to wake up the background thread. |
| - void SignalBackgroundThread(); |
| - |
| - // Invoked when a handle associated with |id| should be removed and notified. |
| - // |result| gives the reason for removing. |
| - void RemoveAndNotify(WatcherID id, MojoResult result); |
| - |
| - // Removes all callbacks schedule for |handle|. This is used when a handle |
| - // is identified as invalid. |
| - void RemoveHandle(MojoHandle handle); |
| - |
| - MojoHandle read_handle() const { return control_pipe_.handle_0(); } |
| - MojoHandle write_handle() const { return control_pipe_.handle_1(); } |
| - |
| - // Returns state needed for MojoWaitMany. |
| - WaitState GetWaitState(); |
| - |
| - // Guards members accessed on both threads. |
| - base::Lock lock_; |
| - |
| - // Used for communicating with the background thread. |
| - ScopedMessagePipe control_pipe_; |
| - |
| base::Thread thread_; |
| - // Maps from assigned id to the callback. |
| - IDToCallbackMap id_to_callback_; |
| + base::AtomicSequenceNumber watcher_id_generator_; |
| - // If true the background loop should exit. |
| - bool quit_; |
| + WatcherBackend backend_; |
| DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
| }; |
| @@ -143,177 +191,42 @@ WatcherID WatcherThreadManager::StartWatching( |
| MojoWaitFlags wait_flags, |
| base::TimeTicks deadline, |
| const base::Callback<void(MojoResult)>& callback) { |
| - WatcherID id = 0; |
| - { |
| - static int next_id = 0; |
| - base::AutoLock lock(lock_); |
| - // TODO(sky): worry about overflow? |
| - id = ++next_id; |
| - id_to_callback_[id].handle = handle; |
| - id_to_callback_[id].callback = callback; |
| - id_to_callback_[id].wait_flags = wait_flags; |
| - id_to_callback_[id].deadline = deadline; |
| - id_to_callback_[id].message_loop = base::MessageLoopProxy::current(); |
| - } |
| - SignalBackgroundThread(); |
| - return id; |
| + WatchData data; |
| + data.id = watcher_id_generator_.GetNext(); |
| + data.handle = handle; |
| + data.callback = callback; |
| + data.wait_flags = wait_flags; |
| + data.deadline = deadline; |
| + data.message_loop = base::MessageLoopProxy::current(); |
| + // We outlive |thread_|, so it's safe to use Unretained() here. |
| + thread_.message_loop()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&WatcherBackend::StartWatching, |
| + base::Unretained(&backend_), |
| + data)); |
| + return data.id; |
| } |
| - |
| void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
| - { |
| - base::AutoLock lock(lock_); |
| - // It's possible we've already serviced the handle but HandleWatcher hasn't |
| - // processed it yet. |
| - IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id); |
| - if (i == id_to_callback_.end()) |
| - return; |
| - id_to_callback_.erase(i); |
| - } |
| - SignalBackgroundThread(); |
| + // We outlive |thread_|, so it's safe to use Unretained() here. |
| + thread_.message_loop()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&WatcherBackend::StopWatching, |
| + base::Unretained(&backend_), |
| + watcher_id)); |
| } |
| WatcherThreadManager::WatcherThreadManager() |
| - : thread_(kWatcherThreadName), |
| - quit_(false) { |
| - // TODO(sky): deal with error condition? |
| - CHECK_NE(MOJO_HANDLE_INVALID, read_handle()); |
| - thread_.Start(); |
| - thread_.message_loop()->PostTask( |
| - FROM_HERE, |
| - base::Bind(&WatcherThreadManager::RunOnBackgroundThread, |
| - base::Unretained(this))); |
| + : thread_(kWatcherThreadName) { |
| + base::Thread::Options thread_options; |
| + thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); |
| + thread_.StartWithOptions(thread_options); |
| } |
| WatcherThreadManager::~WatcherThreadManager() { |
| - { |
| - base::AutoLock lock(lock_); |
| - quit_ = true; |
| - } |
| - SignalBackgroundThread(); |
| - |
| thread_.Stop(); |
| } |
| -void WatcherThreadManager::RunOnBackgroundThread() { |
| - while (true) { |
| - const WaitState state = GetWaitState(); |
| - for (std::set<WatcherID>::const_iterator i = |
| - state.deadline_exceeded.begin(); |
| - i != state.deadline_exceeded.end(); ++i) { |
| - RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED); |
| - } |
| - const MojoResult result = MojoWaitMany(&state.handles.front(), |
| - &state.wait_flags.front(), |
| - state.handles.size(), |
| - state.deadline); |
| - |
| - if (result >= 0) { |
| - DCHECK_LT(result, static_cast<int>(state.handles.size())); |
| - // Last handle is used to wake us up. |
| - if (result == static_cast<int>(state.handles.size()) - 1) { |
| - uint32_t num_bytes = 0; |
| - MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0, |
| - MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| - { |
| - base::AutoLock lock(lock_); |
| - if (quit_) |
| - return; |
| - } |
| - } else { |
| - RemoveAndNotify(state.ids[result], MOJO_RESULT_OK); |
| - } |
| - } else if (result == MOJO_RESULT_INVALID_ARGUMENT || |
| - result == MOJO_RESULT_FAILED_PRECONDITION) { |
| - // One of the handles is invalid or the flags supplied is invalid, remove |
| - // it. |
| - // Use -1 as last handle is used for communication and should never be |
| - // invalid. |
| - for (size_t i = 0; i < state.handles.size() - 1; ++i) { |
| - const MojoResult result = |
| - MojoWait(state.handles[i], state.wait_flags[i], 0); |
| - switch (result) { |
| - // TODO: do we really want to notify for all these conditions? |
| - case MOJO_RESULT_OK: |
| - case MOJO_RESULT_FAILED_PRECONDITION: |
| - case MOJO_RESULT_INVALID_ARGUMENT: |
| - RemoveAndNotify(state.ids[i], result); |
| - break; |
| - case MOJO_RESULT_DEADLINE_EXCEEDED: |
| - break; |
| - default: |
| - NOTREACHED(); |
| - } |
| - } |
| - } |
| - } |
| -} |
| - |
| -void WatcherThreadManager::SignalBackgroundThread() { |
| - // TODO(sky): deal with error? |
| - MojoWriteMessage(write_handle(), NULL, 0, NULL, 0, |
| - MOJO_WRITE_MESSAGE_FLAG_NONE); |
| -} |
| - |
| -void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) { |
| - HandleAndCallback to_notify; |
| - { |
| - base::AutoLock lock(lock_); |
| - IDToCallbackMap::iterator i = id_to_callback_.find(id); |
| - if (i == id_to_callback_.end()) |
| - return; |
| - to_notify = i->second; |
| - id_to_callback_.erase(i); |
| - } |
| - to_notify.message_loop->PostTask(FROM_HERE, |
| - base::Bind(to_notify.callback, result)); |
| -} |
| - |
| -void WatcherThreadManager::RemoveHandle(MojoHandle handle) { |
| - { |
| - base::AutoLock lock(lock_); |
| - for (IDToCallbackMap::iterator i = id_to_callback_.begin(); |
| - i != id_to_callback_.end(); ) { |
| - if (i->second.handle == handle) { |
| - id_to_callback_.erase(i++); |
| - } else { |
| - ++i; |
| - } |
| - } |
| - } |
| -} |
| - |
| -WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() { |
| - WaitState state; |
| - const base::TimeTicks now(HandleWatcher::NowTicks()); |
| - base::TimeDelta deadline; |
| - { |
| - base::AutoLock lock(lock_); |
| - for (IDToCallbackMap::const_iterator i = id_to_callback_.begin(); |
| - i != id_to_callback_.end(); ++i) { |
| - if (!i->second.deadline.is_null()) { |
| - if (i->second.deadline <= now) { |
| - state.deadline_exceeded.insert(i->first); |
| - continue; |
| - } else { |
| - const base::TimeDelta delta = i->second.deadline - now; |
| - if (deadline == base::TimeDelta() || delta < deadline) |
| - deadline = delta; |
| - } |
| - } |
| - state.ids.push_back(i->first); |
| - state.handles.push_back(i->second.handle); |
| - state.wait_flags.push_back(i->second.wait_flags); |
| - } |
| - } |
| - state.ids.push_back(0); |
| - state.handles.push_back(read_handle()); |
| - state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE); |
| - state.deadline = (deadline == base::TimeDelta()) ? |
| - MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds(); |
| - return state; |
| -} |
| - |
| } // namespace |
| // HandleWatcher::StartState --------------------------------------------------- |