| Index: mojo/common/handle_watcher.cc
|
| diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc
|
| index 0c7cdef63d5743300af530b84d29ce8e16e096c3..33bb0bdeaa7daae80a0c74e46b688052b1164c9c 100644
|
| --- a/mojo/common/handle_watcher.cc
|
| +++ b/mojo/common/handle_watcher.cc
|
| @@ -71,9 +71,8 @@ class WatcherBackend : public MessagePumpMojoHandler {
|
|
|
| void StartWatching(const WatchData& data);
|
|
|
| - // Cancels a previously schedule request to start a watch. When done signals
|
| - // |event|.
|
| - void StopWatching(WatcherID watcher_id, base::WaitableEvent* event);
|
| + // Cancels a previously scheduled request to start a watch.
|
| + void StopWatching(WatcherID watcher_id);
|
|
|
| private:
|
| typedef std::map<Handle, WatchData> HandleToWatchDataMap;
|
| @@ -112,8 +111,7 @@ void WatcherBackend::StartWatching(const WatchData& data) {
|
| data.deadline);
|
| }
|
|
|
| -void WatcherBackend::StopWatching(WatcherID watcher_id,
|
| - base::WaitableEvent* event) {
|
| +void WatcherBackend::StopWatching(WatcherID watcher_id) {
|
| // Because of the thread hop it is entirely possible to get here and not
|
| // have a valid handle registered for |watcher_id|.
|
| Handle handle;
|
| @@ -121,7 +119,6 @@ void WatcherBackend::StopWatching(WatcherID watcher_id,
|
| handle_to_data_.erase(handle);
|
| message_pump_mojo->RemoveHandler(handle);
|
| }
|
| - event->Signal();
|
| }
|
|
|
| void WatcherBackend::RemoveAndNotify(const Handle& handle,
|
| @@ -182,15 +179,47 @@ class WatcherThreadManager {
|
| void StopWatching(WatcherID watcher_id);
|
|
|
| private:
|
| + enum RequestType {
|
| + REQUEST_START,
|
| + REQUEST_STOP,
|
| + };
|
| +
|
| + // See description of |requests_| for details.
|
| + struct RequestData {
|
| + RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
|
| +
|
| + RequestType type;
|
| + WatchData start_data;
|
| + WatcherID stop_id;
|
| + base::WaitableEvent* stop_event;
|
| + };
|
| +
|
| + typedef std::vector<RequestData> Requests;
|
| +
|
| friend struct DefaultSingletonTraits<WatcherThreadManager>;
|
| +
|
| WatcherThreadManager();
|
|
|
| + // Schedules a request on the background thread. See |requests_| for details.
|
| + void AddRequest(const RequestData& data);
|
| +
|
| + // Processes requests added to |requests_|. This is invoked on the backend
|
| + // thread.
|
| + void ProcessRequestsOnBackendThread();
|
| +
|
| base::Thread thread_;
|
|
|
| base::AtomicSequenceNumber watcher_id_generator_;
|
|
|
| WatcherBackend backend_;
|
|
|
| + // Protects |requests_|.
|
| + base::Lock lock_;
|
| +
|
| + // Start/Stop result in adding a RequestData to |requests_| (protected by
|
| + // |lock_|). When the background thread wakes up it processes the requests.
|
| + Requests requests_;
|
| +
|
| DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
|
| };
|
|
|
| @@ -207,37 +236,77 @@ WatcherID WatcherThreadManager::StartWatching(
|
| MojoHandleSignals handle_signals,
|
| base::TimeTicks deadline,
|
| const base::Callback<void(MojoResult)>& callback) {
|
| - WatchData data;
|
| - data.id = watcher_id_generator_.GetNext();
|
| - data.handle = handle;
|
| - data.callback = callback;
|
| - data.handle_signals = handle_signals;
|
| - data.deadline = deadline;
|
| - data.message_loop = base::MessageLoopProxy::current();
|
| + RequestData request_data;
|
| + request_data.type = REQUEST_START;
|
| + request_data.start_data.id = watcher_id_generator_.GetNext();
|
| + request_data.start_data.handle = handle;
|
| + request_data.start_data.callback = callback;
|
| + request_data.start_data.handle_signals = handle_signals;
|
| + request_data.start_data.deadline = deadline;
|
| + request_data.start_data.message_loop = base::MessageLoopProxy::current();
|
| DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
|
| - data.message_loop.get());
|
| - // We own |thread_|, so it's safe to use Unretained() here.
|
| - thread_.message_loop()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&WatcherBackend::StartWatching,
|
| - base::Unretained(&backend_),
|
| - data));
|
| - return data.id;
|
| + request_data.start_data.message_loop.get());
|
| + AddRequest(request_data);
|
| + return request_data.start_data.id;
|
| }
|
|
|
| void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
|
| + // Handle the case of StartWatching() followed by StopWatching() before
|
| + // |thread_| woke up.
|
| + {
|
| + base::AutoLock auto_lock(lock_);
|
| + for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
|
| + if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
|
| + // Watcher ids are not reused, so if we find it we can stop.
|
| + requests_.erase(i);
|
| + return;
|
| + }
|
| + }
|
| + }
|
| +
|
| base::ThreadRestrictions::ScopedAllowWait allow_wait;
|
| base::WaitableEvent event(true, false);
|
| + RequestData request_data;
|
| + request_data.type = REQUEST_STOP;
|
| + request_data.stop_id = watcher_id;
|
| + request_data.stop_event = &event;
|
| + AddRequest(request_data);
|
| +
|
| + // We need to block until the handle is actually removed.
|
| + event.Wait();
|
| +}
|
| +
|
| +void WatcherThreadManager::AddRequest(const RequestData& data) {
|
| + {
|
| + base::AutoLock auto_lock(lock_);
|
| + const bool was_empty = requests_.empty();
|
| + requests_.push_back(data);
|
| + if (!was_empty)
|
| + return;
|
| + }
|
| // We own |thread_|, so it's safe to use Unretained() here.
|
| thread_.message_loop()->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&WatcherBackend::StopWatching,
|
| - base::Unretained(&backend_),
|
| - watcher_id,
|
| - &event));
|
| + base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
|
| + base::Unretained(this)));
|
| +}
|
|
|
| - // We need to block until the handle is actually removed.
|
| - event.Wait();
|
| +void WatcherThreadManager::ProcessRequestsOnBackendThread() {
|
| + DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
|
| +
|
| + Requests requests;
|
| + {
|
| + base::AutoLock auto_lock(lock_);
|
| + requests_.swap(requests);
|
| + }
|
| + for (size_t i = 0; i < requests.size(); ++i) {
|
| + if (requests[i].type == REQUEST_START) {
|
| + backend_.StartWatching(requests[i].start_data);
|
| + } else {
|
| + backend_.StopWatching(requests[i].stop_id);
|
| + requests[i].stop_event->Signal();
|
| + }
|
| + }
|
| }
|
|
|
| WatcherThreadManager::WatcherThreadManager()
|
| @@ -261,6 +330,7 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
|
| const base::Callback<void(MojoResult)>& callback)
|
| : watcher_(watcher),
|
| callback_(callback),
|
| + got_ready_(false),
|
| weak_factory_(this) {
|
| base::MessageLoop::current()->AddDestructionObserver(this);
|
|
|
| @@ -274,16 +344,27 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
|
| virtual ~State() {
|
| base::MessageLoop::current()->RemoveDestructionObserver(this);
|
|
|
| - WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
|
| + // If we've been notified the handle is ready (|got_ready_| is true) then
|
| + // the watch has been implicitly removed by
|
| + // WatcherThreadManager/MessagePumpMojo and we don't have to call
|
| + // StopWatching(). To do so would needlessly entail posting a task and
|
| + // blocking until the background thread services it.
|
| + if (!got_ready_)
|
| + WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
|
| }
|
|
|
| private:
|
| virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
|
| // The current thread is exiting. Simulate a watch error.
|
| - OnHandleReady(MOJO_RESULT_ABORTED);
|
| + NotifyAndDestroy(MOJO_RESULT_ABORTED);
|
| }
|
|
|
| void OnHandleReady(MojoResult result) {
|
| + got_ready_ = true;
|
| + NotifyAndDestroy(result);
|
| + }
|
| +
|
| + void NotifyAndDestroy(MojoResult result) {
|
| base::Callback<void(MojoResult)> callback = callback_;
|
| watcher_->Stop(); // Destroys |this|.
|
|
|
| @@ -294,6 +375,9 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
|
| WatcherID watcher_id_;
|
| base::Callback<void(MojoResult)> callback_;
|
|
|
| + // Have we been notified that the handle is ready?
|
| + bool got_ready_;
|
| +
|
| // Used to weakly bind |this| to the WatcherThreadManager.
|
| base::WeakPtrFactory<State> weak_factory_;
|
| };
|
|
|