Index: mojo/common/handle_watcher.cc |
diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc |
index 0c7cdef63d5743300af530b84d29ce8e16e096c3..33bb0bdeaa7daae80a0c74e46b688052b1164c9c 100644 |
--- a/mojo/common/handle_watcher.cc |
+++ b/mojo/common/handle_watcher.cc |
@@ -71,9 +71,8 @@ class WatcherBackend : public MessagePumpMojoHandler { |
void StartWatching(const WatchData& data); |
- // Cancels a previously schedule request to start a watch. When done signals |
- // |event|. |
- void StopWatching(WatcherID watcher_id, base::WaitableEvent* event); |
+ // Cancels a previously scheduled request to start a watch. |
+ void StopWatching(WatcherID watcher_id); |
private: |
typedef std::map<Handle, WatchData> HandleToWatchDataMap; |
@@ -112,8 +111,7 @@ void WatcherBackend::StartWatching(const WatchData& data) { |
data.deadline); |
} |
-void WatcherBackend::StopWatching(WatcherID watcher_id, |
- base::WaitableEvent* event) { |
+void WatcherBackend::StopWatching(WatcherID watcher_id) { |
// Because of the thread hop it is entirely possible to get here and not |
// have a valid handle registered for |watcher_id|. |
Handle handle; |
@@ -121,7 +119,6 @@ void WatcherBackend::StopWatching(WatcherID watcher_id, |
handle_to_data_.erase(handle); |
message_pump_mojo->RemoveHandler(handle); |
} |
- event->Signal(); |
} |
void WatcherBackend::RemoveAndNotify(const Handle& handle, |
@@ -182,15 +179,47 @@ class WatcherThreadManager { |
void StopWatching(WatcherID watcher_id); |
private: |
+ enum RequestType { |
+ REQUEST_START, |
+ REQUEST_STOP, |
+ }; |
+ |
+ // See description of |requests_| for details. |
+ struct RequestData { |
+ RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} |
+ |
+ RequestType type; |
+ WatchData start_data; |
+ WatcherID stop_id; |
+ base::WaitableEvent* stop_event; |
+ }; |
+ |
+ typedef std::vector<RequestData> Requests; |
+ |
friend struct DefaultSingletonTraits<WatcherThreadManager>; |
+ |
WatcherThreadManager(); |
+ // Schedules a request on the background thread. See |requests_| for details. |
+ void AddRequest(const RequestData& data); |
+ |
+ // Processes requests added to |requests_|. This is invoked on the backend |
+ // thread. |
+ void ProcessRequestsOnBackendThread(); |
+ |
base::Thread thread_; |
base::AtomicSequenceNumber watcher_id_generator_; |
WatcherBackend backend_; |
+ // Protects |requests_|. |
+ base::Lock lock_; |
+ |
+ // Start/Stop result in adding a RequestData to |requests_| (protected by |
+ // |lock_|). When the background thread wakes up it processes the requests. |
+ Requests requests_; |
+ |
DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
}; |
@@ -207,37 +236,77 @@ WatcherID WatcherThreadManager::StartWatching( |
MojoHandleSignals handle_signals, |
base::TimeTicks deadline, |
const base::Callback<void(MojoResult)>& callback) { |
- WatchData data; |
- data.id = watcher_id_generator_.GetNext(); |
- data.handle = handle; |
- data.callback = callback; |
- data.handle_signals = handle_signals; |
- data.deadline = deadline; |
- data.message_loop = base::MessageLoopProxy::current(); |
+ RequestData request_data; |
+ request_data.type = REQUEST_START; |
+ request_data.start_data.id = watcher_id_generator_.GetNext(); |
+ request_data.start_data.handle = handle; |
+ request_data.start_data.callback = callback; |
+ request_data.start_data.handle_signals = handle_signals; |
+ request_data.start_data.deadline = deadline; |
+ request_data.start_data.message_loop = base::MessageLoopProxy::current(); |
DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), |
- data.message_loop.get()); |
- // We own |thread_|, so it's safe to use Unretained() here. |
- thread_.message_loop()->PostTask( |
- FROM_HERE, |
- base::Bind(&WatcherBackend::StartWatching, |
- base::Unretained(&backend_), |
- data)); |
- return data.id; |
+ request_data.start_data.message_loop.get()); |
+ AddRequest(request_data); |
+ return request_data.start_data.id; |
} |
void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
+ // Handle the case of StartWatching() followed by StopWatching() before |
+ // |thread_| woke up. |
+ { |
+ base::AutoLock auto_lock(lock_); |
+ for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { |
+ if (i->type == REQUEST_START && i->start_data.id == watcher_id) { |
+ // Watcher ids are not reused, so if we find it we can stop. |
+ requests_.erase(i); |
+ return; |
+ } |
+ } |
+ } |
+ |
base::ThreadRestrictions::ScopedAllowWait allow_wait; |
base::WaitableEvent event(true, false); |
+ RequestData request_data; |
+ request_data.type = REQUEST_STOP; |
+ request_data.stop_id = watcher_id; |
+ request_data.stop_event = &event; |
+ AddRequest(request_data); |
+ |
+ // We need to block until the handle is actually removed. |
+ event.Wait(); |
+} |
+ |
+void WatcherThreadManager::AddRequest(const RequestData& data) { |
+ { |
+ base::AutoLock auto_lock(lock_); |
+ const bool was_empty = requests_.empty(); |
+ requests_.push_back(data); |
+ if (!was_empty) |
+ return; |
+ } |
// We own |thread_|, so it's safe to use Unretained() here. |
thread_.message_loop()->PostTask( |
FROM_HERE, |
- base::Bind(&WatcherBackend::StopWatching, |
- base::Unretained(&backend_), |
- watcher_id, |
- &event)); |
+ base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, |
+ base::Unretained(this))); |
+} |
- // We need to block until the handle is actually removed. |
- event.Wait(); |
+void WatcherThreadManager::ProcessRequestsOnBackendThread() { |
+ DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); |
+ |
+ Requests requests; |
+ { |
+ base::AutoLock auto_lock(lock_); |
+ requests_.swap(requests); |
+ } |
+ for (size_t i = 0; i < requests.size(); ++i) { |
+ if (requests[i].type == REQUEST_START) { |
+ backend_.StartWatching(requests[i].start_data); |
+ } else { |
+ backend_.StopWatching(requests[i].stop_id); |
+ requests[i].stop_event->Signal(); |
+ } |
+ } |
} |
WatcherThreadManager::WatcherThreadManager() |
@@ -261,6 +330,7 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver { |
const base::Callback<void(MojoResult)>& callback) |
: watcher_(watcher), |
callback_(callback), |
+ got_ready_(false), |
weak_factory_(this) { |
base::MessageLoop::current()->AddDestructionObserver(this); |
@@ -274,16 +344,27 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver { |
virtual ~State() { |
base::MessageLoop::current()->RemoveDestructionObserver(this); |
- WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); |
+ // If we've been notified the handle is ready (|got_ready_| is true) then |
+ // the watch has been implicitly removed by |
+ // WatcherThreadManager/MessagePumpMojo and we don't have to call |
+ // StopWatching(). To do so would needlessly entail posting a task and |
+ // blocking until the background thread services it. |
+ if (!got_ready_) |
+ WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); |
} |
private: |
virtual void WillDestroyCurrentMessageLoop() OVERRIDE { |
// The current thread is exiting. Simulate a watch error. |
- OnHandleReady(MOJO_RESULT_ABORTED); |
+ NotifyAndDestroy(MOJO_RESULT_ABORTED); |
} |
void OnHandleReady(MojoResult result) { |
+ got_ready_ = true; |
+ NotifyAndDestroy(result); |
+ } |
+ |
+ void NotifyAndDestroy(MojoResult result) { |
base::Callback<void(MojoResult)> callback = callback_; |
watcher_->Stop(); // Destroys |this|. |
@@ -294,6 +375,9 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver { |
WatcherID watcher_id_; |
base::Callback<void(MojoResult)> callback_; |
+ // Have we been notified that the handle is ready? |
+ bool got_ready_; |
+ |
// Used to weakly bind |this| to the WatcherThreadManager. |
base::WeakPtrFactory<State> weak_factory_; |
}; |