Chromium Code Reviews| Index: mojo/common/handle_watcher.cc |
| diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc |
| index 0c7cdef63d5743300af530b84d29ce8e16e096c3..5657626747e7c0f0feac9b9c5971a842fcfe61cc 100644 |
| --- a/mojo/common/handle_watcher.cc |
| +++ b/mojo/common/handle_watcher.cc |
| @@ -71,9 +71,8 @@ class WatcherBackend : public MessagePumpMojoHandler { |
| void StartWatching(const WatchData& data); |
| - // Cancels a previously schedule request to start a watch. When done signals |
| - // |event|. |
| - void StopWatching(WatcherID watcher_id, base::WaitableEvent* event); |
| + // Cancels a previously scheduled request to start a watch.
|
darin (slow to review)
2014/08/21 23:55:12
nit: schedule -> scheduled
sky
2014/08/22 00:03:51
Done.
|
| + void StopWatching(WatcherID watcher_id); |
| private: |
| typedef std::map<Handle, WatchData> HandleToWatchDataMap; |
| @@ -112,8 +111,7 @@ void WatcherBackend::StartWatching(const WatchData& data) { |
| data.deadline); |
| } |
| -void WatcherBackend::StopWatching(WatcherID watcher_id, |
| - base::WaitableEvent* event) { |
| +void WatcherBackend::StopWatching(WatcherID watcher_id) { |
| // Because of the thread hop it is entirely possible to get here and not |
| // have a valid handle registered for |watcher_id|. |
| Handle handle; |
| @@ -121,7 +119,6 @@ void WatcherBackend::StopWatching(WatcherID watcher_id, |
| handle_to_data_.erase(handle); |
| message_pump_mojo->RemoveHandler(handle); |
| } |
| - event->Signal(); |
| } |
| void WatcherBackend::RemoveAndNotify(const Handle& handle, |
| @@ -182,15 +179,47 @@ class WatcherThreadManager { |
| void StopWatching(WatcherID watcher_id); |
| private: |
| + enum RequestType { |
| + REQUEST_START, |
| + REQUEST_STOP, |
| + }; |
| + |
| + // See description of |requests_| for details. |
| + struct RequestData { |
| + RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} |
| + |
| + RequestType type; |
| + WatchData start_data; |
| + WatcherID stop_id; |
| + base::WaitableEvent* stop_event; |
| + }; |
| + |
| + typedef std::vector<RequestData> Requests; |
| + |
| friend struct DefaultSingletonTraits<WatcherThreadManager>; |
| + |
| WatcherThreadManager(); |
| + // Schedules a request on the background thread. See |requests_| for details. |
| + void AddRequest(const RequestData& data); |
| + |
| + // Processes requests added to |requests_|. This is invoked on the backend |
| + // thread. |
| + void ProcessRequestsOnBackendThread(); |
| + |
| base::Thread thread_; |
| base::AtomicSequenceNumber watcher_id_generator_; |
| WatcherBackend backend_; |
| + // Protects |requests_|. |
| + base::Lock lock_; |
| + |
| + // Start/Stop result in adding a RequestData to |requests_| (protected by |
| + // |lock_|). When the background thread wakes up it processes the requests. |
| + Requests requests_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
| }; |
| @@ -207,37 +236,77 @@ WatcherID WatcherThreadManager::StartWatching( |
| MojoHandleSignals handle_signals, |
| base::TimeTicks deadline, |
| const base::Callback<void(MojoResult)>& callback) { |
| - WatchData data; |
| - data.id = watcher_id_generator_.GetNext(); |
| - data.handle = handle; |
| - data.callback = callback; |
| - data.handle_signals = handle_signals; |
| - data.deadline = deadline; |
| - data.message_loop = base::MessageLoopProxy::current(); |
| + RequestData request_data; |
| + request_data.type = REQUEST_START; |
| + request_data.start_data.id = watcher_id_generator_.GetNext(); |
| + request_data.start_data.handle = handle; |
| + request_data.start_data.callback = callback; |
| + request_data.start_data.handle_signals = handle_signals; |
| + request_data.start_data.deadline = deadline; |
| + request_data.start_data.message_loop = base::MessageLoopProxy::current(); |
| DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), |
| - data.message_loop.get()); |
| - // We own |thread_|, so it's safe to use Unretained() here. |
| - thread_.message_loop()->PostTask( |
| - FROM_HERE, |
| - base::Bind(&WatcherBackend::StartWatching, |
| - base::Unretained(&backend_), |
| - data)); |
| - return data.id; |
| + request_data.start_data.message_loop.get()); |
| + AddRequest(request_data); |
| + return request_data.start_data.id; |
| } |
| void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
| + // Handle the case of StartWatching() followed by StopWatching() before |
| + // |thread_| woke up. |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { |
| + if (i->type == REQUEST_START && i->start_data.id == watcher_id) { |
| + // Watcher ids are not reused, so if we find it we can stop. |
| + requests_.erase(i); |
| + return; |
| + } |
| + } |
| + } |
| + |
| base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| base::WaitableEvent event(true, false); |
| + RequestData request_data; |
| + request_data.type = REQUEST_STOP; |
| + request_data.stop_id = watcher_id; |
| + request_data.stop_event = &event; |
| + AddRequest(request_data); |
| + |
| + // We need to block until the handle is actually removed. |
| + event.Wait(); |
| +} |
| + |
| +void WatcherThreadManager::AddRequest(const RequestData& data) { |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + const bool was_empty = requests_.empty(); |
| + requests_.push_back(data); |
| + if (!was_empty) |
| + return; |
| + } |
| // We own |thread_|, so it's safe to use Unretained() here. |
| thread_.message_loop()->PostTask( |
| FROM_HERE, |
| - base::Bind(&WatcherBackend::StopWatching, |
| - base::Unretained(&backend_), |
| - watcher_id, |
| - &event)); |
| + base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, |
| + base::Unretained(this))); |
| +} |
| - // We need to block until the handle is actually removed. |
| - event.Wait(); |
| +void WatcherThreadManager::ProcessRequestsOnBackendThread() { |
| + DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); |
| + |
| + Requests requests; |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + requests_.swap(requests); |
| + } |
| + for (size_t i = 0; i < requests.size(); ++i) { |
| + if (requests[i].type == REQUEST_START) { |
| + backend_.StartWatching(requests[i].start_data); |
| + } else { |
| + backend_.StopWatching(requests[i].stop_id); |
| + requests[i].stop_event->Signal(); |
| + } |
| + } |
| } |
| WatcherThreadManager::WatcherThreadManager() |
| @@ -261,6 +330,7 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver { |
| const base::Callback<void(MojoResult)>& callback) |
| : watcher_(watcher), |
| callback_(callback), |
| + got_ready_(false), |
| weak_factory_(this) { |
| base::MessageLoop::current()->AddDestructionObserver(this); |
| @@ -274,16 +344,22 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver { |
| virtual ~State() { |
| base::MessageLoop::current()->RemoveDestructionObserver(this); |
| - WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); |
| + if (!got_ready_) |
|
darin (slow to review)
2014/08/21 23:55:12
isn't there a race condition here? what if StartWa… [reviewer comment truncated in page capture]
sky
2014/08/22 00:03:51
Then got_ready_ would be false, right?
|
| + WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); |
| } |
| private: |
| virtual void WillDestroyCurrentMessageLoop() OVERRIDE { |
| // The current thread is exiting. Simulate a watch error. |
| - OnHandleReady(MOJO_RESULT_ABORTED); |
| + NotifyAndDestroy(MOJO_RESULT_ABORTED); |
| } |
| void OnHandleReady(MojoResult result) { |
| + got_ready_ = true; |
| + NotifyAndDestroy(result); |
| + } |
| + |
| + void NotifyAndDestroy(MojoResult result) { |
| base::Callback<void(MojoResult)> callback = callback_; |
| watcher_->Stop(); // Destroys |this|. |
| @@ -294,6 +370,9 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver { |
| WatcherID watcher_id_; |
| base::Callback<void(MojoResult)> callback_; |
| + // Have we been notified that the handle is ready? |
| + bool got_ready_; |
| + |
| // Used to weakly bind |this| to the WatcherThreadManager. |
| base::WeakPtrFactory<State> weak_factory_; |
| }; |