| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/common/handle_watcher.h" | 5 #include "mojo/common/handle_watcher.h" |
| 6 | 6 |
| 7 #include <map> | 7 #include <map> |
| 8 | 8 |
| 9 #include "base/atomic_sequence_num.h" | 9 #include "base/atomic_sequence_num.h" |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 64 // WatcherBackend is responsible for managing the requests and interacting with | 64 // WatcherBackend is responsible for managing the requests and interacting with |
| 65 // MessagePumpMojo. All access (outside of creation/destruction) is done on the | 65 // MessagePumpMojo. All access (outside of creation/destruction) is done on the |
| 66 // thread WatcherThreadManager creates. | 66 // thread WatcherThreadManager creates. |
| 67 class WatcherBackend : public MessagePumpMojoHandler { | 67 class WatcherBackend : public MessagePumpMojoHandler { |
| 68 public: | 68 public: |
| 69 WatcherBackend(); | 69 WatcherBackend(); |
| 70 virtual ~WatcherBackend(); | 70 virtual ~WatcherBackend(); |
| 71 | 71 |
| 72 void StartWatching(const WatchData& data); | 72 void StartWatching(const WatchData& data); |
| 73 | 73 |
| 74 // Cancels a previously schedule request to start a watch. When done signals | 74 // Cancels a previously scheduled request to start a watch. |
| 75 // |event|. | 75 void StopWatching(WatcherID watcher_id); |
| 76 void StopWatching(WatcherID watcher_id, base::WaitableEvent* event); | |
| 77 | 76 |
| 78 private: | 77 private: |
| 79 typedef std::map<Handle, WatchData> HandleToWatchDataMap; | 78 typedef std::map<Handle, WatchData> HandleToWatchDataMap; |
| 80 | 79 |
| 81 // Invoked when a handle needs to be removed and notified. | 80 // Invoked when a handle needs to be removed and notified. |
| 82 void RemoveAndNotify(const Handle& handle, MojoResult result); | 81 void RemoveAndNotify(const Handle& handle, MojoResult result); |
| 83 | 82 |
| 84 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found | 83 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found |
| 85 // and sets |handle| to the Handle. Returns false if not a known id. | 84 // and sets |handle| to the Handle. Returns false if not a known id. |
| 86 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; | 85 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; |
| (...skipping 18 matching lines...) Expand all Loading... |
| 105 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); | 104 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
| 106 | 105 |
| 107 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); | 106 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
| 108 | 107 |
| 109 handle_to_data_[data.handle] = data; | 108 handle_to_data_[data.handle] = data; |
| 110 message_pump_mojo->AddHandler(this, data.handle, | 109 message_pump_mojo->AddHandler(this, data.handle, |
| 111 data.handle_signals, | 110 data.handle_signals, |
| 112 data.deadline); | 111 data.deadline); |
| 113 } | 112 } |
| 114 | 113 |
| 115 void WatcherBackend::StopWatching(WatcherID watcher_id, | 114 void WatcherBackend::StopWatching(WatcherID watcher_id) { |
| 116 base::WaitableEvent* event) { | |
| 117 // Because of the thread hop it is entirely possible to get here and not | 115 // Because of the thread hop it is entirely possible to get here and not |
| 118 // have a valid handle registered for |watcher_id|. | 116 // have a valid handle registered for |watcher_id|. |
| 119 Handle handle; | 117 Handle handle; |
| 120 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | 118 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { |
| 121 handle_to_data_.erase(handle); | 119 handle_to_data_.erase(handle); |
| 122 message_pump_mojo->RemoveHandler(handle); | 120 message_pump_mojo->RemoveHandler(handle); |
| 123 } | 121 } |
| 124 event->Signal(); | |
| 125 } | 122 } |
| 126 | 123 |
| 127 void WatcherBackend::RemoveAndNotify(const Handle& handle, | 124 void WatcherBackend::RemoveAndNotify(const Handle& handle, |
| 128 MojoResult result) { | 125 MojoResult result) { |
| 129 if (handle_to_data_.count(handle) == 0) | 126 if (handle_to_data_.count(handle) == 0) |
| 130 return; | 127 return; |
| 131 | 128 |
| 132 const WatchData data(handle_to_data_[handle]); | 129 const WatchData data(handle_to_data_[handle]); |
| 133 handle_to_data_.erase(handle); | 130 handle_to_data_.erase(handle); |
| 134 message_pump_mojo->RemoveHandler(handle); | 131 message_pump_mojo->RemoveHandler(handle); |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 175 WatcherID StartWatching(const Handle& handle, | 172 WatcherID StartWatching(const Handle& handle, |
| 176 MojoHandleSignals handle_signals, | 173 MojoHandleSignals handle_signals, |
| 177 base::TimeTicks deadline, | 174 base::TimeTicks deadline, |
| 178 const base::Callback<void(MojoResult)>& callback); | 175 const base::Callback<void(MojoResult)>& callback); |
| 179 | 176 |
| 180 // Stops watching a handle. | 177 // Stops watching a handle. |
| 181 // This may be invoked on any thread. | 178 // This may be invoked on any thread. |
| 182 void StopWatching(WatcherID watcher_id); | 179 void StopWatching(WatcherID watcher_id); |
| 183 | 180 |
| 184 private: | 181 private: |
| 182 enum RequestType { |
| 183 REQUEST_START, |
| 184 REQUEST_STOP, |
| 185 }; |
| 186 |
| 187 // See description of |requests_| for details. |
| 188 struct RequestData { |
| 189 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} |
| 190 |
| 191 RequestType type; |
| 192 WatchData start_data; |
| 193 WatcherID stop_id; |
| 194 base::WaitableEvent* stop_event; |
| 195 }; |
| 196 |
| 197 typedef std::vector<RequestData> Requests; |
| 198 |
| 185 friend struct DefaultSingletonTraits<WatcherThreadManager>; | 199 friend struct DefaultSingletonTraits<WatcherThreadManager>; |
| 200 |
| 186 WatcherThreadManager(); | 201 WatcherThreadManager(); |
| 187 | 202 |
| 203 // Schedules a request on the background thread. See |requests_| for details. |
| 204 void AddRequest(const RequestData& data); |
| 205 |
| 206 // Processes requests added to |requests_|. This is invoked on the backend |
| 207 // thread. |
| 208 void ProcessRequestsOnBackendThread(); |
| 209 |
| 188 base::Thread thread_; | 210 base::Thread thread_; |
| 189 | 211 |
| 190 base::AtomicSequenceNumber watcher_id_generator_; | 212 base::AtomicSequenceNumber watcher_id_generator_; |
| 191 | 213 |
| 192 WatcherBackend backend_; | 214 WatcherBackend backend_; |
| 193 | 215 |
| 216 // Protects |requests_|. |
| 217 base::Lock lock_; |
| 218 |
| 219 // Start/Stop result in adding a RequestData to |requests_| (protected by |
| 220 // |lock_|). When the background thread wakes up it processes the requests. |
| 221 Requests requests_; |
| 222 |
| 194 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); | 223 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
| 195 }; | 224 }; |
| 196 | 225 |
| 197 WatcherThreadManager::~WatcherThreadManager() { | 226 WatcherThreadManager::~WatcherThreadManager() { |
| 198 thread_.Stop(); | 227 thread_.Stop(); |
| 199 } | 228 } |
| 200 | 229 |
| 201 WatcherThreadManager* WatcherThreadManager::GetInstance() { | 230 WatcherThreadManager* WatcherThreadManager::GetInstance() { |
| 202 return Singleton<WatcherThreadManager>::get(); | 231 return Singleton<WatcherThreadManager>::get(); |
| 203 } | 232 } |
| 204 | 233 |
| 205 WatcherID WatcherThreadManager::StartWatching( | 234 WatcherID WatcherThreadManager::StartWatching( |
| 206 const Handle& handle, | 235 const Handle& handle, |
| 207 MojoHandleSignals handle_signals, | 236 MojoHandleSignals handle_signals, |
| 208 base::TimeTicks deadline, | 237 base::TimeTicks deadline, |
| 209 const base::Callback<void(MojoResult)>& callback) { | 238 const base::Callback<void(MojoResult)>& callback) { |
| 210 WatchData data; | 239 RequestData request_data; |
| 211 data.id = watcher_id_generator_.GetNext(); | 240 request_data.type = REQUEST_START; |
| 212 data.handle = handle; | 241 request_data.start_data.id = watcher_id_generator_.GetNext(); |
| 213 data.callback = callback; | 242 request_data.start_data.handle = handle; |
| 214 data.handle_signals = handle_signals; | 243 request_data.start_data.callback = callback; |
| 215 data.deadline = deadline; | 244 request_data.start_data.handle_signals = handle_signals; |
| 216 data.message_loop = base::MessageLoopProxy::current(); | 245 request_data.start_data.deadline = deadline; |
| 246 request_data.start_data.message_loop = base::MessageLoopProxy::current(); |
| 217 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), | 247 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), |
| 218 data.message_loop.get()); | 248 request_data.start_data.message_loop.get()); |
| 219 // We own |thread_|, so it's safe to use Unretained() here. | 249 AddRequest(request_data); |
| 220 thread_.message_loop()->PostTask( | 250 return request_data.start_data.id; |
| 221 FROM_HERE, | |
| 222 base::Bind(&WatcherBackend::StartWatching, | |
| 223 base::Unretained(&backend_), | |
| 224 data)); | |
| 225 return data.id; | |
| 226 } | 251 } |
| 227 | 252 |
| 228 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { | 253 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
| 254 // Handle the case of StartWatching() followed by StopWatching() before |
| 255 // |thread_| woke up. |
| 256 { |
| 257 base::AutoLock auto_lock(lock_); |
| 258 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { |
| 259 if (i->type == REQUEST_START && i->start_data.id == watcher_id) { |
| 260 // Watcher ids are not reused, so if we find it we can stop. |
| 261 requests_.erase(i); |
| 262 return; |
| 263 } |
| 264 } |
| 265 } |
| 266 |
| 229 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 267 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 230 base::WaitableEvent event(true, false); | 268 base::WaitableEvent event(true, false); |
| 231 // We own |thread_|, so it's safe to use Unretained() here. | 269 RequestData request_data; |
| 232 thread_.message_loop()->PostTask( | 270 request_data.type = REQUEST_STOP; |
| 233 FROM_HERE, | 271 request_data.stop_id = watcher_id; |
| 234 base::Bind(&WatcherBackend::StopWatching, | 272 request_data.stop_event = &event; |
| 235 base::Unretained(&backend_), | 273 AddRequest(request_data); |
| 236 watcher_id, | |
| 237 &event)); | |
| 238 | 274 |
| 239 // We need to block until the handle is actually removed. | 275 // We need to block until the handle is actually removed. |
| 240 event.Wait(); | 276 event.Wait(); |
| 241 } | 277 } |
| 242 | 278 |
| 279 void WatcherThreadManager::AddRequest(const RequestData& data) { |
| 280 { |
| 281 base::AutoLock auto_lock(lock_); |
| 282 const bool was_empty = requests_.empty(); |
| 283 requests_.push_back(data); |
| 284 if (!was_empty) |
| 285 return; |
| 286 } |
| 287 // We own |thread_|, so it's safe to use Unretained() here. |
| 288 thread_.message_loop()->PostTask( |
| 289 FROM_HERE, |
| 290 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, |
| 291 base::Unretained(this))); |
| 292 } |
| 293 |
| 294 void WatcherThreadManager::ProcessRequestsOnBackendThread() { |
| 295 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); |
| 296 |
| 297 Requests requests; |
| 298 { |
| 299 base::AutoLock auto_lock(lock_); |
| 300 requests_.swap(requests); |
| 301 } |
| 302 for (size_t i = 0; i < requests.size(); ++i) { |
| 303 if (requests[i].type == REQUEST_START) { |
| 304 backend_.StartWatching(requests[i].start_data); |
| 305 } else { |
| 306 backend_.StopWatching(requests[i].stop_id); |
| 307 requests[i].stop_event->Signal(); |
| 308 } |
| 309 } |
| 310 } |
| 311 |
| 243 WatcherThreadManager::WatcherThreadManager() | 312 WatcherThreadManager::WatcherThreadManager() |
| 244 : thread_(kWatcherThreadName) { | 313 : thread_(kWatcherThreadName) { |
| 245 base::Thread::Options thread_options; | 314 base::Thread::Options thread_options; |
| 246 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); | 315 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); |
| 247 thread_.StartWithOptions(thread_options); | 316 thread_.StartWithOptions(thread_options); |
| 248 } | 317 } |
| 249 | 318 |
| 250 // HandleWatcher::State -------------------------------------------------------- | 319 // HandleWatcher::State -------------------------------------------------------- |
| 251 | 320 |
| 252 // Represents the state of the HandleWatcher. Owns the user's callback and | 321 // Represents the state of the HandleWatcher. Owns the user's callback and |
| 253 // monitors the current thread's MessageLoop to know when to force the callback | 322 // monitors the current thread's MessageLoop to know when to force the callback |
| 254 // to run (with an error) even though the pipe hasn't been signaled yet. | 323 // to run (with an error) even though the pipe hasn't been signaled yet. |
| 255 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { | 324 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { |
| 256 public: | 325 public: |
| 257 State(HandleWatcher* watcher, | 326 State(HandleWatcher* watcher, |
| 258 const Handle& handle, | 327 const Handle& handle, |
| 259 MojoHandleSignals handle_signals, | 328 MojoHandleSignals handle_signals, |
| 260 MojoDeadline deadline, | 329 MojoDeadline deadline, |
| 261 const base::Callback<void(MojoResult)>& callback) | 330 const base::Callback<void(MojoResult)>& callback) |
| 262 : watcher_(watcher), | 331 : watcher_(watcher), |
| 263 callback_(callback), | 332 callback_(callback), |
| 333 got_ready_(false), |
| 264 weak_factory_(this) { | 334 weak_factory_(this) { |
| 265 base::MessageLoop::current()->AddDestructionObserver(this); | 335 base::MessageLoop::current()->AddDestructionObserver(this); |
| 266 | 336 |
| 267 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | 337 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( |
| 268 handle, | 338 handle, |
| 269 handle_signals, | 339 handle_signals, |
| 270 MojoDeadlineToTimeTicks(deadline), | 340 MojoDeadlineToTimeTicks(deadline), |
| 271 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr())); | 341 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr())); |
| 272 } | 342 } |
| 273 | 343 |
| 274 virtual ~State() { | 344 virtual ~State() { |
| 275 base::MessageLoop::current()->RemoveDestructionObserver(this); | 345 base::MessageLoop::current()->RemoveDestructionObserver(this); |
| 276 | 346 |
| 277 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | 347 // If we've been notified the handle is ready (|got_ready_| is true) then |
| 348 // the watch has been implicitly removed by |
| 349 // WatcherThreadManager/MessagePumpMojo and we don't have to call |
| 350 // StopWatching(). To do so would needlessly entail posting a task and |
| 351 // blocking until the background thread services it. |
| 352 if (!got_ready_) |
| 353 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); |
| 278 } | 354 } |
| 279 | 355 |
| 280 private: | 356 private: |
| 281 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { | 357 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { |
| 282 // The current thread is exiting. Simulate a watch error. | 358 // The current thread is exiting. Simulate a watch error. |
| 283 OnHandleReady(MOJO_RESULT_ABORTED); | 359 NotifyAndDestroy(MOJO_RESULT_ABORTED); |
| 284 } | 360 } |
| 285 | 361 |
| 286 void OnHandleReady(MojoResult result) { | 362 void OnHandleReady(MojoResult result) { |
| 363 got_ready_ = true; |
| 364 NotifyAndDestroy(result); |
| 365 } |
| 366 |
| 367 void NotifyAndDestroy(MojoResult result) { |
| 287 base::Callback<void(MojoResult)> callback = callback_; | 368 base::Callback<void(MojoResult)> callback = callback_; |
| 288 watcher_->Stop(); // Destroys |this|. | 369 watcher_->Stop(); // Destroys |this|. |
| 289 | 370 |
| 290 callback.Run(result); | 371 callback.Run(result); |
| 291 } | 372 } |
| 292 | 373 |
| 293 HandleWatcher* watcher_; | 374 HandleWatcher* watcher_; |
| 294 WatcherID watcher_id_; | 375 WatcherID watcher_id_; |
| 295 base::Callback<void(MojoResult)> callback_; | 376 base::Callback<void(MojoResult)> callback_; |
| 296 | 377 |
| 378 // Have we been notified that the handle is ready? |
| 379 bool got_ready_; |
| 380 |
| 297 // Used to weakly bind |this| to the WatcherThreadManager. | 381 // Used to weakly bind |this| to the WatcherThreadManager. |
| 298 base::WeakPtrFactory<State> weak_factory_; | 382 base::WeakPtrFactory<State> weak_factory_; |
| 299 }; | 383 }; |
| 300 | 384 |
| 301 // HandleWatcher --------------------------------------------------------------- | 385 // HandleWatcher --------------------------------------------------------------- |
| 302 | 386 |
| 303 HandleWatcher::HandleWatcher() { | 387 HandleWatcher::HandleWatcher() { |
| 304 } | 388 } |
| 305 | 389 |
| 306 HandleWatcher::~HandleWatcher() { | 390 HandleWatcher::~HandleWatcher() { |
| 307 } | 391 } |
| 308 | 392 |
| 309 void HandleWatcher::Start(const Handle& handle, | 393 void HandleWatcher::Start(const Handle& handle, |
| 310 MojoHandleSignals handle_signals, | 394 MojoHandleSignals handle_signals, |
| 311 MojoDeadline deadline, | 395 MojoDeadline deadline, |
| 312 const base::Callback<void(MojoResult)>& callback) { | 396 const base::Callback<void(MojoResult)>& callback) { |
| 313 DCHECK(handle.is_valid()); | 397 DCHECK(handle.is_valid()); |
| 314 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); | 398 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); |
| 315 | 399 |
| 316 state_.reset(new State(this, handle, handle_signals, deadline, callback)); | 400 state_.reset(new State(this, handle, handle_signals, deadline, callback)); |
| 317 } | 401 } |
| 318 | 402 |
| 319 void HandleWatcher::Stop() { | 403 void HandleWatcher::Stop() { |
| 320 state_.reset(); | 404 state_.reset(); |
| 321 } | 405 } |
| 322 | 406 |
| 323 } // namespace common | 407 } // namespace common |
| 324 } // namespace mojo | 408 } // namespace mojo |
| OLD | NEW |