| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/message_pump/handle_watcher.h" | 5 #include "mojo/message_pump/handle_watcher.h" |
| 6 | 6 |
| 7 #include <map> | 7 #include <map> |
| 8 | 8 |
| 9 #include "base/atomic_sequence_num.h" | 9 #include "base/atomic_sequence_num.h" |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| 11 #include "base/lazy_instance.h" | 11 #include "base/lazy_instance.h" |
| 12 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/macros.h" | 13 #include "base/macros.h" |
| 14 #include "base/memory/singleton.h" | 14 #include "base/memory/singleton.h" |
| 15 #include "base/memory/weak_ptr.h" | 15 #include "base/memory/weak_ptr.h" |
| 16 #include "base/message_loop/message_loop.h" | 16 #include "base/message_loop/message_loop.h" |
| 17 #include "base/profiler/scoped_tracker.h" | 17 #include "base/profiler/scoped_tracker.h" |
| 18 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
| 19 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
| 20 #include "base/synchronization/waitable_event.h" | |
| 21 #include "base/thread_task_runner_handle.h" | 20 #include "base/thread_task_runner_handle.h" |
| 22 #include "base/threading/thread.h" | 21 #include "base/threading/thread.h" |
| 23 #include "base/threading/thread_restrictions.h" | |
| 24 #include "base/time/time.h" | 22 #include "base/time/time.h" |
| 25 #include "mojo/message_pump/message_pump_mojo.h" | 23 #include "mojo/message_pump/message_pump_mojo.h" |
| 26 #include "mojo/message_pump/message_pump_mojo_handler.h" | 24 #include "mojo/message_pump/message_pump_mojo_handler.h" |
| 27 #include "mojo/message_pump/time_helper.h" | 25 #include "mojo/message_pump/time_helper.h" |
| 28 | 26 |
| 29 namespace mojo { | 27 namespace mojo { |
| 30 namespace common { | 28 namespace common { |
| 31 | 29 |
| 32 typedef int WatcherID; | 30 typedef int WatcherID; |
| 33 | 31 |
| (...skipping 23 matching lines...) Expand all Loading... |
| 57 | 55 |
| 58 // WatcherBackend is responsible for managing the requests and interacting with | 56 // WatcherBackend is responsible for managing the requests and interacting with |
| 59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the | 57 // MessagePumpMojo. All access (outside of creation/destruction) is done on the |
| 60 // thread WatcherThreadManager creates. | 58 // thread WatcherThreadManager creates. |
| 61 class WatcherBackend : public MessagePumpMojoHandler { | 59 class WatcherBackend : public MessagePumpMojoHandler { |
| 62 public: | 60 public: |
| 63 WatcherBackend(); | 61 WatcherBackend(); |
| 64 ~WatcherBackend() override; | 62 ~WatcherBackend() override; |
| 65 | 63 |
| 66 void StartWatching(const WatchData& data); | 64 void StartWatching(const WatchData& data); |
| 67 | |
| 68 // Cancels a previously scheduled request to start a watch. | |
| 69 void StopWatching(WatcherID watcher_id); | 65 void StopWatching(WatcherID watcher_id); |
| 70 | 66 |
| 71 private: | 67 private: |
| 72 typedef std::map<Handle, WatchData> HandleToWatchDataMap; | 68 typedef std::map<Handle, WatchData> HandleToWatchDataMap; |
| 73 | 69 |
| 74 // Invoked when a handle needs to be removed and notified. | 70 // Invoked when a handle needs to be removed and notified. |
| 75 void RemoveAndNotify(const Handle& handle, MojoResult result); | 71 void RemoveAndNotify(const Handle& handle, MojoResult result); |
| 76 | 72 |
| 77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found | 73 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found |
| 78 // and sets |handle| to the Handle. Returns false if not a known id. | 74 // and sets |handle| to the Handle. Returns false if not a known id. |
| (...skipping 23 matching lines...) Expand all Loading... |
| 102 handle_to_data_[data.handle] = data; | 98 handle_to_data_[data.handle] = data; |
| 103 MessagePumpMojo::current()->AddHandler(this, data.handle, | 99 MessagePumpMojo::current()->AddHandler(this, data.handle, |
| 104 data.handle_signals, | 100 data.handle_signals, |
| 105 data.deadline); | 101 data.deadline); |
| 106 } | 102 } |
| 107 | 103 |
| 108 void WatcherBackend::StopWatching(WatcherID watcher_id) { | 104 void WatcherBackend::StopWatching(WatcherID watcher_id) { |
| 109 // Because of the thread hop it is entirely possible to get here and not | 105 // Because of the thread hop it is entirely possible to get here and not |
| 110 // have a valid handle registered for |watcher_id|. | 106 // have a valid handle registered for |watcher_id|. |
| 111 Handle handle; | 107 Handle handle; |
| 112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | 108 if (!GetMojoHandleByWatcherID(watcher_id, &handle)) |
| 113 handle_to_data_.erase(handle); | 109 return; |
| 114 MessagePumpMojo::current()->RemoveHandler(handle); | 110 |
| 115 } | 111 handle_to_data_.erase(handle); |
| 112 MessagePumpMojo::current()->RemoveHandler(handle); |
| 116 } | 113 } |
| 117 | 114 |
| 118 void WatcherBackend::RemoveAndNotify(const Handle& handle, | 115 void WatcherBackend::RemoveAndNotify(const Handle& handle, |
| 119 MojoResult result) { | 116 MojoResult result) { |
| 120 if (handle_to_data_.count(handle) == 0) | 117 if (handle_to_data_.count(handle) == 0) |
| 121 return; | 118 return; |
| 122 | 119 |
| 123 const WatchData data(handle_to_data_[handle]); | 120 const WatchData data(handle_to_data_[handle]); |
| 124 handle_to_data_.erase(handle); | 121 handle_to_data_.erase(handle); |
| 125 MessagePumpMojo::current()->RemoveHandler(handle); | 122 MessagePumpMojo::current()->RemoveHandler(handle); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 144 } | 141 } |
| 145 | 142 |
| 146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { | 143 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { |
| 147 RemoveAndNotify(handle, result); | 144 RemoveAndNotify(handle, result); |
| 148 } | 145 } |
| 149 | 146 |
| 150 // WatcherThreadManager -------------------------------------------------------- | 147 // WatcherThreadManager -------------------------------------------------------- |
| 151 | 148 |
| 152 // WatcherThreadManager manages the background thread that listens for handles | 149 // WatcherThreadManager manages the background thread that listens for handles |
| 153 // to be ready. All requests are handled by WatcherBackend. | 150 // to be ready. All requests are handled by WatcherBackend. |
| 154 } // namespace | |
| 155 | |
| 156 class WatcherThreadManager { | 151 class WatcherThreadManager { |
| 157 public: | 152 public: |
| 158 ~WatcherThreadManager(); | 153 ~WatcherThreadManager(); |
| 159 | 154 |
| 160 // Returns the shared instance. | 155 // Returns the shared instance. |
| 161 static WatcherThreadManager* GetInstance(); | 156 static WatcherThreadManager* GetInstance(); |
| 162 | 157 |
| 163 // Starts watching the requested handle. Returns a unique ID that is used to | 158 // Starts watching the requested handle. Returns a unique ID that is used to |
| 164 // stop watching the handle. When the handle is ready |callback| is notified | 159 // stop watching the handle. When the handle is ready |callback| is notified |
| 165 // on the thread StartWatching() was invoked on. | 160 // on the thread StartWatching() was invoked on. |
| 166 // This may be invoked on any thread. | 161 // This may be invoked on any thread. |
| 167 WatcherID StartWatching(const Handle& handle, | 162 WatcherID StartWatching(const Handle& handle, |
| 168 MojoHandleSignals handle_signals, | 163 MojoHandleSignals handle_signals, |
| 169 base::TimeTicks deadline, | 164 base::TimeTicks deadline, |
| 170 const base::Callback<void(MojoResult)>& callback); | 165 const base::Callback<void(MojoResult)>& callback); |
| 171 | 166 |
| 172 // Stops watching a handle. | 167 // Stops watching a handle. |
| 173 // This may be invoked on any thread. | 168 // This may be invoked on any thread. |
| 174 void StopWatching(WatcherID watcher_id); | 169 void StopWatching(WatcherID watcher_id); |
| 175 | 170 |
| 176 private: | 171 private: |
| 177 enum RequestType { | 172 enum RequestType { |
| 178 REQUEST_START, | 173 REQUEST_START, |
| 179 REQUEST_STOP, | 174 REQUEST_STOP, |
| 180 }; | 175 }; |
| 181 | 176 |
| 182 // See description of |requests_| for details. | 177 // See description of |requests_| for details. |
| 183 struct RequestData { | 178 struct RequestData { |
| 184 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} | 179 RequestData() : type(REQUEST_START), stop_id(0) {} |
| 185 | 180 |
| 186 RequestType type; | 181 RequestType type; |
| 187 WatchData start_data; | 182 WatchData start_data; |
| 188 WatcherID stop_id; | 183 WatcherID stop_id; |
| 189 base::WaitableEvent* stop_event; | |
| 190 }; | 184 }; |
| 191 | 185 |
| 192 typedef std::vector<RequestData> Requests; | 186 typedef std::vector<RequestData> Requests; |
| 193 | 187 |
| 194 friend struct base::DefaultSingletonTraits<WatcherThreadManager>; | 188 friend struct base::DefaultSingletonTraits<WatcherThreadManager>; |
| 195 | 189 |
| 196 WatcherThreadManager(); | 190 WatcherThreadManager(); |
| 197 | 191 |
| 198 // Schedules a request on the background thread. See |requests_| for details. | 192 // Schedules a request on the background thread. See |requests_| for details. |
| 199 void AddRequest(const RequestData& data); | 193 void AddRequest(const RequestData& data); |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 255 return; | 249 return; |
| 256 } | 250 } |
| 257 } | 251 } |
| 258 } | 252 } |
| 259 | 253 |
| 260 // TODO(amistry): Remove ScopedTracker below once http://crbug.com/554761 is | 254 // TODO(amistry): Remove ScopedTracker below once http://crbug.com/554761 is |
| 261 // fixed. | 255 // fixed. |
| 262 tracked_objects::ScopedTracker tracking_profile( | 256 tracked_objects::ScopedTracker tracking_profile( |
| 263 FROM_HERE_WITH_EXPLICIT_FUNCTION( | 257 FROM_HERE_WITH_EXPLICIT_FUNCTION( |
| 264 "554761 WatcherThreadManager::StopWatching")); | 258 "554761 WatcherThreadManager::StopWatching")); |
| 265 base::ThreadRestrictions::ScopedAllowWait allow_wait; | |
| 266 base::WaitableEvent event(true, false); | |
| 267 RequestData request_data; | 259 RequestData request_data; |
| 268 request_data.type = REQUEST_STOP; | 260 request_data.type = REQUEST_STOP; |
| 269 request_data.stop_id = watcher_id; | 261 request_data.stop_id = watcher_id; |
| 270 request_data.stop_event = &event; | |
| 271 AddRequest(request_data); | 262 AddRequest(request_data); |
| 272 | |
| 273 // We need to block until the handle is actually removed. | |
| 274 event.Wait(); | |
| 275 } | 263 } |
| 276 | 264 |
| 277 void WatcherThreadManager::AddRequest(const RequestData& data) { | 265 void WatcherThreadManager::AddRequest(const RequestData& data) { |
| 278 { | 266 { |
| 279 base::AutoLock auto_lock(lock_); | 267 base::AutoLock auto_lock(lock_); |
| 280 const bool was_empty = requests_.empty(); | 268 const bool was_empty = requests_.empty(); |
| 281 requests_.push_back(data); | 269 requests_.push_back(data); |
| 282 if (!was_empty) | 270 if (!was_empty) |
| 283 return; | 271 return; |
| 284 } | 272 } |
| 285 // We own |thread_|, so it's safe to use Unretained() here. | 273 // We outlive |thread_|, so it's safe to use Unretained() here. |
| 286 thread_.task_runner()->PostTask( | 274 thread_.task_runner()->PostTask( |
| 287 FROM_HERE, | 275 FROM_HERE, |
| 288 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, | 276 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, |
| 289 base::Unretained(this))); | 277 base::Unretained(this))); |
| 290 } | 278 } |
| 291 | 279 |
| 292 void WatcherThreadManager::ProcessRequestsOnBackendThread() { | 280 void WatcherThreadManager::ProcessRequestsOnBackendThread() { |
| 293 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); | 281 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); |
| 294 | 282 |
| 295 Requests requests; | 283 Requests requests; |
| 296 { | 284 { |
| 297 base::AutoLock auto_lock(lock_); | 285 base::AutoLock auto_lock(lock_); |
| 298 requests_.swap(requests); | 286 requests_.swap(requests); |
| 299 } | 287 } |
| 300 for (size_t i = 0; i < requests.size(); ++i) { | 288 for (size_t i = 0; i < requests.size(); ++i) { |
| 301 if (requests[i].type == REQUEST_START) { | 289 if (requests[i].type == REQUEST_START) { |
| 302 backend_.StartWatching(requests[i].start_data); | 290 backend_.StartWatching(requests[i].start_data); |
| 303 } else { | 291 } else { |
| 304 backend_.StopWatching(requests[i].stop_id); | 292 backend_.StopWatching(requests[i].stop_id); |
| 305 requests[i].stop_event->Signal(); | |
| 306 } | 293 } |
| 307 } | 294 } |
| 308 } | 295 } |
| 309 | 296 |
| 310 WatcherThreadManager::WatcherThreadManager() | 297 WatcherThreadManager::WatcherThreadManager() |
| 311 : thread_(kWatcherThreadName) { | 298 : thread_(kWatcherThreadName) { |
| 312 base::Thread::Options thread_options; | 299 base::Thread::Options thread_options; |
| 313 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); | 300 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); |
| 314 thread_.StartWithOptions(thread_options); | 301 thread_.StartWithOptions(thread_options); |
| 315 } | 302 } |
| 316 | 303 |
| 304 } // namespace |
| 305 |
| 317 // HandleWatcher::StateBase and subclasses ------------------------------------- | 306 // HandleWatcher::StateBase and subclasses ------------------------------------- |
| 318 | 307 |
| 319 // The base class of HandleWatcher's state. Owns the user's callback and | 308 // The base class of HandleWatcher's state. Owns the user's callback and |
| 320 // monitors the current thread's MessageLoop to know when to force the callback | 309 // monitors the current thread's MessageLoop to know when to force the callback |
| 321 // to run (with an error) even though the pipe hasn't been signaled yet. | 310 // to run (with an error) even though the pipe hasn't been signaled yet. |
| 322 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { | 311 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { |
| 323 public: | 312 public: |
| 324 StateBase(HandleWatcher* watcher, | 313 StateBase(HandleWatcher* watcher, |
| 325 const base::Callback<void(MojoResult)>& callback) | 314 const base::Callback<void(MojoResult)>& callback) |
| 326 : watcher_(watcher), | 315 : watcher_(watcher), |
| (...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 472 this, handle, handle_signals, deadline, callback)); | 461 this, handle, handle_signals, deadline, callback)); |
| 473 } | 462 } |
| 474 } | 463 } |
| 475 | 464 |
| 476 void HandleWatcher::Stop() { | 465 void HandleWatcher::Stop() { |
| 477 state_.reset(); | 466 state_.reset(); |
| 478 } | 467 } |
| 479 | 468 |
| 480 } // namespace common | 469 } // namespace common |
| 481 } // namespace mojo | 470 } // namespace mojo |
| OLD | NEW |